In [2]:
from pyspark.sql import SparkSession

We could not create master as yarn in colab as colab supports only local as master node.

In [3]:
spark = SparkSession.builder.appName("Pyspark practise").master('local[*]').getOrCreate()

In [4]:
spark

In [6]:
employee_df = spark.read.option("header",True).option("inferschema",True).csv("/content/employees_large.csv")

In [8]:
sales_df = spark.read.option("header",True).option("inferschema",True).csv("/content/sales_large.csv")

In [9]:
employee_df.show(5)
sales_df.show(5)

+----------+---------+---------+----------+------+----------+----------+
|EmployeeID|FirstName| LastName|Department|Salary|  JoinDate|Experience|
+----------+---------+---------+----------+------+----------+----------+
|         1|    David|   Miller|     Legal|112987|2024-01-07|        14|
|         2|  Barbara|    Moore|Operations|113370|2022-06-06|         8|
|         3|  Charles|  Johnson|     Sales|130685|2023-09-21|         3|
|         4|  William|   Taylor|   Support|104400|2021-01-11|         7|
|         5|     Lisa|Hernandez|Operations| 75150|2025-11-18|         8|
+----------+---------+---------+----------+------+----------+----------+
only showing top 5 rows

+-------+----------+-----------+--------+---------+----------+---------+
|SalesID|EmployeeID|ProductName|Quantity|UnitPrice| SalesDate|   Region|
+-------+----------+-----------+--------+---------+----------+---------+
|    101|       432|  USB Drive|      76|     2000|2025-08-20|Northeast|
|    102|        44|     L

In [10]:
# Find average salary by department
from pyspark.sql.functions import *

avg_df = employee_df.groupby("Department")\
.agg(avg("Salary").alias("Average Salary Per Department"))
avg_df.show()

+----------+-----------------------------+
|Department|Average Salary Per Department|
+----------+-----------------------------+
|     Sales|            98568.64788732394|
|        HR|                   100481.976|
|   Finance|            97036.19811320755|
|     Legal|             99038.4017094017|
| Marketing|            97892.48837209302|
|        IT|           100082.97391304348|
|   Support|           101551.18518518518|
|Operations|           102080.62595419848|
+----------+-----------------------------+



In [11]:
precise_avg_df = employee_df.groupby("Department")\
.agg(format_number(avg("Salary"),2).alias("Average Salary Per Department"))
precise_avg_df.show()

+----------+-----------------------------+
|Department|Average Salary Per Department|
+----------+-----------------------------+
|     Sales|                    98,568.65|
|        HR|                   100,481.98|
|   Finance|                    97,036.20|
|     Legal|                    99,038.40|
| Marketing|                    97,892.49|
|        IT|                   100,082.97|
|   Support|                   101,551.19|
|Operations|                   102,080.63|
+----------+-----------------------------+



In [12]:
# Count total employees in each department
count_per_dept = employee_df.groupby("Department").agg(count('EmployeeID').alias("Count per Dept"))
count_per_dept.show()

+----------+--------------+
|Department|Count per Dept|
+----------+--------------+
|     Sales|           142|
|        HR|           125|
|   Finance|           106|
|     Legal|           117|
| Marketing|           129|
|        IT|           115|
|   Support|           135|
|Operations|           131|
+----------+--------------+



Calculate total sales revenue (Quantity × UnitPrice) for each employee

Find sales by region

Join employees and sales data to get employee names with their sales

Filter employees with salary > 80,000

Find the top 5 products by total quantity sold

Calculate average sales amount per transaction

Identify employees with more than 5 years of experience

Find region-wise total revenue

In [13]:
employee_df.show(5)
sales_df.show(5)

+----------+---------+---------+----------+------+----------+----------+
|EmployeeID|FirstName| LastName|Department|Salary|  JoinDate|Experience|
+----------+---------+---------+----------+------+----------+----------+
|         1|    David|   Miller|     Legal|112987|2024-01-07|        14|
|         2|  Barbara|    Moore|Operations|113370|2022-06-06|         8|
|         3|  Charles|  Johnson|     Sales|130685|2023-09-21|         3|
|         4|  William|   Taylor|   Support|104400|2021-01-11|         7|
|         5|     Lisa|Hernandez|Operations| 75150|2025-11-18|         8|
+----------+---------+---------+----------+------+----------+----------+
only showing top 5 rows

+-------+----------+-----------+--------+---------+----------+---------+
|SalesID|EmployeeID|ProductName|Quantity|UnitPrice| SalesDate|   Region|
+-------+----------+-----------+--------+---------+----------+---------+
|    101|       432|  USB Drive|      76|     2000|2025-08-20|Northeast|
|    102|        44|     L

