In [1]:
!pip3 install pyspark --quiet


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import requests
import os
os.environ['SPARK_HOME'] = '/Users/dakshbhatnagar/Spark'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [3]:
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')

In [4]:
spark = SparkSession.builder \
    .appName("PySpark-Get-Started") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

24/11/30 17:17:02 WARN Utils: Your hostname, Dakshs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.107 instead (on interface en0)
24/11/30 17:17:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/30 17:17:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## How to read in a CSV File

In [5]:
local_file = os.path.join(os.getcwd(), "train.csv")
# Download the file to local storage
if os.path.exists(local_file):
    spark = SparkSession.builder.appName('density_forecasting').getOrCreate()
    # Load the local file into PySpark
    df = spark.read.csv(path=local_file, inferSchema=True, header=True)
else:
    url = 'https://raw.githubusercontent.com/dakshbhatnagar/Datasets/refs/heads/main/employee_data/HRdata.csv'
    local_file = os.path.join(os.getcwd(), "train.csv")
    response = requests.get(url)
    if response.status_code == 200:
        with open(local_file, 'wb') as f:
            f.write(response.content)
        print(f"File downloaded to {local_file}")
    else:
        print(f"Failed to download the file. HTTP Status Code: {response.status_code}")
    
    # Initialize Spark Session
    spark = SparkSession.builder.appName('density_forecasting').getOrCreate()
    
    # Load the local file into PySpark
    df = spark.read.csv(path=local_file, inferSchema=True, header=True)
    
df.show(5)

+-------------------+------+------+-----------------------+------------+--------------------+-------+----+-------------+
|               Name|Emp ID|Gender|Education Qualification|Date of Join|           Job Title| Salary| Age|Leave Balance|
+-------------------+------+------+-----------------------+------------+--------------------+-------+----+-------------+
|       Barr Faughny|AC0001|  Male|      Bachelor's Degree|   12-Jun-20|         Chocolatier|$51,300|26.0|           13|
|Dennison Crosswaite|AC0002|Female|                Diploma|   18-Feb-21| Production Operator|$38,300|32.7|           12|
|    Gunar Cockshoot|AC0003|  Male|    High School Diploma|   05-Sep-22| Packaging Associate|$31,400|34.3|           16|
|     Wilone O'Kielt|AC0004|Female|      Bachelor's Degree|   20-Nov-19|Marketing Specialist|$60,700|29.6|           21|
|       Gigi Bohling|AC0005|  Male|        Master's Degree|   08-Apr-18|  Research Scientist|$77,300|30.3|           23|
+-------------------+------+----

In [6]:
df.describe().show()

+-------+------------+------+------+-----------------------+------------+--------------------+-------+------------------+-----------------+
|summary|        Name|Emp ID|Gender|Education Qualification|Date of Join|           Job Title| Salary|               Age|    Leave Balance|
+-------+------------+------+------+-----------------------+------------+--------------------+-------+------------------+-----------------+
|  count|         161|   161|   161|                    161|         161|                 161|    161|               161|              161|
|   mean|        NULL|  NULL|  NULL|                   NULL|        NULL|                NULL|   NULL|35.204968944099384|16.41614906832298|
| stddev|        NULL|  NULL|  NULL|                   NULL|        NULL|                NULL|   NULL| 8.602309292002904|4.980661359462189|
|    min|A R Rahadude|AC0001|Female|      Bachelor's Degree|   01-Jan-23|         Chocolatier|$28,900|              23.5|                2|
|    max|  Zara Verm

## Data Pre-Processing

In [7]:
import pyspark.sql.functions as f
from pyspark.sql.types import *

# Removing unncessary characters in the column values and changing the datatype
df = df.withColumn('Salary', f.translate(f.col('Salary'), "$,", "").cast(IntegerType()))

#lowering the case of column name and removing any space
df = df.toDF(*[col.lower().replace(" ", "_") for col in df.columns])

#Changing the date column from string to date
df = df.withColumn('date_of_join', f.to_date(f.col('date_of_join'), 'dd-MMM-yy'))

#checking the data types
df.dtypes

