In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

In [2]:
spark = SparkSession.builder.appName("Internal1").getOrCreate()

24/12/29 22:30:59 WARN Utils: Your hostname, arch resolves to a loopback address: 127.0.1.1; using 10.20.31.204 instead (on interface wlp5s0)
24/12/29 22:30:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/29 22:30:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/29 22:31:15 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
csv_path = "Datasets/emp_data.csv"

In [17]:
emp_data = spark.read.csv(csv_path, header = True, inferSchema = True)

In [18]:
emp_data.show()

+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+--------

In [19]:
for column in emp_data.columns:
    null_count = emp_data.filter(col(column).isNull()).count()
    print(f"Column {column} has {null_count} null values.")

Column EmpID has 0 null values.
Column FirstName has 0 null values.
Column LastName has 0 null values.
Column StartDate has 0 null values.
Column ExitDate has 263 null values.
Column Title has 0 null values.
Column Supervisor has 0 null values.
Column BusinessUnit has 0 null values.
Column EmployeeStatus has 0 null values.
Column EmployeeType has 0 null values.
Column PayZone has 0 null values.
Column EmployeeClassificationType has 0 null values.
Column TerminationType has 0 null values.
Column DepartmentType has 0 null values.
Column Division has 0 null values.
Column DOB has 0 null values.
Column State has 0 null values.
Column JobFunctionDescription has 0 null values.
Column GenderCode has 0 null values.
Column LocationCode has 0 null values.
Column RaceDesc has 0 null values.
Column MaritalDesc has 0 null values.
Column Performance Score has 0 null values.
Column Current Employee Rating has 0 null values.


In [20]:
emp_data = emp_data.fillna({"LastName":"Unknown"})

In [21]:
emp_data = emp_data.dropna(subset = ["EmpID","StartDate"])

In [22]:
emp_data = emp_data.filter(col("Current Employee Rating").between(1, 5))

In [23]:
emp_data.show()

+-----+---------+---------+---------+---------+--------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|               Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+--------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------

In [24]:
emp_df = emp_data.filter(col("LocationCode").rlike("^[0-9]+$"))

In [25]:
emp_df.show()

+-----+---------+---------+---------+---------+--------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|               Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+--------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------

In [26]:
emp_df.createOrReplaceTempView("employee")

In [27]:
emp_df.columns

['EmpID',
 'FirstName',
 'LastName',
 'StartDate',
 'ExitDate',
 'Title',
 'Supervisor',
 'BusinessUnit',
 'EmployeeStatus',
 'EmployeeType',
 'PayZone',
 'EmployeeClassificationType',
 'TerminationType',
 'DepartmentType',
 'Division',
 'DOB',
 'State',
 'JobFunctionDescription',
 'GenderCode',
 'LocationCode',
 'RaceDesc',
 'MaritalDesc',
 'Performance Score',
 'Current Employee Rating']

In [31]:
query = "select DepartmentType, count(DepartmentType) from employee group by DepartmentType"

In [32]:
new_emp = spark.sql(query)

In [33]:
new_emp.show()

+--------------------+---------------------+
|      DepartmentType|count(DepartmentType)|
+--------------------+---------------------+
|               Sales|                  118|
|   Production       |                  253|
|    Executive Office|                   18|
|Software Engineering|                    1|
|               IT/IS|                  158|
+--------------------+---------------------+



In [37]:
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

# Define the window specification
window_spec = Window.partitionBy("DepartmentType").orderBy(col("Performance Score").desc())

# Use rank() and filter the top performer
top_performers = emp_df.withColumn("Rank", rank().over(window_spec)).filter(col("Rank") == 1)
top_performers.show()


+-----+---------+---------+---------+---------+--------------------+------------------+------------+--------------------+------------+-------+--------------------------+---------------+----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+----+
|EmpID|FirstName| LastName|StartDate| ExitDate|               Title|        Supervisor|BusinessUnit|      EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|  DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|Rank|
+-----+---------+---------+---------+---------+--------------------+------------------+------------+--------------------+------------+-------+--------------------------+---------------+----------------+--------------------+----------+-----+----------------------+----------+------------+-

In [46]:
query2 = "SELECT DepartmentType, MAX(`Curr`) FROM employee GROUP BY DepartmentType"

In [48]:
emp_df.columns


['EmpID',
 'FirstName',
 'LastName',
 'StartDate',
 'ExitDate',
 'Title',
 'Supervisor',
 'BusinessUnit',
 'EmployeeStatus',
 'EmployeeType',
 'PayZone',
 'EmployeeClassificationType',
 'TerminationType',
 'DepartmentType',
 'Division',
 'DOB',
 'State',
 'JobFunctionDescription',
 'GenderCode',
 'LocationCode',
 'RaceDesc',
 'MaritalDesc',
 'Performance Score',
 'Current Employee Rating']

In [47]:
new_emp2 = spark.sql(query2)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `Rank` cannot be resolved. Did you mean one of the following? [`DOB`, `State`, `EmpID`, `PayZone`, `Title`].; line 1 pos 27;
'Aggregate [DepartmentType#1131], [DepartmentType#1131, unresolvedalias('MAX('Rank), None)]
+- SubqueryAlias employee
   +- View (`employee`, [EmpID#1118,FirstName#1119,LastName#2032,StartDate#1121,ExitDate#1122,Title#1123,Supervisor#1124,BusinessUnit#1125,EmployeeStatus#1126,EmployeeType#1127,PayZone#1128,EmployeeClassificationType#1129,TerminationType#1130,DepartmentType#1131,Division#1132,DOB#1133,State#1134,JobFunctionDescription#1135,GenderCode#1136,LocationCode#1137,RaceDesc#1138,MaritalDesc#1139,Performance Score#1140,Current Employee Rating#1141])
      +- Filter RLIKE(cast(LocationCode#1137 as string), ^[0-9]+$)
         +- Filter ((Current Employee Rating#1141 >= 1) AND (Current Employee Rating#1141 <= 5))
            +- Filter atleastnnonnulls(2, EmpID#1118, StartDate#1121)
               +- Project [EmpID#1118, FirstName#1119, coalesce(LastName#1120, cast(Unknown as string)) AS LastName#2032, StartDate#1121, ExitDate#1122, Title#1123, Supervisor#1124, BusinessUnit#1125, EmployeeStatus#1126, EmployeeType#1127, PayZone#1128, EmployeeClassificationType#1129, TerminationType#1130, DepartmentType#1131, Division#1132, DOB#1133, State#1134, JobFunctionDescription#1135, GenderCode#1136, LocationCode#1137, RaceDesc#1138, MaritalDesc#1139, Performance Score#1140, Current Employee Rating#1141]
                  +- Relation [EmpID#1118,FirstName#1119,LastName#1120,StartDate#1121,ExitDate#1122,Title#1123,Supervisor#1124,BusinessUnit#1125,EmployeeStatus#1126,EmployeeType#1127,PayZone#1128,EmployeeClassificationType#1129,TerminationType#1130,DepartmentType#1131,Division#1132,DOB#1133,State#1134,JobFunctionDescription#1135,GenderCode#1136,LocationCode#1137,RaceDesc#1138,MaritalDesc#1139,Performance Score#1140,Current Employee Rating#1141] csv


In [45]:
new_emp2.show()

+--------------------+----------------------+
|      DepartmentType|max(Performance Score)|
+--------------------+----------------------+
|    Executive Office|                   PIP|
|               IT/IS|                   PIP|
|   Production       |                   PIP|
|               Sales|     Needs Improvement|
|Software Engineering|           Fully Meets|
+--------------------+----------------------+

