In [1]:
# Print the installed PySpark version so the outputs below can be tied to a release.
import pyspark
print(pyspark.__version__)


3.5.1


In [2]:
import os
import sys

# Point both the driver and the Python workers at the interpreter that is
# running this kernel. A bare "python" is resolved through PATH and may be a
# different installation (or version) than the one the notebook uses.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# "spark.python.worker.reuse" is a Spark configuration property, not an
# environment variable: Spark does not read it from os.environ, so it is not
# set here. Its default value is already "true".

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Step 1: Create SparkSession
# Reuses an already-running session if there is one; otherwise starts a local
# session named "EmployeeDataProcessing" with master "local[*]".
spark = (
    SparkSession.builder
    .appName("EmployeeDataProcessing")
    .master("local[*]")
    .getOrCreate()
)

In [5]:
# Print the session object to confirm that it was created.
print(spark)

<pyspark.sql.session.SparkSession object at 0x000002117A808C10>


In [6]:
# Step 2: Load Dataset/employees.csv into a DataFrame
# header=True takes the column names from the first line of the file;
# inferSchema=True lets Spark infer each column's type from the data.
employee_df = spark.read.csv(
    "Dataset/employees.csv",
    header=True,
    inferSchema=True,
)
# A bare DataFrame expression displays its column names and types.
employee_df

DataFrame[employee_id: int, name: string, department: string, salary: int, joining_year: int, bonus_percent: int]

In [7]:
# Print the employee DataFrame as a text table
# (show() displays at most 20 rows by default).
employee_df.show()

+-----------+-------+----------+------+------------+-------------+
|employee_id|   name|department|salary|joining_year|bonus_percent|
+-----------+-------+----------+------+------------+-------------+
|        101|  Alice|        HR| 50000|        2018|            5|
|        102|    Bob|   Finance| 60000|        2019|            7|
|        103|Charlie|        IT| 75000|        2017|           10|
|        104|  David|   Finance| 62000|        2020|            6|
|        105|    Eva|        IT| 80000|        2021|           12|
|        106|  Frank|        HR| 52000|        2018|            5|
|        107|  Grace|        IT| 90000|        2019|           15|
+-----------+-------+----------+------+------------+-------------+



In [8]:
# Step 3: Keep only the name and salary columns
selected_df = employee_df.select(["name", "salary"])
selected_df.show()

+-------+------+
|   name|salary|
+-------+------+
|  Alice| 50000|
|    Bob| 60000|
|Charlie| 75000|
|  David| 62000|
|    Eva| 80000|
|  Frank| 52000|
|  Grace| 90000|
+-------+------+



In [9]:
# Step 4: Keep only employees whose salary is strictly above 60000
# NOTE(review): importing `max` from pyspark.sql.functions rebinds the Python
# built-in `max` for every cell that runs after this one.
from pyspark.sql.functions import col, expr, avg, count, max
# where() is an alias of filter().
salAbove60k_df = employee_df.where(col("salary") > 60000)
salAbove60k_df.show()

+-----------+-------+----------+------+------------+-------------+
|employee_id|   name|department|salary|joining_year|bonus_percent|
+-----------+-------+----------+------+------------+-------------+
|        103|Charlie|        IT| 75000|        2017|           10|
|        104|  David|   Finance| 62000|        2020|            6|
|        105|    Eva|        IT| 80000|        2021|           12|
|        107|  Grace|        IT| 90000|        2019|           15|
+-----------+-------+----------+------+------------+-------------+



In [10]:
# Step 5: Add bonus_amount = salary * (bonus_percent / 100)
# This cell rebinds employee_df and reads bonus_percent, which Step 7 drops,
# so re-running it after Step 7 requires reloading the data first.
employee_df = employee_df.withColumn(
    "bonus_amount",
    col("salary") * (col("bonus_percent") / 100),
)
employee_df.show()

+-----------+-------+----------+------+------------+-------------+------------+
|employee_id|   name|department|salary|joining_year|bonus_percent|bonus_amount|
+-----------+-------+----------+------+------------+-------------+------------+
|        101|  Alice|        HR| 50000|        2018|            5|      2500.0|
|        102|    Bob|   Finance| 60000|        2019|            7|      4200.0|
|        103|Charlie|        IT| 75000|        2017|           10|      7500.0|
|        104|  David|   Finance| 62000|        2020|            6|      3720.0|
|        105|    Eva|        IT| 80000|        2021|           12|      9600.0|
|        106|  Frank|        HR| 52000|        2018|            5|      2600.0|
|        107|  Grace|        IT| 90000|        2019|           15|     13500.0|
+-----------+-------+----------+------+------------+-------------+------------+



