# Final Project: Data Analysis using Spark
### Scenario
You have been tasked by the HR department of a company to create a data pipeline that can take in employee data in a CSV format. Your responsibilities include analyzing the data, applying any required transformations, and facilitating the extraction of valuable insights from the processed data.
HR Employee Data Pipeline
# Author: Gael Mayanza-Ouamba
# Description:
# ETL pipeline using PySpark to process employee CSV data,
# perform transformations, aggregations, and analytics.
Given your role as a data engineer, you've been requested to leverage Apache Spark components to accomplish the tasks.

- Task 1: Generate DataFrame from CSV data.
- Task 2: Define a schema for the data.
- Task 3: Display schema of DataFrame.
- Task 4: Create a temporary view.
- Task 5: Execute an SQL query.
- Task 6: Calculate Average Salary by Department.
- Task 7: Filter and Display IT Department Employees.
- Task 8: Add 10% Bonus to Salaries.
- Task 9: Find Maximum Salary by Age.
- Task 10: Self-Join on Employee Data.
- Task 11: Calculate Average Employee Age.
- Task 12: Calculate Total Salary by Department.
- Task 13: Sort Data by Age and Salary.
- Task 14: Count Employees in Each Department.
- Task 15: Filter Employees with the letter o in the Name.


### Prerequisites 

1. For this project, will be using Python and Spark (PySpark). 

In [1]:
# Installing required packages 

!pip install pyspark  findspark wget


