# Data Analysis Project Using PySpark

#### Tasks to Be Done:
1. Data Ingestion
2. Data Transformation 
3. Data Manipulation 
4. Data Visualisation 
5. Data Storage 


#### Bonus Tasks:
1. Repeat the data manipulation operations in Spark SQL using SQL statements (e.g. `SELECT Salary FROM employees ....`).

2. Describe how you would execute the Data Storage step when storing the data in:
- Azure CosmosDB
- Parquet in Azure Storage




### Import the Necessary Libraries and Set Up a Spark Session

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
# sum is aliased to spark_sum so it does not shadow Python's built-in sum()
from pyspark.sql.functions import col, sum as spark_sum, desc


In [2]:
# Create a SparkSession
spark = SparkSession.builder.appName("EmployeeDataAnalysis").getOrCreate()


# 1. Data Ingestion
Load the provided CSV file into a DataFrame in a Databricks notebook. The CSV
contains the fields `EmployeeID`, `FirstName`, `LastName`, `BirthDate`,
`Department`, and `Salary`.

In [3]:
# Step 1: Data Ingestion

# Path to the employees CSV file
data_path = "employees.csv"

# Load the CSV into a DataFrame: the first row is the header, column types are
# inferred from the data, and fields are separated by semicolons
employees_df = spark.read.csv(data_path, header=True, inferSchema=True, sep=";")

# View Employees Table 
employees_df.show()

+----------+---------+---------+----------+----------+------+
|EmployeeID|FirstName| LastName| BirthDate|Department|Salary|
+----------+---------+---------+----------+----------+------+
|         1|     John|      Doe|01/12/1980|     Sales| 70000|
|         2|     Jane|    Smith|14/07/1985| Marketing| 80000|
|         3|   Oliver|  Johnson|30/06/1990|        IT| 90000|
|         4|     Emma| Williams|21/01/1989|        HR| 75000|
|         5|     Liam|    Brown|05/03/1987|     Sales| 85000|
|         6|      Ava|   Garcia|22/04/1995|      NULL| 82000|
|         7|  William| Martinez|10/02/1981|        IT| 77000|
|         8|   Sophia| Robinson|12/09/1988| Marketing| 94000|
|         9|    James|    Clark|19/06/1982|      NULL| 81000|
|        10|Charlotte|Rodriguez|08/07/1991|        HR| 88000|
|        11| Benjamin|    Lewis|30/11/1983|     Sales| 95000|
|        12|      Mia|      Lee|17/02/1986| Marketing| 78000|
|        13|    Ethan|   Walker|25/08/1992|      NULL| 92000|
...

In [4]:
# Initial data screening: check the column types Spark inferred
employees_df.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- BirthDate: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



# 2. Data Transformation 

Any employee record with an empty value in the 'Department' column should
be assigned to a department named 'Other'.

In [5]:
# Replace null values in the Department column with "Other"
employees_df = employees_df.na.fill("Other", ["Department"])

# View the updated employees table
employees_df.show()

+----------+---------+---------+----------+----------+------+
|EmployeeID|FirstName| LastName| BirthDate|Department|Salary|
+----------+---------+---------+----------+----------+------+
|         1|     John|      Doe|01/12/1980|     Sales| 70000|
|         2|     Jane|    Smith|14/07/1985| Marketing| 80000|
|         3|   Oliver|  Johnson|30/06/1990|        IT| 90000|
|         4|     Emma| Williams|21/01/1989|        HR| 75000|
|         5|     Liam|    Brown|05/03/1987|     Sales| 85000|
|         6|      Ava|   Garcia|22/04/1995|     Other| 82000|
|         7|  William| Martinez|10/02/1981|        IT| 77000|
|         8|   Sophia| Robinson|12/09/1988| Marketing| 94000|
|         9|    James|    Clark|19/06/1982|     Other| 81000|
|        10|Charlotte|Rodriguez|08/07/1991|        HR| 88000|
|        11| Benjamin|    Lewis|30/11/1983|     Sales| 95000|
|        12|      Mia|      Lee|17/02/1986| Marketing| 78000|
|        13|    Ethan|   Walker|25/08/1992|     Other| 92000|
...

# 3. Data Manipulation

• Get the total salaries (sum) in the company.

• Create a new data frame representing the total salary (sum) per department.

• Generate a list of the top 5 highest-paid employees in the company.

In [6]:

# Get the total salaries (sum) in the company
total_salaries = employees_df.agg(spark_sum("Salary")).first()[0]

# Create a new data frame representing the total salary (sum) per department 
department_salaries = (
    employees_df.groupBy("Department")
    .agg(spark_sum("Salary").alias("Total_Salary"))
    .sort(col("Department"))
)

# Generate a list of the top 5 highest-paid employees in the company  
top_paid_employees = employees_df.sort(desc("Salary")).limit(5)


# 4. Data Visualisation

• Print the total salaries (sum) in the company.

• Print the DataFrame with the total salary (sum) per department.

• Print the DataFrame with the top 5 highest-paid employees in the company.


In [7]:
#------- Data Visualisation ----------

print('Total Salaries Sum of employees:',total_salaries)

print('\nTotal Salaries Per department: \n')
department_salaries.show()

print('\nTop 5 Paid employees: \n')
top_paid_employees.show()

Total Salaries Sum of employees: 1679000

Total Salaries Per department: 

+----------+------------+
|Department|Total_Salary|
+----------+------------+
|        HR|      163000|
|        IT|      329000|
| Marketing|      428000|
|     Other|      329000|
|     Sales|      430000|
+----------+------------+


Top 5 Paid employees: 

+----------+---------+---------+----------+----------+------+
|EmployeeID|FirstName| LastName| BirthDate|Department|Salary|
+----------+---------+---------+----------+----------+------+
|        11| Benjamin|    Lewis|30/11/1983|     Sales| 95000|
|         8|   Sophia| Robinson|12/09/1988| Marketing| 94000|
|        16|   Amelia|    Young|13/02/1994| Marketing| 93000|
|        13|    Ethan|   Walker|25/08/1992|     Other| 92000|
|        17|    Jacob|Hernandez|27/04/1987|     Sales| 91000|
+----------+---------+---------+----------+----------+------+
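Because every employee belongs to exactly one department (missing values were mapped to 'Other'), the per-department totals printed above should add up to the company-wide total: 163,000 + 329,000 + 428,000 + 329,000 + 430,000 = 1,679,000. A small cross-check using the printed figures:

```python
# Per-department totals copied from the printed output
department_totals = {
    "HR": 163000,
    "IT": 329000,
    "Marketing": 428000,
    "Other": 329000,
    "Sales": 430000,
}
company_total = 1679000

# The department sums partition the company total, so these must match
print(sum(department_totals.values()) == company_total)
```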



# 5. Data Storage 

Store the original employees dataframe partitioned by `Department` using
Parquet format.

In [8]:
# Save the DataFrame in Parquet format, partitioned by Department
employees_df.write.mode("overwrite").partitionBy("Department").parquet("department.parquet")

In [9]:
# Read the Parquet data into a new DataFrame
parquet_df = spark.read.parquet("department.parquet")

# View the data read back from Parquet
parquet_df.show()

+----------+---------+---------+----------+------+----------+
|EmployeeID|FirstName| LastName| BirthDate|Salary|Department|
+----------+---------+---------+----------+------+----------+
|         1|     John|      Doe|01/12/1980| 70000|     Sales|
|         5|     Liam|    Brown|05/03/1987| 85000|     Sales|
|        11| Benjamin|    Lewis|30/11/1983| 95000|     Sales|
|        14|   Harper|     Hall|31/05/1993| 89000|     Sales|
|        17|    Jacob|Hernandez|27/04/1987| 91000|     Sales|
|         2|     Jane|    Smith|14/07/1985| 80000| Marketing|
|         8|   Sophia| Robinson|12/09/1988| 94000| Marketing|
|        12|      Mia|      Lee|17/02/1986| 78000| Marketing|
|        16|   Amelia|    Young|13/02/1994| 93000| Marketing|
|        20|   Evelyn|    Lopez|05/03/1990| 83000| Marketing|
|         3|   Oliver|  Johnson|30/06/1990| 90000|        IT|
|         7|  William| Martinez|10/02/1981| 77000|        IT|
|        15|     Noah|    Allen|12/01/1984| 76000|        IT|
...

# Bonus Requirements

Repeat the data manipulation operations using Spark SQL.

In [10]:
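# To query a DataFrame with SQL in PySpark, first register it as a temporary
# view; SQL statements can then refer to it by name. createOrReplaceTempView
# (unlike createTempView) does not fail if the view already exists, so the
# cell can be re-run safely.
employees_df.createOrReplaceTempView("employees")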

In [11]:
# Get the total salaries (sum) in the company
total_salaries_sql = spark.sql(
    """
    SELECT SUM(Salary) AS total_salary
    FROM employees
    """
)

# Create a new data frame representing the total salary (sum) per department
department_salaries_sql = spark.sql(
    """
    SELECT Department, SUM(Salary) AS Total_Salary
    FROM employees
    GROUP BY Department
    ORDER BY Department
    """
)

# Generate a list of the top 5 highest-paid employees in the company
top_paid_employees_sql = spark.sql(
    """
    SELECT *
    FROM employees
    ORDER BY Salary DESC
    LIMIT 5
    """
)

total_salaries_sql.show()

department_salaries_sql.show()

top_paid_employees_sql.show()


+------------+
|total_salary|
+------------+
|     1679000|
+------------+

+----------+------------+
|Department|Total_Salary|
+----------+------------+
|        HR|      163000|
|        IT|      329000|
| Marketing|      428000|
|     Other|      329000|
|     Sales|      430000|
+----------+------------+

+----------+---------+---------+----------+----------+------+
|EmployeeID|FirstName| LastName| BirthDate|Department|Salary|
+----------+---------+---------+----------+----------+------+
|        11| Benjamin|    Lewis|30/11/1983|     Sales| 95000|
|         8|   Sophia| Robinson|12/09/1988| Marketing| 94000|
|        16|   Amelia|    Young|13/02/1994| Marketing| 93000|
|        13|    Ethan|   Walker|25/08/1992|     Other| 92000|
|        17|    Jacob|Hernandez|27/04/1987|     Sales| 91000|
+----------+---------+---------+----------+----------+------+



#### End of Assignment 