In [92]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Employee").getOrCreate()

df = spark.read.csv("emp_data.csv",header=True,inferSchema=True)

df.show()

+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+--------

In [93]:
df.select([count(when(col(c).isNull() ,c)).alias(c) for c in df.columns]).show()

+-----+---------+--------+---------+--------+-----+----------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName|StartDate|ExitDate|Title|Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|Division|DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+---------+--------+-----+----------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|    0|        0|       0|        0|     263|    0|         0|           0|             0|           0|

In [94]:
df = df.fillna({"LastName":"Unknown"})

In [95]:
df = df.dropna(subset=["EmpID","StartDate"])

In [96]:
df = df.withColumn("Current Employee Rating", when(col("Current Employee Rating") < 1, 1).when(col("Current Employee Rating") > 5, 5)
.otherwise(col("Current Employee Rating")))

In [97]:
df = df.filter(length(col("LocationCode")) == 5)

In [98]:
df = df.dropDuplicates()

In [99]:
df.groupBy(trim("DepartmentType"),trim("Title")).count().show()

+--------------------+--------------------+-----+
|trim(DepartmentType)|         trim(Title)|count|
+--------------------+--------------------+-----+
|               IT/IS|Principal Data Ar...|    7|
|               IT/IS|        Data Analyst|    4|
|          Production|Production Techni...|   54|
|               IT/IS|Sr. Network Engineer|   25|
|    Executive Office|    Network Engineer|   15|
|               IT/IS|    Network Engineer|   17|
|          Production|Production Techni...|    1|
|          Production|Production Techni...|  157|
|               IT/IS|      Data Architect|    2|
|               IT/IS|  Area Sales Manager|    4|
|               Sales|  Area Sales Manager|  106|
|               IT/IS|Enterprise Architect|    4|
|               IT/IS|          IT Support|   36|
|               Sales| Area Sales Manager?|    1|
|               Sales| Area Sales Manager.|    1|
|               IT/IS|Software Engineer...|    1|
|               IT/IS|      Sr. Accountant|    4|


In [100]:
df = df.withColumn("Performance Score", when(col("Performance Score") == "Exceeds", 3)
                   .when(col("Performance Score") == "Fully Meets", 2)
                   .when(col("Performance Score") == "Needs Improvement", 1)
                   .otherwise(0))

In [101]:
high = df.groupBy("DepartmentType").agg(max("Performance Score").alias("Max Performance Score"))
high = high.withColumnRenamed("DepartmentType","DepartmentType1")

high = high.join(df,(df["DepartmentType"] == high["DepartmentType1"]) & (df["Performance Score"] == high["Max Performance Score"])).select(
    "EmpID","FirstName","LastName","DepartmentType","Performance Score").show()

+-----+---------+--------+-----------------+-----------------+
|EmpID|FirstName|LastName|   DepartmentType|Performance Score|
+-----+---------+--------+-----------------+-----------------+
| 3703|  Addisyn|Guerrero|Production       |                3|
| 3822|  Katrina| Lambert| Executive Office|                2|
| 3541|    Aaron|   Weber|            Sales|                3|
| 3450|   Reilly|   Moyer|            Sales|                3|
| 3708|     Case|  Gaines|Production       |                3|
| 3866|   Kaeden|  Vargas| Executive Office|                2|
| 3964|   Marlee| Stevens|            IT/IS|                3|
| 3659|   Kaylee| Baldwin|Production       |                3|
| 3540|     Maya|  Mccann|            Sales|                3|
| 3974|   Zander|  Franco|            IT/IS|                3|
| 3663|  Brandon|Anderson|Production       |                3|
| 3834|  Monique|  Pierce| Executive Office|                2|
| 3973| Brooklyn|  Tanner|            IT/IS|           

In [105]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("DepartmentType").orderBy(col("Current Employee Rating").desc())

top = df.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1)

top.select("EmpID","FirstName","LastName","DepartmentType","Current Employee Rating").show()

+-----+---------+--------+-----------------+-----------------------+
|EmpID|FirstName|LastName|   DepartmentType|Current Employee Rating|
+-----+---------+--------+-----------------+-----------------------+
| 3822|  Katrina| Lambert| Executive Office|                      3|
| 3466|  Clayton|  Walker|            IT/IS|                      4|
| 3625|  Gabriel|  Dodson|Production       |                      5|
| 3480| Jerimiah|  Harmon|            Sales|                      5|
+-----+---------+--------+-----------------+-----------------------+



In [107]:
df.write.csv("emp_final",header=True)