In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when


In [16]:
spark = SparkSession.builder.appName("emp_data").getOrCreate()

path = "/content/drive/MyDrive/Colab Notebooks/newDataSets/emp_data.csv"

df = spark.read.csv(path, header = True, inferSchema = True)

df.show(5)

+-----+---------+--------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------

In [17]:
from pyspark.sql import functions as F

columns_with_missing = [

    col for col in df.columns if df.filter(df[col].isNull()).count() > 0
]

print(columns_with_missing)

['ExitDate']


In [18]:
# df = df.withColumn(F.col("LastName"),
#                    F.when(F.col("LastName").isNull(),"Unknown").otherwise(F.col("LastName")))

# df.show(5)

df = df.fillna({"LastName":"Unknown"})

df = df.dropna(subset=["EmpID","StartDate"])

df.show(5)

+-----+---------+--------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------

In [19]:
df = df.drop("New_Rating")
df = df.withColumn(
    "Current Employee Rating",
    F.when(F.col("Current Employee Rating") < 1, 1)
    .when(F.col("Current Employee Rating") > 5, 5)
    .otherwise(F.col("Current Employee Rating"))
)
df.show()

+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+--------

In [20]:
df = df.dropDuplicates()

df.show()

+-----+---------+--------------+---------+---------+--------------------+--------------------+------------+----------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|      LastName|StartDate| ExitDate|               Title|          Supervisor|BusinessUnit|  EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------------+---------+---------+--------------------+--------------------+------------+----------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------

In [25]:
df_1 = df.groupBy("DepartmentType","Title").count()

df_1.show()

+-----------------+--------------------+-----+
|   DepartmentType|               Title|count|
+-----------------+--------------------+-----+
|            IT/IS|       Data Analyst |    1|
|Production       |Production Techni...|    1|
|            IT/IS|Principal Data Ar...|    8|
|            IT/IS|        Data Analyst|   10|
|Production       |    Network Engineer|    1|
|            IT/IS|Sr. Network Engineer|   30|
| Executive Office|    Network Engineer|   18|
|            IT/IS|    Network Engineer|   22|
|            IT/IS|      Data Architect|    2|
|            IT/IS|  Area Sales Manager|    7|
|            Sales|  Area Sales Manager|  119|
|            IT/IS|Enterprise Architect|    5|
|            IT/IS|          IT Support|   44|
|            Sales| Area Sales Manager?|    1|
|            Sales| Area Sales Manager.|    1|
|            IT/IS|Software Engineer...|    1|
|            IT/IS|      Sr. Accountant|    4|
|            IT/IS|             Sr. DBA|    6|
|            

In [29]:
performance_score_map = {
    "Needs Improvement" : 1,
    "Fully Meets" : 2,
    "Exceeds" : 3
}

import pyspark.sql.functions as F

df_perf = df.withColumn(
    "performance_numeric",
    F.when(F.col("Performance Score") == "Needs Improvement", 1)
    .when(F.col("Performance Score") == "Fully Meets", 2)
    .when(F.col("Performance Score") == "Exceeds", 3)
    .otherwise(0)
)

df_max_perf = df_perf.groupBy("DepartmentType").agg(F.max("performance_numeric").alias("max_performance_score"))

resultdf = df_perf.join(df_max_perf,
                        (df_perf["DepartmentType"] == df_max_perf["DepartmentType"])&
                        (df_perf["performance_numeric"] == df_max_perf["max_performance_score"]), "inner"
                        )
resultdf.show()

+-----+---------+--------+---------+---------+--------------------+----------------+------------+--------------------+------------+-------+--------------------------+---------------+--------------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-------------------+--------------------+---------------------+
|EmpID|FirstName|LastName|StartDate| ExitDate|               Title|      Supervisor|BusinessUnit|      EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|      DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|performance_numeric|      DepartmentType|max_performance_score|
+-----+---------+--------+---------+---------+--------------------+----------------+------------+--------------------+------------+-------+--------------------------+----