[('name', 'string'),
 ('emp_id', 'string'),
 ('gender', 'string'),
 ('education_qualification', 'string'),
 ('date_of_join', 'date'),
 ('job_title', 'string'),
 ('salary', 'int'),
 ('age', 'double'),
 ('leave_balance', 'int')]

In [8]:
#Displaying the transformed dataframe
df.show(5)

+-------------------+------+------+-----------------------+------------+--------------------+------+----+-------------+
|               name|emp_id|gender|education_qualification|date_of_join|           job_title|salary| age|leave_balance|
+-------------------+------+------+-----------------------+------------+--------------------+------+----+-------------+
|       Barr Faughny|AC0001|  Male|      Bachelor's Degree|  2020-06-12|         Chocolatier| 51300|26.0|           13|
|Dennison Crosswaite|AC0002|Female|                Diploma|  2021-02-18| Production Operator| 38300|32.7|           12|
|    Gunar Cockshoot|AC0003|  Male|    High School Diploma|  2022-09-05| Packaging Associate| 31400|34.3|           16|
|     Wilone O'Kielt|AC0004|Female|      Bachelor's Degree|  2019-11-20|Marketing Specialist| 60700|29.6|           21|
|       Gigi Bohling|AC0005|  Male|        Master's Degree|  2018-04-08|  Research Scientist| 77300|30.3|           23|
+-------------------+------+------+-----

## Answering Questions

### 1. How many people are in each job?

In [9]:
result = df.groupBy('job_title')\
            .agg(f.count('job_title').alias('count'))\
            .orderBy('count', ascending=False)
result.show()

+--------------------+-----+
|           job_title|count|
+--------------------+-----+
| Packaging Associate|   22|
| Production Operator|   20|
|Sales Representative|   18|
|     Quality Control|   17|
|         Chocolatier|   17|
|    Research Analyst|   16|
|     Product Manager|   16|
|  Research Scientist|   15|
|   Marketing Manager|   10|
|Marketing Specialist|   10|
+--------------------+-----+



### 2. Gender Break-down of the Staff

In [10]:
gender = df.groupBy('gender')\
            .agg(f.count('gender').alias('count'))\
            .orderBy('count', ascending=False)
gender.show()

+------+-----+
|gender|count|
+------+-----+
|Female|   88|
|  Male|   73|
+------+-----+



### 3. Age Spread of the Staff

In [11]:
# Perform the grouping and count
group = (f.floor(df['age'] / 5) * 5).alias('age_group')  # Alias it directly here
age_spread = df.groupBy(group).agg(f.count('*').alias('count')).orderBy(f.col('age_group'))

age_spread.show()

+---------+-----+
|age_group|count|
+---------+-----+
|       20|    1|
|       25|   24|
|       30|   85|
|       35|   32|
|       40|    2|
|       45|    3|
|       50|    4|
|       55|    1|
|       60|    5|
|       65|    4|
+---------+-----+



### 4. Which Jobs Pay More?

In [12]:
df.groupBy('job_title').agg(f.max('salary').alias('max_salary')).orderBy(f.col('max_salary').desc()).show()

+--------------------+----------+
|           job_title|max_salary|
+--------------------+----------+
|     Product Manager|     85000|
|  Research Scientist|     79300|
|   Marketing Manager|     74900|
|Marketing Specialist|     63600|
|    Research Analyst|     60000|
|         Chocolatier|     54900|
|Sales Representative|     49800|
|     Quality Control|     45000|
| Production Operator|     39800|
| Packaging Associate|     36200|
+--------------------+----------+



### 5. Top Earners in Each Job

In [13]:
## Partitioning

from pyspark.sql.window import Window
window = Window.partitionBy('job_title').orderBy(f.col('salary').desc())
max_sal_dept = df.withColumn('rank', f.row_number().over(window))\
  .filter(f.col('rank') == 1)\
  .select(['name', 'emp_id', 'job_title','salary'])\
  .orderBy(f.col('salary').desc())

max_sal_dept.show()