In [14]:
# Calculate total sales revenue (Quantity × UnitPrice) for each employee, adduing a new column to a data set instead
updated_sales_df = sales_df.withColumn("Total revenue", col("Quantity")*col("UnitPrice"))
updated_sales_df

DataFrame[SalesID: int, EmployeeID: int, ProductName: string, Quantity: int, UnitPrice: int, SalesDate: date, Region: string, Total revenue: int]

In [15]:
updated_sales_df.show()

+-------+----------+------------+--------+---------+----------+---------+-------------+
|SalesID|EmployeeID| ProductName|Quantity|UnitPrice| SalesDate|   Region|Total revenue|
+-------+----------+------------+--------+---------+----------+---------+-------------+
|    101|       432|   USB Drive|      76|     2000|2025-08-20|Northeast|       152000|
|    102|        44|      Laptop|      23|       25|2024-12-08|Northeast|          575|
|    103|       872|       Mouse|      64|     2000|2025-04-09|  Central|       128000|
|    104|       920|     Printer|      95|      100|2025-04-17|     West|         9500|
|    105|       317|   USB Drive|      57|      500|2025-02-16|Southwest|        28500|
|    106|       381|     Printer|      10|       75|2025-11-07|     West|          750|
|    107|       145|      Laptop|      35|       75|2025-02-08|Southwest|         2625|
|    108|       327|   USB Drive|       6|      100|2025-06-03|  Central|          600|
|    109|       363|  Headphones

In [16]:
 # Find sales by region
 updated_sales_df.groupby("Region").agg(sum("Total revenue").alias("Total sales in region")).show()

+---------+---------------------+
|   Region|Total sales in region|
+---------+---------------------+
|    South|            218171725|
|  Central|            222273975|
|     East|            217961000|
|Southwest|            219315125|
|     West|            219089375|
|    North|            215789275|
|Northeast|            220275175|
+---------+---------------------+



In [17]:
# Join employees and sales data to get employee names with their sales
joined_df = employee_df.join(updated_sales_df , employee_df.EmployeeID == updated_sales_df.EmployeeID)
joined_df.show()

+----------+---------+---------+----------+------+----------+----------+-------+----------+------------+--------+---------+----------+---------+-------------+
|EmployeeID|FirstName| LastName|Department|Salary|  JoinDate|Experience|SalesID|EmployeeID| ProductName|Quantity|UnitPrice| SalesDate|   Region|Total revenue|
+----------+---------+---------+----------+------+----------+----------+-------+----------+------------+--------+---------+----------+---------+-------------+
|       432|     Jane|  Jackson|     Sales|114047|2024-04-07|        11|    101|       432|   USB Drive|      76|     2000|2025-08-20|Northeast|       152000|
|        44|     Lisa|  Jackson|        IT| 59346|2021-01-12|        12|    102|        44|      Laptop|      23|       25|2024-12-08|Northeast|          575|
|       872|  Michael|  Jackson|        HR|124083|2023-05-08|         8|    103|       872|       Mouse|      64|     2000|2025-04-09|  Central|       128000|
|       920|     John|    Moore|        HR|101

In [19]:
joined_df.groupBy("FirstName","LastName").agg(sum("Total revenue").alias("Total sales")).show()

+---------+---------+-----------+
|FirstName| LastName|Total sales|
+---------+---------+-----------+
|    James|    Smith|    1875375|
|  Michael|   Martin|    3522975|
|    Emily| Martinez|    3337325|
|     Jane|    Davis|    1682225|
|  Charles|Hernandez|    1411075|
|    David|Rodriguez|    4364575|
|   Robert|  Johnson|   12625050|
|  Michael|    Moore|    7790200|
|     Lisa| Gonzalez|    1556100|
|     Mary| Gonzalez|    9917750|
|  Matthew| Gonzalez|    3200675|
| Patricia|Hernandez|    2362975|
|    Linda|    Lopez|   10062150|
|   Robert|   Taylor|    3147050|
|  Charles|    Davis|    2283350|
|    Linda| Gonzalez|    1162100|
|  Matthew|    Jones|    2763975|
|     Mary|   Thomas|    3798675|
|  Matthew|    Brown|    3042725|
|     Mary|Rodriguez|    1700750|
+---------+---------+-----------+
only showing top 20 rows



In [29]:
# Maximum salary per dept
joined_df.groupby("Department").agg(max("Salary").alias("Maximum salary")).show()

+----------+--------------+
|Department|Maximum salary|
+----------+--------------+
|     Sales|        149751|
|        HR|        148934|
|   Finance|        148668|
|     Legal|        149532|
| Marketing|        148826|
|        IT|        149931|
|   Support|        149932|
|Operations|        149977|
+----------+--------------+



In [30]:
# Filter employees with salary > 80,000
employee_df.filter(col("Salary") > 80000).show()