In [11]:
# Step 6: Rename the joining_year column to year_joined
employee_df = employee_df.withColumnRenamed(
    existing="joining_year",
    new="year_joined",
)
employee_df.show()

+-----------+-------+----------+------+-----------+-------------+------------+
|employee_id|   name|department|salary|year_joined|bonus_percent|bonus_amount|
+-----------+-------+----------+------+-----------+-------------+------------+
|        101|  Alice|        HR| 50000|       2018|            5|      2500.0|
|        102|    Bob|   Finance| 60000|       2019|            7|      4200.0|
|        103|Charlie|        IT| 75000|       2017|           10|      7500.0|
|        104|  David|   Finance| 62000|       2020|            6|      3720.0|
|        105|    Eva|        IT| 80000|       2021|           12|      9600.0|
|        106|  Frank|        HR| 52000|       2018|            5|      2600.0|
|        107|  Grace|        IT| 90000|       2019|           15|     13500.0|
+-----------+-------+----------+------+-----------+-------------+------------+



In [12]:
# Step 7: Drop bonus_percent column
# drop() returns a new DataFrame; employee_df is rebound to the result, so
# bonus_percent is no longer available to later cells.
employee_df = employee_df.drop("bonus_percent")
employee_df.show()

+-----------+-------+----------+------+-----------+------------+
|employee_id|   name|department|salary|year_joined|bonus_amount|
+-----------+-------+----------+------+-----------+------------+
|        101|  Alice|        HR| 50000|       2018|      2500.0|
|        102|    Bob|   Finance| 60000|       2019|      4200.0|
|        103|Charlie|        IT| 75000|       2017|      7500.0|
|        104|  David|   Finance| 62000|       2020|      3720.0|
|        105|    Eva|        IT| 80000|       2021|      9600.0|
|        106|  Frank|        HR| 52000|       2018|      2600.0|
|        107|  Grace|        IT| 90000|       2019|     13500.0|
+-----------+-------+----------+------+-----------+------------+



In [13]:
# Step 8: Average salary for each department
avg_salary_data = (
    employee_df
    .groupBy("department")
    .agg(avg("salary").alias("avg_salary"))
)
avg_salary_data.show()


+----------+-----------------+
|department|       avg_salary|
+----------+-----------------+
|        HR|          51000.0|
|   Finance|          61000.0|
|        IT|81666.66666666667|
+----------+-----------------+



In [14]:
# Step 9: Number of employees in each department
emp_cnt_df = (
    employee_df
    .groupBy("department")
    .agg(count("*").alias("employee_count"))
)
emp_cnt_df.show()


+----------+--------------+
|department|employee_count|
+----------+--------------+
|        HR|             2|
|   Finance|             2|
|        IT|             3|
+----------+--------------+



In [15]:
# Step 10: Highest salary in the dataset
# An ungrouped aggregation yields a single row; its only field is the maximum.
max_salary = employee_df.agg({"salary": "max"}).first()[0]
print(max_salary)


90000


In [16]:
# Step 11: Employees ordered from highest to lowest salary
sorted_salary = employee_df.orderBy("salary", ascending=False)
sorted_salary.show()

+-----------+-------+----------+------+-----------+------------+
|employee_id|   name|department|salary|year_joined|bonus_amount|
+-----------+-------+----------+------+-----------+------------+
|        107|  Grace|        IT| 90000|       2019|     13500.0|
|        105|    Eva|        IT| 80000|       2021|      9600.0|
|        103|Charlie|        IT| 75000|       2017|      7500.0|
|        104|  David|   Finance| 62000|       2020|      3720.0|
|        102|    Bob|   Finance| 60000|       2019|      4200.0|
|        106|  Frank|        HR| 52000|       2018|      2600.0|
|        101|  Alice|        HR| 50000|       2018|      2500.0|
+-----------+-------+----------+------+-----------+------------+