+----------------+------+--------------------+------+
|            name|emp_id|           job_title|salary|
+----------------+------+--------------------+------+
|     Aarav Verma|AC0121|     Product Manager| 85000|
|     Dell Molloy|AC0041|  Research Scientist| 79300|
|   Krish Trivedi|AC0123|   Marketing Manager| 74900|
| Merrilee Plenty|AC0056|Marketing Specialist| 63600|
| Niall Selesnick|AC0027|    Research Analyst| 60000|
|   Bernie Gorges|AC0090|         Chocolatier| 54900|
|  Curtice Advani|AC0006|Sales Representative| 49800|
|     Rhea Bhatia|AC0126|     Quality Control| 45000|
|William Reeveley|AC0064| Production Operator| 39800|
|     Shari McNee|AC0060| Packaging Associate| 36200|
+----------------+------+--------------------+------+



### 6. Qualification vs. Salary

In [14]:
results = df.groupBy('education_qualification').agg(f.round(f.mean('salary'),2).alias('avg_salary')).orderBy(f.col('avg_salary').desc())
results.show()

+-----------------------+----------+
|education_qualification|avg_salary|
+-----------------------+----------+
|        Master's Degree|  67037.93|
|      Bachelor's Degree|  53881.63|
|                Diploma|  51046.34|
|    High School Diploma|  48904.76|
+-----------------------+----------+



### 7. Staff Growth Trend Over Time

In [15]:
results = df.groupBy('date_of_join').agg(f.count('date_of_join').alias('emp'))\
            .orderBy(f.col('date_of_join').asc())

window = Window.orderBy(f.col('date_of_join').asc())

results = results.withColumn('running_total', f.sum('emp').over(window))

results.show()

+------------+---+-------------+
|date_of_join|emp|running_total|
+------------+---+-------------+
|  2017-04-11|  1|            1|
|  2017-04-23|  1|            2|
|  2017-05-11|  1|            3|
|  2017-06-10|  1|            4|
|  2017-06-14|  1|            5|
|  2017-07-15|  1|            6|
|  2017-07-23|  1|            7|
|  2017-08-03|  1|            8|
|  2017-10-31|  1|            9|
|  2017-11-26|  1|           10|
|  2018-04-06|  1|           11|
|  2018-04-08|  1|           12|
|  2018-05-03|  1|           13|
|  2018-06-19|  1|           14|
|  2018-07-27|  1|           15|
|  2018-08-14|  1|           16|
|  2018-09-04|  1|           17|
|  2018-09-08|  1|           18|
|  2018-09-09|  1|           19|
|  2018-09-12|  1|           20|
+------------+---+-------------+
only showing top 20 rows



### 8. Leave Balance Analysis

In [16]:
# Grouping and aggregating data
results = df.groupBy('name').agg(f.sum('leave_balance').alias('leave_bal')) \
            .orderBy(f.col('leave_bal').desc())

# Define a window for calculating the average
window = Window.orderBy(f.lit(1))  # Use a constant to calculate a global average

# Add the 'avzg_leaves' column with the average leave balance
results = results.withColumn('avg_leaves', f.round(f.mean('leave_bal').over(window),0))

# Show the results
results = results.withColumn('difference', f.col('leave_bal')-f.col('avg_leaves')).filter('difference > 0')

results.show()

+------------------+---------+----------+----------+
|              name|leave_bal|avg_leaves|difference|
+------------------+---------+----------+----------+
|        Kiaan Shah|       39|      18.0|      21.0|
|      Kiara Bhatia|       38|      18.0|      20.0|
|        Avani Iyer|       38|      18.0|      20.0|
|     Advait Kapoor|       37|      18.0|      19.0|
|     Mollie Hanway|       35|      18.0|      17.0|
|      Reyansh Rana|       34|      18.0|      16.0|
|        Zara Verma|       33|      18.0|      15.0|
|     Kabir Trivedi|       31|      18.0|      13.0|
|         Dev Joshi|       31|      18.0|      13.0|
|       Aanya Singh|       30|      18.0|      12.0|
|     Allene Gobbet|       30|      18.0|      12.0|
|        Pari Gupta|       29|      18.0|      11.0|
|       Gray Seamon|       27|      18.0|       9.0|
|      Jan Morforth|       25|      18.0|       7.0|
|Valentia Etteridge|       25|      18.0|       7.0|
|       Krish Rawat|       25|      18.0|     

## Stop the session once done

In [17]:
spark.stop()