In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [2]:
spark=SparkSession.builder.getOrCreate()

In [3]:
employee=spark.read.csv('D:\Python for Data Engineering Project\Jupyter Notebook\Employee Dataset\employee_data.csv',header=True,inferSchema=True)

In [4]:
employee.show(5)

+-----+---------+--------+---------+--------+--------------------+---------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName|StartDate|ExitDate|               Title|     Supervisor|             ADEmail|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+---------+--------+--------------------+---------------+--------------------+------------+--------------+------------+-------+--------------------------+---------------+----------------------+------------

In [5]:
employee.printSchema()

root
 |-- EmpID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- ADEmail: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- TerminationType: string (nullable = true)
 |-- TerminationDescription: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- M

In [6]:
employee=employee.withColumn('StartDate',to_date('StartDate','dd-MMM-yy'))\
    .withColumn('ExitDate',to_date('ExitDate','dd-MMM-yy'))\
    .withColumn('DOB',to_date('DOB','dd-MM-yyyy'))

In [7]:
employee.filter(contains(employee.Supervisor,lit('Jason'))).show()

+-----+---------+--------+----------+----------+--------------------+----------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+----------------------+-----------------+----------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName| StartDate|  ExitDate|               Title|      Supervisor|             ADEmail|BusinessUnit|      EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|   DepartmentType|        Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+----------+----------+--------------------+----------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+-------------

In [8]:

employee.where(employee.ExitDate.isNotNull()).count()

1533

In [9]:
employee.where(employee.ExitDate.isNull()).count()

1467

In [10]:
employee.dropDuplicates(['Title']).select('Title').show(truncate=False)

+----------------------------+
|Title                       |
+----------------------------+
|Senior BI Developer         |
|IT Manager - Support        |
|Software Engineering Manager|
|Network Engineer            |
|Principal Data Architect    |
|Sr. DBA                     |
|BI Developer                |
|Production Technician II    |
|Director of Operations      |
|Production Manager          |
|Sr. Accountant              |
|President & CEO             |
|Sr. Network Engineer        |
|IT Manager - Infra          |
|Accountant I                |
|Database Administrator      |
|Data Analyst                |
|Shared Services Manager     |
|Director of Sales           |
|Sales Manager               |
+----------------------------+
only showing top 20 rows



In [11]:
employee.dropDuplicates(['BusinessUnit']).select('BusinessUnit').show(100)

+------------+
|BusinessUnit|
+------------+
|          PL|
|         MSC|
|         PYZ|
|         WBL|
|         SVG|
|         BPC|
|          EW|
|         NEL|
|         TNS|
|        CCDR|
+------------+



In [12]:
employee.dropDuplicates(['EmployeeStatus']).select('EmployeeStatus').show(truncate=False)

+----------------------+
|EmployeeStatus        |
+----------------------+
|Future Start          |
|Terminated for Cause  |
|Active                |
|Leave of Absence      |
|Voluntarily Terminated|
+----------------------+



In [13]:
employee.dropDuplicates(['EmployeeType']).select('EmployeeType').show(truncate=False)

+------------+
|EmployeeType|
+------------+
|Contract    |
|Full-Time   |
|Part-Time   |
+------------+



In [14]:
employee.dropDuplicates(['DepartmentType']).select('DepartmentType').show(truncate=False)

+--------------------+
|DepartmentType      |
+--------------------+
|Sales               |
|Production          |
|Admin Offices       |
|Executive Office    |
|Software Engineering|
|IT/IS               |
+--------------------+



In [15]:
employee=employee.withColumn('DepartmentType',trim(employee.DepartmentType))

In [16]:
employee.filter((employee.FirstName=='Jason') &(employee.LastName.like('%a%'))).show()

+-----+---------+--------+----------+----------+-------------+------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+----------------------+--------------+-------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName| StartDate|  ExitDate|        Title|  Supervisor|             ADEmail|BusinessUnit|      EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|TerminationDescription|DepartmentType|     Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+----------+----------+-------------+------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+----------------------+--------------+-------------+------

## Find highest rated employee in every department in each zone

In [17]:
df=employee.withColumn('rownum',row_number().over(Window.partitionBy('PayZone','DepartmentType').orderBy(desc('Current Employee Rating'))))
df.filter(df.rownum==1).select('EmpID','FirstName','LastName','PayZone','DepartmentType','Current Employee Rating').show()

