### Reading both CSV files

In [0]:
# Both inputs are CSV files with a header row; column types are inferred.
# One configured reader is shared by the two loads.
csv_reader = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
)

# Employee master data.
employee_df = csv_reader.load("/Volumes/workspace/default/volume_digmart/employees_big.csv")

# Attendance records.
attendence_df = csv_reader.load("/Volumes/workspace/default/volume_digmart/attendance_big.csv")

### Fill the missing salaries with the department median

In [0]:

from pyspark.sql.functions import *

# Approximate median salary per department. The grouping column is renamed to
# "dept" so it does not clash with employee_df.department in the later join.
mediansal_df = (
    employee_df
    .groupBy("department")
    .agg(expr("percentile_approx(salary,0.5)").alias("median_salary"))
    .select(col("department").alias("dept"), col("median_salary"))
)

# Preview at most 10 rows. display() is called on the limited frame itself
# rather than passing it as an extra argument to mediansal_df.display().
mediansal_df.limit(10).display()



dept,median_salary
HR,86105.0
Sales,90848.0
Support,88531.0
Finance,88214.0
Marketing,90494.0
Engineering,91807.0


In [0]:

# Attach each department's median salary to its employees.
# A right outer join keeps every row of employee_df, so an employee whose
# department has no match in mediansal_df is retained instead of being dropped
# (or replaced by an all-null row). mediansal_df stays on the left so the
# column order of the preview is dept, median_salary, then the employee columns.
employee_df = mediansal_df.join(
    employee_df,
    mediansal_df.dept == employee_df.department,
    "right",
)
display(employee_df.limit(10))

# Fill missing salaries with the department median; existing salaries are kept.
employee_df = employee_df.withColumn(
    "salary",
    coalesce(col("salary"), col("median_salary")),
)
display(employee_df.limit(10))

# Drop the helper columns. Plain column names are used because passing several
# Column objects to drop() is rejected by PySpark versions before 3.4.
employee_df = employee_df.drop("median_salary", "dept")
display(employee_df.limit(10))



dept,median_salary,emp_id,name,department,salary
HR,86105.0,E1001,Employee_1,HR,142727.0
HR,86105.0,E1002,Employee_2,HR,
HR,86105.0,E1009,Employee_9,HR,96557.0
HR,86105.0,E1013,Employee_13,HR,135512.0
HR,86105.0,E1016,Employee_16,HR,56854.0
HR,86105.0,E1017,Employee_17,HR,109575.0
HR,86105.0,E1018,Employee_18,HR,
HR,86105.0,E1023,Employee_23,HR,105450.0
HR,86105.0,E1035,Employee_35,HR,
HR,86105.0,E1044,Employee_44,HR,34499.0


dept,median_salary,emp_id,name,department,salary
HR,86105.0,E1001,Employee_1,HR,142727.0
HR,86105.0,E1002,Employee_2,HR,86105.0
HR,86105.0,E1009,Employee_9,HR,96557.0
HR,86105.0,E1013,Employee_13,HR,135512.0
HR,86105.0,E1016,Employee_16,HR,56854.0
HR,86105.0,E1017,Employee_17,HR,109575.0
HR,86105.0,E1018,Employee_18,HR,86105.0
HR,86105.0,E1023,Employee_23,HR,105450.0
HR,86105.0,E1035,Employee_35,HR,86105.0
HR,86105.0,E1044,Employee_44,HR,34499.0


emp_id,name,department,salary
E1001,Employee_1,HR,142727.0
E1002,Employee_2,HR,86105.0
E1009,Employee_9,HR,96557.0
E1013,Employee_13,HR,135512.0
E1016,Employee_16,HR,56854.0
E1017,Employee_17,HR,109575.0
E1018,Employee_18,HR,86105.0
E1023,Employee_23,HR,105450.0
E1035,Employee_35,HR,86105.0
E1044,Employee_44,HR,34499.0


### Cleaning the attendance data (`attendence_df`)


In [0]:
# Inspect the raw attendance rows and the distinct spellings of `status`.
attendence_df.limit(500).display()

attendence_df.select(col("status")).distinct().display()

# Normalise `status` before mapping: strip surrounding whitespace and lower-case
# it, so the mapping does not depend on the capitalisation used in the file
# (e.g. "P", "p", "Present", "PRESENT" are all treated the same way).
attendence_df = attendence_df.withColumn("status", lower(trim(col("status"))))

# "p" / "present" -> "present". Every other value ("a", "absent", "na", the
# empty string, null, or anything unrecognised) is considered "absent".
attendence_df = attendence_df.withColumn(
    "status",
    when(col("status").isin("p", "present"), "present").otherwise("absent"),
)
attendence_df.limit(500).display()


emp_id,date,status
E1000,2024-01-01,Present
E1000,2024-01-02,absent
E1000,2024-01-03,Present
E1000,2024-01-04,Present
E1000,2024-01-05,Absent
E1000,2024-01-08,present
E1000,2024-01-09,Absent
E1000,2024-01-10,Present
E1000,2024-01-11,Present
E1000,2024-01-12,present


status
P
absent
A
""
Present
present
Absent


emp_id,date,status
E1000,2024-01-01,present
E1000,2024-01-02,absent
E1000,2024-01-03,present
E1000,2024-01-04,present
E1000,2024-01-05,absent
E1000,2024-01-08,present
E1000,2024-01-09,absent
E1000,2024-01-10,present
E1000,2024-01-11,present
E1000,2024-01-12,present


### Calculate monthly attendance per employee