+----------+---------+---------+----------+------+----------+----------+
|EmployeeID|FirstName| LastName|Department|Salary|  JoinDate|Experience|
+----------+---------+---------+----------+------+----------+----------+
|         1|    David|   Miller|     Legal|112987|2024-01-07|        14|
|         2|  Barbara|    Moore|Operations|113370|2022-06-06|         8|
|         3|  Charles|  Johnson|     Sales|130685|2023-09-21|         3|
|         4|  William|   Taylor|   Support|104400|2021-01-11|         7|
|         6|    David|    Lopez|     Sales| 81852|2021-06-17|         9|
|         7|  Matthew|  Jackson|        HR| 92870|2024-11-27|         9|
|         8|  William|   Miller|     Sales|102756|2024-08-21|         4|
|         9|  William|   Martin| Marketing|106481|2025-03-20|        12|
|        11|     Lisa|    Smith| Marketing| 83691|2021-04-01|         6|
|        13|     Jane|Hernandez|     Legal|117065|2024-09-27|         3|
|        20|   Daniel|   Taylor|        IT|105426|2

# Task
Find employees who earn the maximum salary in their respective departments from the `employee_df` DataFrame. Display their first name, last name, department, and salary.

## Identify max salary per department

### Subtask:
Group the employee data by department and find the maximum salary in each department.


**Reasoning**:
The previous attempt to find the maximum salary per department failed because the 'Department' column was not included in the select statement before grouping. To fix this, I will directly group the `employee_df` by 'Department' and then aggregate the maximum 'Salary', aliasing the result as 'MaxSalaryPerDepartment'.



In [23]:
max_salary_per_dept_df = employee_df.groupby("Department").agg(max("Salary").alias("MaxSalaryPerDepartment"))
max_salary_per_dept_df.show()

+----------+----------------------+
|Department|MaxSalaryPerDepartment|
+----------+----------------------+
|     Sales|                149751|
|        HR|                148934|
|   Finance|                148668|
|     Legal|                149532|
| Marketing|                148826|
|        IT|                149931|
|   Support|                149932|
|Operations|                149977|
+----------+----------------------+



## Join with employee data

### Subtask:
Join the result back to the original employee DataFrame to match employees with their department's maximum salary.


**Reasoning**:
To match each employee with the maximum salary of their department, I will join the `employee_df` with the `max_salary_per_dept_df` on the 'Department' column.



In [24]:
employees_with_max_salary_df = employee_df.join(max_salary_per_dept_df, on="Department", how="inner")
employees_with_max_salary_df.show()

+----------+----------+---------+---------+------+----------+----------+----------------------+
|Department|EmployeeID|FirstName| LastName|Salary|  JoinDate|Experience|MaxSalaryPerDepartment|
+----------+----------+---------+---------+------+----------+----------+----------------------+
|     Legal|         1|    David|   Miller|112987|2024-01-07|        14|                149532|
|Operations|         2|  Barbara|    Moore|113370|2022-06-06|         8|                149977|
|     Sales|         3|  Charles|  Johnson|130685|2023-09-21|         3|                149751|
|   Support|         4|  William|   Taylor|104400|2021-01-11|         7|                149932|
|Operations|         5|     Lisa|Hernandez| 75150|2025-11-18|         8|                149977|
|     Sales|         6|    David|    Lopez| 81852|2021-06-17|         9|                149751|
|        HR|         7|  Matthew|  Jackson| 92870|2024-11-27|         9|                148934|
|     Sales|         8|  William|   Mill

**Reasoning**:
The previous step successfully joined the `employee_df` with `max_salary_per_dept_df`. Now, to find employees who earn the maximum salary in their respective departments, I need to filter `employees_with_max_salary_df` where 'Salary' equals 'MaxSalaryPerDepartment' and then select the requested columns.



In [25]:
employees_max_salary = employees_with_max_salary_df.filter(col("Salary") == col("MaxSalaryPerDepartment")) \
                                                 .select("FirstName", "LastName", "Department", "Salary")
employees_max_salary.show()

+---------+--------+----------+------+
|FirstName|LastName|Department|Salary|
+---------+--------+----------+------+
|  Michael|Martinez|     Sales|149751|
|   Daniel| Johnson|   Support|149932|
|    David|   Moore|        IT|149931|
|    Sarah|   Moore|   Finance|148668|
|   Robert|   Moore|     Legal|149532|
|     Mary|Gonzalez| Marketing|148826|
|  Richard|Williams|Operations|149977|
|     Mary|Gonzalez|        HR|148934|
+---------+--------+----------+------+



## Summary:

### Q&A
The employees who earn the maximum salary in their respective departments from the `employee_df` DataFrame were successfully identified and their first name, last name, department, and salary were displayed.