+-----+----------+--------+-------+--------------------+-----------------------+
|EmpID| FirstName|LastName|PayZone|      DepartmentType|Current Employee Rating|
+-----+----------+--------+-------+--------------------+-----------------------+
| 2461|    Tucker|    Haas| Zone A|       Admin Offices|                      5|
| 3823|Alexzander|    Pope| Zone A|    Executive Office|                      3|
| 2213|   Branden|    Khan| Zone A|               IT/IS|                      5|
| 3572|   Timothy|Sullivan| Zone A|          Production|                      5|
| 3496|     Aspen| Bentley| Zone A|               Sales|                      5|
| 2550|   Beckett|    Beck| Zone A|Software Engineering|                      5|
| 2283|      Sean|   Quinn| Zone B|       Admin Offices|                      4|
| 3822|   Katrina| Lambert| Zone B|    Executive Office|                      3|
| 2208|      Lisa|   Galia| Zone B|               IT/IS|                      5|
| 3591|  Abdullah| Ellison| 

## Find how many active employees are in each business unit


In [18]:
employee.filter(employee.EmployeeStatus.like('Active%')).groupBy('BusinessUnit').agg(count("*").alias('count od employees')).show()

+------------+------------------+
|BusinessUnit|count od employees|
+------------+------------------+
|          PL|               246|
|         MSC|               239|
|         PYZ|               250|
|         WBL|               252|
|         SVG|               246|
|         BPC|               243|
|          EW|               245|
|         NEL|               246|
|         TNS|               242|
|        CCDR|               249|
+------------+------------------+



In [22]:
training=spark.read.csv('D:/Python for Data Engineering Project/Jupyter Notebook/Employee Dataset/training_and_development_data.csv',header=True,inferSchema=True)

In [25]:
training.show(5)

+-----------+-------------+---------------------+-------------+----------------+-----------------+-----------------+-----------------------+-------------+
|Employee ID|Training Date|Training Program Name|Training Type|Training Outcome|         Location|          Trainer|Training Duration(Days)|Training Cost|
+-----------+-------------+---------------------+-------------+----------------+-----------------+-----------------+-----------------------+-------------+
|       1001|   2022-09-21|     Customer Service|     Internal|          Failed|        Port Greg|   Amanda Daniels|                      4|       510.83|
|       1002|   2023-07-19| Leadership Develo...|     Internal|          Failed|      Brandonview|Brittany Chambers|                      2|       582.37|
|       1003|   2023-02-24|     Technical Skills|     Internal|      Incomplete|Port Briannahaven|    Mark Roberson|                      4|       777.06|
|       1004|   2023-01-12|     Customer Service|     Internal|       

In [24]:
training=training.withColumn('Training Date',to_date('Training Date','dd-MMM-yy'))

In [32]:
training.dropDuplicates(['Training Program Name']).select('Training Program Name').show(truncate=False)

+----------------------+
|Training Program Name |
+----------------------+
|Communication Skills  |
|Leadership Development|
|Technical Skills      |
|Customer Service      |
|Project Management    |
+----------------------+



In [34]:
training.dropDuplicates(['Location']).select('Location').count()

2738

## Find the most frequent location trainings took place

In [46]:
training.groupBy('Location').agg(count('*').alias('cnt')).filter(count('*')>1).orderBy(desc('cnt')).first()

Row(Location='Smithchester', cnt=6)

In [67]:
training.dropDuplicates(['Training Outcome']).select('Training Outcome').show()

+----------------+
|Training Outcome|
+----------------+
|          Passed|
|       Completed|
|          Failed|
|      Incomplete|
+----------------+



## find total cost for each training program

In [55]:
training.groupBy('Training Program Name').sum('Training Cost').orderBy(desc('sum(Training Cost)')).show()

+---------------------+------------------+
|Training Program Name|sum(Training Cost)|
+---------------------+------------------+
| Communication Skills|365023.24000000017|
|   Project Management|343313.16999999987|
| Leadership Develo...| 323902.0300000001|
|     Technical Skills|323072.61000000016|
|     Customer Service| 320575.0399999999|
+---------------------+------------------+



In [None]:
training.dropDuplicates(['Trainer']).select('Trainer').count()

## find name,dob,email, supervisor of employees who have successfully completed the each highest costing trainings programs.

In [68]:
temp=training.filter((training['Training Outcome']=='Completed') | (training['Training Outcome']=='Passed')).withColumn('rownum',row_number().over(Window.partitionBy('Training Program Name').orderBy(desc('Training Cost'))))
temp=temp.filter(temp.rownum==1)
temp.join(employee,on=temp['Employee ID']==employee['EmpID'])\
    .select('EmpID','FirstName','LastName','DOB','ADEmail','Training Program Name','Training Cost').show(truncate=False)