In [0]:
# Step 1: derive the calendar year and month of every attendance record.
attendence_df = (
    attendence_df
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
)

# Step 2: per employee and month, count all recorded days and the present days.
# when() without otherwise() yields null for non-present rows, and count()
# skips nulls, so only "present" rows are counted.
present_marker = when(col("status") == "present", True)
attendance_summary_df = (
    attendence_df
    .groupBy("emp_id", "year", "month")
    .agg(
        count("*").alias("total_days"),
        count(present_marker).alias("present_days"),
    )
)
attendance_summary_df.limit(10).display()

# Step 3: attendance percentage = present_days / total_days * 100, 2 decimals.
attendance_pct_expr = round((col("present_days") / col("total_days")) * 100, 2)
attendance_summary_df = attendance_summary_df.withColumn("attendance_pct", attendance_pct_expr)
attendance_summary_df.limit(10).display()

emp_id,year,month,total_days,present_days
E1081,2024,9,21,14
E1107,2024,6,20,14
E1131,2024,3,21,14
E1239,2024,3,21,13
E1287,2024,7,23,18
E1487,2024,9,21,12
E1487,2024,11,21,12
E1559,2024,5,23,17
E1560,2024,12,22,17
E1567,2024,8,22,16


emp_id,year,month,total_days,present_days,attendance_pct
E1081,2024,9,21,14,66.67
E1107,2024,6,20,14,70.0
E1131,2024,3,21,14,66.67
E1239,2024,3,21,13,61.9
E1287,2024,7,23,18,78.26
E1487,2024,9,21,12,57.14
E1487,2024,11,21,12,57.14
E1559,2024,5,23,17,73.91
E1560,2024,12,22,17,77.27
E1567,2024,8,22,16,72.73


In [0]:
# Enrich every monthly attendance row with the employee's columns
# (left join on emp_id keeps all attendance rows).
emp_attendance_salary_df = attendance_summary_df.join(employee_df, on="emp_id", how="left")

# Effective salary = salary scaled by the attendance percentage, 2 decimals.
effective_salary_expr = round((col("attendance_pct") / 100) * col("salary"), 2)
emp_attendance_salary_df = emp_attendance_salary_df.withColumn(
    "effective_salary",
    effective_salary_expr,
)

# Show the report columns.
report_columns = [
    "emp_id", "name", "department", "year", "month",
    "salary", "attendance_pct", "effective_salary",
]
emp_attendance_salary_df.select(*report_columns).display()


emp_id,name,department,year,month,salary,attendance_pct,effective_salary
E1081,Employee_81,Sales,2024,9,89275.0,66.67,59519.64
E1107,Employee_107,Engineering,2024,6,52700.0,70.0,36890.0
E1131,Employee_131,Support,2024,3,69811.0,66.67,46542.99
E1239,Employee_239,Marketing,2024,3,46371.0,61.9,28703.65
E1287,Employee_287,Sales,2024,7,90848.0,78.26,71097.64
E1487,Employee_487,Engineering,2024,9,42763.0,57.14,24434.78
E1487,Employee_487,Engineering,2024,11,42763.0,57.14,24434.78
E1559,Employee_559,Sales,2024,5,90848.0,73.91,67145.76
E1560,Employee_560,Finance,2024,12,104965.0,77.27,81106.46
E1567,Employee_567,HR,2024,8,109063.0,72.73,79321.52


### Aggregate by department: average effective salary

In [0]:
# Average effective salary per department and month, rounded to 2 decimals.
avg_effective_salary = round(avg("effective_salary"), 2).alias("effective_average_salary")
emp_avg_salary_df = (
    emp_attendance_salary_df
    .groupBy("department", "year", "month")
    .agg(avg_effective_salary)
)

# NOTE(review): this previews the per-employee input frame, not the aggregate
# built above -- confirm this is intended.
emp_attendance_salary_df.limit(10).display()


emp_id,year,month,total_days,present_days,attendance_pct,name,department,salary,effective_salary
E1081,2024,9,21,14,66.67,Employee_81,Sales,89275.0,59519.64
E1107,2024,6,20,14,70.0,Employee_107,Engineering,52700.0,36890.0
E1131,2024,3,21,14,66.67,Employee_131,Support,69811.0,46542.99
E1239,2024,3,21,13,61.9,Employee_239,Marketing,46371.0,28703.65
E1287,2024,7,23,18,78.26,Employee_287,Sales,90848.0,71097.64
E1487,2024,9,21,12,57.14,Employee_487,Engineering,42763.0,24434.78
E1487,2024,11,21,12,57.14,Employee_487,Engineering,42763.0,24434.78
E1559,2024,5,23,17,73.91,Employee_559,Sales,90848.0,67145.76
E1560,2024,12,22,17,77.27,Employee_560,Finance,104965.0,81106.46
E1567,2024,8,22,16,72.73,Employee_567,HR,109063.0,79321.52


In [0]:
# First 10 rows of the department aggregate, ordered ascending by
# department, year and month (ascending is the default sort order).
emp_avg_salary_df.sort("department", "year", "month").limit(10).display()
                       

department,year,month,effective_average_salary
Engineering,2024,1,63690.75
Engineering,2024,2,63801.04
Engineering,2024,3,64058.02
Engineering,2024,4,64054.62
Engineering,2024,5,64004.7
Engineering,2024,6,63892.07
Engineering,2024,7,63746.05
Engineering,2024,8,63692.32
Engineering,2024,9,63564.43
Engineering,2024,10,64112.77