Collecting pyspark
  Downloading pyspark-3.4.4.tar.gz (311.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m311.4/311.4 MB[0m [31m1.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.4-py2.py3-none-any.whl size=311905466 sha256=3905bd816a3353e9d73c28e6798cb059a849667776c016d2e6c83ec9fb3571a3
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/4e/66/db/939eb1c49afb8a7fd2c4e393ad34e12b77db67bb4cc974c00e
Successfully built pyspark
Installing colle

In [3]:
import findspark

findspark.init()

In [4]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the SparkContext.   

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [5]:
# Creating a SparkContext object  

sc = SparkContext.getOrCreate()

# Creating a SparkSession  

spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

26/01/23 18:12:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2. Download the CSV data.  


In [6]:
# Download the CSV data first into a local `employees.csv` file
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

'employees.csv'

### Tasks


#### Task 1: Generate a Spark DataFrame from the CSV data

Read data from the provided CSV file, `employees.csv` and import it into a Spark DataFrame variable named `employees_df`.

 


In [15]:
# Read data from the "emp" CSV file and import it into a DataFrame variable named "employees_df"  
employees_df = spark.read.csv('employees.csv', header = True, inferSchema = True)
employees_df.show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top

#### Task 2: Define a schema for the data

Construct a schema for the input data and then utilize the defined schema to read the CSV file to create a DataFrame named `employees_df`.  


In [10]:
# Define a Schema for the input data and read the file using the user-defined Schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# Define the schema structure
employee_schema = StructType([
    StructField('Emp_No', IntegerType(), True),
    StructField('Emp_Name', StringType(), True),
    StructField('Salary', DoubleType(), True),
    StructField('Age', IntegerType(), True),
    StructField('Department', StringType(), True)
])
# Read the CSV file using the defined schema
employees_df = spark.read.csv('employees.csv', header = True, schema = employee_schema)

#### Task 3: Display schema of DataFrame

Display the schema of the `employees_df` DataFrame, showing all columns and their respective data types.  


In [12]:
# Display all columns of the DataFrame, along with their respective data types
employees_df.printSchema()

root
 |-- Emp_No: integer (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



#### Task 4: Create a temporary view

Create a temporary view named `employees` for the `employees_df` DataFrame, enabling Spark SQL queries on the data. 


In [16]:
# Create a temporary view named "employees" for the DataFrame
employees_df.createOrReplaceTempView('employees')

#### Task 5: Execute an SQL query

Compose and execute an SQL query to fetch the records from the `employees` view where the age of employees exceeds 30. Then, display the result of the SQL query, showcasing the filtered records.


In [17]:
# SQL query to fetch solely the records from the View where the age exceeds 30
age_over_30 = spark.sql("SELECT * FROM employees WHERE  age > 30 ")
# Display the filtered records
age_over_30.show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   199|    Douglas|  2600| 34|     Sales|
|   200|   Jennifer|  4400| 36| Marketing|
|   201|    Michael| 13000| 32|        IT|
|   202|        Pat|  6000| 39|        HR|
|   203|      Susan|  6500| 36| Marketing|
|   205|    Shelley| 12008| 33|   Finance|
|   206|    William|  8300| 37|        IT|
|   100|     Steven| 24000| 39|        IT|
|   102|        Lex| 17000| 37| Marketing|
|   103|  Alexander|  9000| 39| Marketing|
|   104|      Bruce|  6000| 38|        IT|
|   105|      David|  4800| 39|        IT|
|   106|      Valli|  4800| 38|     Sales|
|   107|      Diana|  4200| 35|     Sales|
|   109|     Daniel|  9000| 35|        HR|
|   110|       John|  8200| 31| Marketing|
|   111|     Ismael|  7700| 32|        IT|
|   112|Jose Manuel|  7800| 34|        HR|
|   113|       Luis|  6900| 34|     Sales|
|   116|     Shelli|  2900| 37|   Finance|
+------+---

#### Task 6: Calculate Average Salary by Department

Compose an SQL query to retrieve the average salary of employees grouped by department. Display the result.


In [20]:
# SQL query to calculate the average salary of employees grouped by department
avg_salary = spark.sql("SELECT Department, AVG(Salary) as avg_s_emp  FROM employees  GROUP BY Department ")
# Display the filtered records
avg_salary.show()



+----------+-----------------+
|Department|        avg_s_emp|
+----------+-----------------+
|     Sales|5492.923076923077|
|        HR|           5837.5|
|   Finance|           5730.8|
| Marketing|6633.333333333333|
|        IT|           7400.0|
+----------+-----------------+



                                                                                

#### Task 7: Filter and Display IT Department Employees

Apply a filter on the `employees_df` DataFrame to select records where the department is `'IT'`. Display the filtered DataFrame.


In [27]:
# Apply a filter to select records where the department is 'IT'
filtered_it_dep = employees_df.filter(employees_df.Department == 'IT')
# Display the filtered records
filtered_it_dep.show()

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



In [28]:
filtered_it_dep = employees_df.filter("Department = 'IT'")
filtered_it_dep.show()

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



#### Task 8: Add 10% Bonus to Salaries

Perform a transformation to add a new column named "SalaryAfterBonus" to the DataFrame. Calculate the new salary by adding a 10% bonus to each employee's salary.


In [37]:
from pyspark.sql.functions import col

# Add a new column "SalaryAfterBonus" with 10% bonus added to the original salary
employees_df2  = employees_df.withColumn("SalaryAfterBonus", col('Salary') * 1.1)
# Display the transformed records
employees_df2.show()

+------+---------+------+---+----------+------------------+
|Emp_No| Emp_Name|Salary|Age|Department|  SalaryAfterBonus|
+------+---------+------+---+----------+------------------+
|   198|   Donald|  2600| 29|        IT|2860.0000000000005|
|   199|  Douglas|  2600| 34|     Sales|2860.0000000000005|
|   200| Jennifer|  4400| 36| Marketing|            4840.0|
|   201|  Michael| 13000| 32|        IT|14300.000000000002|
|   202|      Pat|  6000| 39|        HR| 6600.000000000001|
|   203|    Susan|  6500| 36| Marketing| 7150.000000000001|
|   204|  Hermann| 10000| 29|   Finance|           11000.0|
|   205|  Shelley| 12008| 33|   Finance|13208.800000000001|
|   206|  William|  8300| 37|        IT|            9130.0|
|   100|   Steven| 24000| 39|        IT|26400.000000000004|
|   101|    Neena| 17000| 27|     Sales|           18700.0|
|   102|      Lex| 17000| 37| Marketing|           18700.0|
|   103|Alexander|  9000| 39| Marketing|            9900.0|
|   104|    Bruce|  6000| 38|        IT|

#### Task 9: Find Maximum Salary by Age

Group the data by age and calculate the maximum salary for each age group. Display the result.


In [38]:
from pyspark.sql.functions import max

# Group data by age and calculate the maximum salary for each age group
max_sal = employees_df.groupBy('age').agg(max('Salary').alias('maximum_salary'))

# Display the maximum salary for each age group
max_sal.show()

                                                                                

+---+--------------+
|age|maximum_salary|
+---+--------------+
| 31|          8200|
| 34|          7800|
| 28|         12008|
| 27|         17000|
| 26|          3600|
| 37|         17000|
| 35|          9000|
| 39|         24000|
| 38|          6000|
| 29|         10000|
| 32|         13000|
| 33|         12008|
| 30|          8000|
| 36|          7900|
+---+--------------+



                                                                                

#### Task 10: Self-Join on Employee Data

Join the "employees_df" DataFrame with itself based on the "Emp_No" column. Display the result.


In [40]:
# Join the DataFrame with itself based on the "Emp_No" column
emp_self_join = employees_df.alias('e1').join(employees_df.alias('e2'), on = 'Emp_No', how = 'inner')
emp_self_join.show()

+------+---------+------+---+----------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|    Neena| 

#### Task 11: Calculate Average Employee Age

Calculate the average age of employees using the built-in aggregation function. Display the result.


In [44]:
# Calculate the average age of employees
from pyspark.sql.functions import avg 
avg_age_emp = employees_df.agg(avg('Age').alias('average_age'))
avg_age_emp.show()

+-----------+
|average_age|
+-----------+
|      33.56|
+-----------+



#### Task 12: Calculate Total Salary by Department

Calculate the total salary for each department using the built-in aggregation function. Display the result.


In [46]:
# Calculate the total salary for each department. Hint - User GroupBy and Aggregate functions
from pyspark.sql.functions import sum 
sal_by_dep = employees_df.groupBy('Department').agg(sum('Salary').alias('total_salary'))

sal_by_dep.show()

                                                                                

+----------+------------+
|Department|total_salary|
+----------+------------+
|     Sales|       71408|
|        HR|       46700|
|   Finance|       57308|
| Marketing|       59700|
|        IT|       74000|
+----------+------------+



                                                                                

#### Task 13: Sort Data by Age and Salary

Apply a transformation to sort the DataFrame by age in ascending order and then by salary in descending order. Display the sorted DataFrame.


In [47]:
# Sort the DataFrame by age in ascending order and then by salary in descending order
sorted_df = employees_df.orderBy(['Age','Salary'], ascending = [True, False])

sorted_df.show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   137|   Renske|  3600| 26| Marketing|
|   101|    Neena| 17000| 27|     Sales|
|   114|      Den| 11000| 27|   Finance|
|   108|    Nancy| 12008| 28|     Sales|
|   130|    Mozhe|  2800| 28| Marketing|
|   126|    Irene|  2700| 28|        HR|
|   204|  Hermann| 10000| 29|   Finance|
|   115|Alexander|  3100| 29|   Finance|
|   134|  Michael|  2900| 29|     Sales|
|   198|   Donald|  2600| 29|        IT|
|   140|   Joshua|  2500| 29|   Finance|
|   136|    Hazel|  2200| 29|        IT|
|   120|  Matthew|  8000| 30|        HR|
|   110|     John|  8200| 31| Marketing|
|   127|    James|  2400| 31|        HR|
|   201|  Michael| 13000| 32|        IT|
|   111|   Ismael|  7700| 32|        IT|
|   119|    Karen|  2500| 32|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   124|    Kevin|  5800| 33| Marketing|
+------+---------+------+---+----------+
only showing top

#### Task 14: Count Employees in Each Department

Calculate the number of employees in each department. Display the result.


In [48]:
from pyspark.sql.functions import count

# Calculate the number of employees in each department
emp_dep = employees_df.groupBy('Department').agg(count('Emp_No').alias('num_of_emp'))
emp_dep.show()

                                                                                

+----------+----------+
|Department|num_of_emp|
+----------+----------+
|     Sales|        13|
|        HR|         8|
|   Finance|        10|
| Marketing|         9|
|        IT|        10|
+----------+----------+



                                                                                

#### Task 15: Filter Employees with the letter o in the Name

Apply a filter to select records where the employee's name contains the letter `'o'`. Display the filtered DataFrame.


In [50]:
# Apply a filter to select records where the employee's name contains the letter 'o'
emp_with_o = employees_df.filter(col('Emp_Name').contains('o'))

emp_with_o.show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   198|     Donald|  2600| 29|        IT|
|   199|    Douglas|  2600| 34|     Sales|
|   110|       John|  8200| 31| Marketing|
|   112|Jose Manuel|  7800| 34|        HR|
|   130|      Mozhe|  2800| 28| Marketing|
|   133|      Jason|  3300| 38|     Sales|
|   139|       John|  2700| 36|     Sales|
|   140|     Joshua|  2500| 29|   Finance|
+------+-----------+------+---+----------+



<!--## Change Log -->