+-----+---------+--------+----------+----------------------------+----------------------+-------------+
|EmpID|FirstName|LastName|DOB       |ADEmail                     |Training Program Name |Training Cost|
+-----+---------+--------+----------+----------------------------+----------------------+-------------+
|3503 |Geovanni |Pugh    |1951-07-27|geovanni.pugh@bilearner.com |Technical Skills      |996.35       |
|1438 |Lucian   |Mora    |1977-12-16|lucian.mora@bilearner.com   |Customer Service      |996.18       |
|1604 |Ansley   |Duke    |1976-10-19|ansley.duke@bilearner.com   |Communication Skills  |999.97       |
|1665 |Tania    |Jennings|1963-07-10|tania.jennings@bilearner.com|Project Management    |999.83       |
|2011 |Naomi    |Hardin  |1994-08-06|naomi.hardin@bilearner.com  |Leadership Development|999.96       |
+-----+---------+--------+----------+----------------------------+----------------------+-------------+



## find after how many days each employee took their training after joining, what training they took and whats was the outcome

In [80]:
temp=employee.select('EmpID',concat('FirstName',lit(' '),'LastName').alias('Name'),'StartDate')\
    .join(training,on=training['Employee ID']==employee['EmpID'])
temp.select('EmpID','Name',
            date_diff(training['Training Date'],employee.StartDate).alias('Days after training occured')
            ,'Training Program Name','Training Outcome')\
    .orderBy('EmpID').show()

+-----+----------------+---------------------------+---------------------+----------------+
|EmpID|            Name|Days after training occured|Training Program Name|Training Outcome|
+-----+----------------+---------------------------+---------------------+----------------+
| 1001|   Susan Exantus|                       1119|     Customer Service|          Failed|
| 1002|   Sandra Martin|                        219| Leadership Develo...|          Failed|
| 1003|Keyla Del Bosque|                        -12|     Technical Skills|      Incomplete|
| 1004|    Andrew Szabo|                        958|     Customer Service|       Completed|
| 1005|  Luke Patronick|                        238| Communication Skills|          Passed|
| 1006|  Colby Andreola|                        631|   Project Management|          Failed|
| 1007|     Edward TRUE|                        931| Leadership Develo...|          Failed|
| 1008| Judith Carabbio|                       1435|     Technical Skills|      

## find how many days each employee worked/is working in the company

In [87]:
employee.select('EmpID',concat('FirstName',lit(' '),'LastName').alias('Name'),date_diff(ifnull('ExitDate',lit(now())),'StartDate')\
                        .alias('experience in company'),'EmployeeStatus')\
    .orderBy('EmpID').show()

+-----+----------------+---------------------+--------------------+
|EmpID|            Name|experience in company|      EmployeeStatus|
+-----+----------------+---------------------+--------------------+
| 1001|   Susan Exantus|                 1614|              Active|
| 1002|   Sandra Martin|                  167|              Active|
| 1003|Keyla Del Bosque|                    7|              Active|
| 1004|    Andrew Szabo|                  280|              Active|
| 1005|  Luke Patronick|                  500|              Active|
| 1006|  Colby Andreola|                   97|              Active|
| 1007|     Edward TRUE|                  336|Voluntarily Termi...|
| 1008| Judith Carabbio|                  250|              Active|
| 1009|     Adell Saada|                 1309|              Active|
| 1010|   Kamari Hunter|                  430|Voluntarily Termi...|
| 1011|    Sarah Malone|                  370|Voluntarily Termi...|
| 1012|Skyler Blackwell|                  209|Vo

## how many employees are in PIP in each divisions and how many training were completed/passed for those

In [98]:
temp=training.select('Employee ID','Training Program Name','Training Date','Training Outcome')\
    .join(employee.filter(employee['Performance Score']=='PIP'),on=employee.EmpID==training['Employee ID'])
temp.groupBy('Division').agg(count('EmpID').alias('Count of PIP employees'),
                             sum(when(((training['Training Outcome']=='Completed') |(training['Training Outcome']=='Passed')),1).otherwise(0)\
                                 ).alias('count of total trainings')).show()

+--------------------+----------------------+------------------------+
|            Division|Count of PIP employees|count of total trainings|
+--------------------+----------------------+------------------------+
|        Shop (Fleet)|                     3|                       0|
|    Field Operations|                    26|                      16|
|            Wireless|                     2|                       2|
|            Splicing|                     3|                       1|
|       General - Con|                    15|                       9|
|Project Managemen...|                     1|                       0|
|            Fielders|                     6|                       4|
|         Underground|                     1|                       0|
|           Executive|                     1|                       0|
|                Catv|                     6|                       4|
|Wireline Construc...|                     6|                       3|
|Finan