### Data Analysis Key Findings
*   The maximum salary for each department was successfully identified and stored in a temporary DataFrame.
*   Employees earning the maximum salary within their respective departments were identified by joining their individual salary data with the department's maximum salary information.
*   The final output included the first name, last name, department, and salary of these specific employees.

### Insights or Next Steps
*   This analysis helps identify the highest-paid individuals in each department, which can be useful for salary benchmarking or performance reviews.
*   Further analysis could explore the demographics or tenure of these maximum-earning employees to understand common characteristics among top earners in each department.


In [28]:
only_max_sal_df = max_salary_per_dept_df.join(employee_df,max_salary_per_dept_df.MaxSalaryPerDepartment==employee_df.Salary,\
                                              how="left")
only_max_sal_df.show()

+----------+----------------------+----------+---------+--------+----------+------+----------+----------+
|Department|MaxSalaryPerDepartment|EmployeeID|FirstName|LastName|Department|Salary|  JoinDate|Experience|
+----------+----------------------+----------+---------+--------+----------+------+----------+----------+
|     Sales|                149751|       226|  Michael|Martinez|     Sales|149751|2023-08-10|         1|
|        HR|                148934|       966|     Mary|Gonzalez|        HR|148934|2024-12-23|         8|
|   Finance|                148668|       619|    Sarah|   Moore|   Finance|148668|2024-11-11|         0|
|     Legal|                149532|       750|   Robert|   Moore|     Legal|149532|2022-09-27|        11|
| Marketing|                148826|       815|     Mary|Gonzalez| Marketing|148826|2024-08-30|        13|
|        IT|                149931|       536|    David|   Moore|        IT|149931|2021-12-16|         2|
|   Support|                149932|       386|

Find the top 5 products by total quantity sold

Calculate average sales amount per transaction

Identify employees with more than 5 years of experience

In [31]:
joined_df.show(5)

+----------+---------+--------+----------+------+----------+----------+-------+----------+-----------+--------+---------+----------+---------+-------------+
|EmployeeID|FirstName|LastName|Department|Salary|  JoinDate|Experience|SalesID|EmployeeID|ProductName|Quantity|UnitPrice| SalesDate|   Region|Total revenue|
+----------+---------+--------+----------+------+----------+----------+-------+----------+-----------+--------+---------+----------+---------+-------------+
|       432|     Jane| Jackson|     Sales|114047|2024-04-07|        11|    101|       432|  USB Drive|      76|     2000|2025-08-20|Northeast|       152000|
|        44|     Lisa| Jackson|        IT| 59346|2021-01-12|        12|    102|        44|     Laptop|      23|       25|2024-12-08|Northeast|          575|
|       872|  Michael| Jackson|        HR|124083|2023-05-08|         8|    103|       872|      Mouse|      64|     2000|2025-04-09|  Central|       128000|
|       920|     John|   Moore|        HR|101374|2021-06-1

In [34]:
#Find the top 5 products by total quantity sold
joined_df.groupby("ProductName").agg(sum("Quantity").alias("Total Quantity Sold"))\
.orderBy(col("Total Quantity Sold").desc()).show()

+------------+-------------------+
| ProductName|Total Quantity Sold|
+------------+-------------------+
|     Printer|             255736|
|      Router|             254434|
|External HDD|             252403|
|      Webcam|             252320|
|     Monitor|             250710|
|      Laptop|             249268|
|  Headphones|             248649|
|   USB Drive|             248453|
|       Mouse|             246602|
|    Keyboard|             243189|
+------------+-------------------+



In [42]:
#Identify employees with more than 5 years of experience
employee_df.filter(col("Experience") > 5).show()
#

+----------+---------+---------+----------+------+----------+----------+
|EmployeeID|FirstName| LastName|Department|Salary|  JoinDate|Experience|
+----------+---------+---------+----------+------+----------+----------+
|         1|    David|   Miller|     Legal|112987|2024-01-07|        14|
|         2|  Barbara|    Moore|Operations|113370|2022-06-06|         8|
|         4|  William|   Taylor|   Support|104400|2021-01-11|         7|
|         5|     Lisa|Hernandez|Operations| 75150|2025-11-18|         8|
|         6|    David|    Lopez|     Sales| 81852|2021-06-17|         9|
|         7|  Matthew|  Jackson|        HR| 92870|2024-11-27|         9|
|         9|  William|   Martin| Marketing|106481|2025-03-20|        12|
|        10|    Emily|   Garcia| Marketing| 51283|2021-07-17|        13|
|        11|     Lisa|    Smith| Marketing| 83691|2021-04-01|         6|
|        12|   Robert|   Taylor|     Legal| 51125|2023-04-22|         9|
|        18|     Mary|   Martin|Operations| 58004|2

In [43]:
spark.stop()