In [17]:
# Step 12: Employees ordered from earliest to latest joining year
sorted_by_joining_yr = employee_df.orderBy("year_joined", ascending=True)
sorted_by_joining_yr.show()

+-----------+-------+----------+------+-----------+------------+
|employee_id|   name|department|salary|year_joined|bonus_amount|
+-----------+-------+----------+------+-----------+------------+
|        103|Charlie|        IT| 75000|       2017|      7500.0|
|        101|  Alice|        HR| 50000|       2018|      2500.0|
|        106|  Frank|        HR| 52000|       2018|      2600.0|
|        102|    Bob|   Finance| 60000|       2019|      4200.0|
|        107|  Grace|        IT| 90000|       2019|     13500.0|
|        104|  David|   Finance| 62000|       2020|      3720.0|
|        105|    Eva|        IT| 80000|       2021|      9600.0|
+-----------+-------+----------+------+-----------+------------+



In [18]:
# Step 13: Create department DataFrame with department and location columns only
dept_columns = ["department", "location"]
department_data = [
    ("HR", "New York"),
    ("IT", "San Francisco"),
    ("Finance", "Chicago"),
]
dept_df = spark.createDataFrame(department_data, schema=dept_columns)
dept_df.show()

+----------+-------------+
|department|     location|
+----------+-------------+
|        HR|     New York|
|        IT|San Francisco|
|   Finance|      Chicago|
+----------+-------------+



In [19]:
# Step 14: Inner-join employees to their department's location on 'department'
joined_df = employee_df.join(
    dept_df,
    on=["department"],
    how="inner",
)
joined_df.show()

+----------+-----------+-------+------+-----------+------------+-------------+
|department|employee_id|   name|salary|year_joined|bonus_amount|     location|
+----------+-----------+-------+------+-----------+------------+-------------+
|        HR|        106|  Frank| 52000|       2018|      2600.0|     New York|
|        HR|        101|  Alice| 50000|       2018|      2500.0|     New York|
|        IT|        107|  Grace| 90000|       2019|     13500.0|San Francisco|
|        IT|        105|    Eva| 80000|       2021|      9600.0|San Francisco|
|        IT|        103|Charlie| 75000|       2017|      7500.0|San Francisco|
|   Finance|        104|  David| 62000|       2020|      3720.0|      Chicago|
|   Finance|        102|    Bob| 60000|       2019|      4200.0|      Chicago|
+----------+-----------+-------+------+-----------+------------+-------------+



In [20]:
# Step 15: Total number of employees
# count() returns the number of rows in employee_df as a Python int.
total_employees = employee_df.count()
print("Total Employees:", total_employees)

Total Employees: 7


In [21]:
# Step 17: Collect all rows into Python objects
# collect() returns every row of employee_df to the driver as a list of Row
# objects, so it is only appropriate for small DataFrames like this one.
all_rows = employee_df.collect()
print("Collected Rows:", all_rows)

Collected Rows: [Row(employee_id=101, name='Alice', department='HR', salary=50000, year_joined=2018, bonus_amount=2500.0), Row(employee_id=102, name='Bob', department='Finance', salary=60000, year_joined=2019, bonus_amount=4200.0), Row(employee_id=103, name='Charlie', department='IT', salary=75000, year_joined=2017, bonus_amount=7500.0), Row(employee_id=104, name='David', department='Finance', salary=62000, year_joined=2020, bonus_amount=3720.0), Row(employee_id=105, name='Eva', department='IT', salary=80000, year_joined=2021, bonus_amount=9600.0), Row(employee_id=106, name='Frank', department='HR', salary=52000, year_joined=2018, bonus_amount=2600.0), Row(employee_id=107, name='Grace', department='IT', salary=90000, year_joined=2019, bonus_amount=13500.0)]


In [22]:
# Step 18: Fetch the first 3 employees as a list of Row objects
# head(n) returns the same rows as take(n).
first_three = employee_df.head(3)
print("First 3 Employees:", first_three)


First 3 Employees: [Row(employee_id=101, name='Alice', department='HR', salary=50000, year_joined=2018, bonus_amount=2500.0), Row(employee_id=102, name='Bob', department='Finance', salary=60000, year_joined=2019, bonus_amount=4200.0), Row(employee_id=103, name='Charlie', department='IT', salary=75000, year_joined=2017, bonus_amount=7500.0)]
