In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark = SparkSession.builder.appName("Project").getOrCreate()

25/04/29 17:01:57 WARN Utils: Your hostname, TTNPL-kanishkasharma resolves to a loopback address: 127.0.1.1; using 10.1.245.40 instead (on interface wlp0s20f3)
25/04/29 17:01:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/29 17:01:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/29 17:01:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
df = spark.read.option("header", "true") \
                   .option("inferSchema", "true") \
                   .csv("employee_timeframe_data.csv")

                                                                                

In [5]:
df.head(5)

[Row(emp_id=7175690919, designation='A', start_date=1701734385, end_date=1707768029.0, salary=100870),
 Row(emp_id=7175690919, designation='A', start_date=1707768029, end_date=1709232768.0, salary=189648),
 Row(emp_id=7175690919, designation='B', start_date=1709232768, end_date=1709250902.0, salary=340522),
 Row(emp_id=7175690919, designation='E', start_date=1709250902, end_date=1709251128.0, salary=366856),
 Row(emp_id=7175690919, designation='E', start_date=1709251128, end_date=1709251143.0, salary=376246)]

In [6]:
df1 = df.withColumn("start_date", to_date(from_unixtime(col("start_date")))) \
           .withColumn("end_date", to_date(from_unixtime(col("end_date"))))

In [7]:
df1.show()

+----------+-----------+----------+----------+------+
|    emp_id|designation|start_date|  end_date|salary|
+----------+-----------+----------+----------+------+
|7175690919|          A|2023-12-05|2024-02-13|100870|
|7175690919|          A|2024-02-13|2024-03-01|189648|
|7175690919|          B|2024-03-01|2024-03-01|340522|
|7175690919|          E|2024-03-01|2024-03-01|366856|
|7175690919|          E|2024-03-01|2024-03-01|376246|
|7175690919|          E|2024-03-01|2024-03-01|390315|
|7175690919|          E|2024-03-01|2024-03-01|396086|
|7175690919|          F|2024-03-01|2024-03-01|399318|
|7175690919|          F|2024-03-01|2024-03-01|399589|
|7175690919|          F|2024-03-01|      NULL|399589|
|3345338467|          F|2022-11-10|2023-10-17|119555|
|3345338467|          F|2023-10-17|2024-01-21|248435|
|3345338467|          F|2024-01-21|2024-02-29|330464|
|3345338467|          F|2024-02-29|2024-02-29|389177|
|3345338467|          F|2024-02-29|2024-03-01|395404|
|3345338467|          F|2024

In [8]:
window_spec = Window.partitionBy("emp_id", "start_date","end_date").orderBy(col("salary").desc())

df2 = df1.withColumn("row_num", row_number().over(window_spec)) \
               .filter(col("row_num") == 1) \
               .drop("row_num")

In [9]:
df2.show()



+-------+-----------+----------+----------+------+
| emp_id|designation|start_date|  end_date|salary|
+-------+-----------+----------+----------+------+
| 256817|          E|2022-05-17|2023-04-11|201559|
| 256817|          F|2024-02-11|2024-02-14|389512|
| 296661|          E|2022-08-30|2023-11-13|286694|
| 346216|          E|2024-02-16|2024-02-18|372224|
| 561990|          F|2024-01-16|2024-02-21|260903|
| 912905|          F|2024-02-29|      NULL|377359|
|1426968|          C|2022-04-24|2023-09-01|187474|
|1765594|          A|2023-10-02|      NULL|365718|
|1875052|          A|2022-05-07|2024-02-11| 66229|
|1875052|          D|2024-02-12|2024-02-27|358843|
|1875052|          F|2024-03-01|2024-03-01|399627|
|1926107|          F|2024-02-16|2024-02-22|386240|
|2100181|          D|2023-06-01|2023-11-29|346849|
|2100181|          F|2024-02-28|      NULL|399853|
|2149103|          E|2022-12-20|2023-10-06|335226|
|2245319|          D|2023-10-09|2023-11-28|197371|
|2331342|          F|2024-02-28

                                                                                

In [10]:
window_emp = Window.partitionBy("emp_id").orderBy("start_date")

# Get the start_date of the next record
df3 = df2.withColumn("next_start", lead("start_date").over(window_emp))

# Replace null end_date with next_start - 1 day (for continuity)
df4 = df3.withColumn(
    "end_date",
    when(col("end_date").isNull(), date_sub(col("next_start"), 1)).otherwise(col("end_date"))
).drop("next_start")

In [11]:
df4.show()



+-------+-----------+----------+----------+------+
| emp_id|designation|start_date|  end_date|salary|
+-------+-----------+----------+----------+------+
| 346216|          A|2024-02-02|2024-02-16|345618|
| 346216|          E|2024-02-16|2024-02-18|372224|
| 346216|          E|2024-02-18|2024-02-29|373525|
| 346216|          F|2024-02-29|2024-03-01|397277|
| 346216|          F|2024-03-01|2024-02-29|399967|
| 346216|          F|2024-03-01|2024-03-01|399967|
| 756782|          B|2023-05-22|2023-11-17| 23751|
| 756782|          D|2023-11-17|2023-12-14|245353|
| 756782|          F|2023-12-14|2023-12-29|270160|
| 756782|          F|2023-12-29|      NULL|270160|
|1926107|          D|2022-11-15|2023-06-04|161430|
|1926107|          E|2023-06-04|2023-12-17|264576|
|1926107|          E|2023-12-17|2023-12-19|271112|
|1926107|          E|2023-12-19|2024-02-16|350968|
|1926107|          F|2024-02-16|2024-02-22|386240|
|1926107|          F|2024-02-16|2024-02-16|383211|
|1926107|          F|2024-02-22

                                                                                

In [12]:
df4 = df4.withColumn(
    "status",
    when(col("end_date").isNull(), "Active").otherwise("Inactive")
)

In [13]:
df4.show()

[Stage 18:>                                                         (0 + 1) / 1]

+-------+-----------+----------+----------+------+--------+
| emp_id|designation|start_date|  end_date|salary|  status|
+-------+-----------+----------+----------+------+--------+
| 346216|          A|2024-02-02|2024-02-16|345618|Inactive|
| 346216|          E|2024-02-16|2024-02-18|372224|Inactive|
| 346216|          E|2024-02-18|2024-02-29|373525|Inactive|
| 346216|          F|2024-02-29|2024-03-01|397277|Inactive|
| 346216|          F|2024-03-01|2024-02-29|399967|Inactive|
| 346216|          F|2024-03-01|2024-03-01|399967|Inactive|
| 756782|          B|2023-05-22|2023-11-17| 23751|Inactive|
| 756782|          D|2023-11-17|2023-12-14|245353|Inactive|
| 756782|          F|2023-12-14|2023-12-29|270160|Inactive|
| 756782|          F|2023-12-29|      NULL|270160|  Active|
|1926107|          D|2022-11-15|2023-06-04|161430|Inactive|
|1926107|          E|2023-06-04|2023-12-17|264576|Inactive|
|1926107|          E|2023-12-17|2023-12-19|271112|Inactive|
|1926107|          E|2023-12-19|2024-02-

                                                                                

Append-Only Yearly Tables:
employee_leave_quota_data.csv ‚Üí Yearly Quota Table

employee_leave_calendar_data.csv ‚Üí Yearly Holiday Table

In [14]:
df_calendar = spark.read.option("header", True).csv("employee_leave_calendar_data.csv") 

calendar_data = df_calendar.withColumn("date", col("date").cast(DateType()))

#holiday_df.write.format("delta").mode("append").partitionBy("date") \
#       .save("s3://your-bucket/output/leave_calendar/")


In [15]:
df_leave_quota = spark.read.option("header", "true") \
                   .option("inferSchema", "true") \
                   .csv("employee_leave_quota_data.csv")

quota_data = df_leave_quota.withColumn("leave_quota", df_leave_quota["leave_quota"].cast("int")) \
                   .withColumn("year", df_leave_quota["year"].cast("int"))

#quota_df = spark.read.option("header", "true").csv("s3://your-bucket/employee_leave_quota_data.csv")

# Ensure data types
#quota_df = quota_df.withColumn("leave_quota", quota_df["leave_quota"].cast("int")) \
                   #.withColumn("year", quota_df["year"].cast("int"))

# Write as append-only
#quota_df.write.format("delta").mode("append").partitionBy("year") \
        #.save("s3://your-bucket/output/leave_quota/")

                                                                                

In [16]:
leave_data = spark.read.option("header", "true") \
                   .option("inferSchema", "true") \
                   .csv("employee_leave_data.csv")

                                                                                

For each (emp_id, date):

Count how many times each status occurs: "ACTIVE" and "CANCELLED"

Apply logic:

If "CANCELLED" count > "ACTIVE" count ‚Üí status is "CANCELLED"

If "ACTIVE" count > "CANCELLED" count ‚Üí status is "ACTIVE"

If counts are equal ‚Üí status is "CANCELLED" (since cancel overrides)

In [17]:
# Step 1: Ensure date is in correct format
leave_data = leave_data.withColumn("date", to_date(col("date"), "yyyy-M-d"))

# Step 2: Map status to two count columns
leave_status_counts = leave_data.withColumn("is_active", when(col("status") == "ACTIVE", 1).otherwise(0)) \
                                .withColumn("is_cancelled", when(col("status") == "CANCELLED", 1).otherwise(0))

# Step 3: Group by emp_id + date and sum status flags
grouped = leave_status_counts.groupBy("emp_id", "date") \
    .agg(
        sum("is_active").alias("active_count"),
        sum("is_cancelled").alias("cancelled_count")
    )

# Step 4: Apply status logic based on counts
leave_data_updated = grouped.withColumn(
    "status",
    when(col("cancelled_count") >= col("active_count"), "CANCELLED").otherwise("ACTIVE")
).select("emp_id", "date", "status")



In [18]:
dupes = leave_data_updated.groupBy("emp_id", "date").count().filter("count > 1")
if dupes.count() > 0:
    print("‚ö†Ô∏è Duplicate ACTIVE leave records found:")
    dupes.show()

25/04/29 17:03:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [19]:
leave_data_updated.show()

25/04/29 17:03:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:20 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:20 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/29 17:03:20 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+----------+----------+------+
|    emp_id|      date|status|
+----------+----------+------+
|7175690919|2023-05-19|ACTIVE|
| 272449155|2023-11-05|ACTIVE|
|9154542239|2023-06-08|ACTIVE|
|9154542239|2023-07-10|ACTIVE|
|1260589765|2023-05-16|ACTIVE|
|6727837063|2023-02-12|ACTIVE|
|9623856554|2024-02-11|ACTIVE|
|3368105384|2023-05-10|ACTIVE|
|6619102777|2023-06-18|ACTIVE|
|6619102777|2023-07-28|ACTIVE|
|5568316074|2023-02-03|ACTIVE|
| 706688810|2024-07-20|ACTIVE|
|4530872653|2024-03-13|ACTIVE|
|5033623298|2023-09-21|ACTIVE|
|8022137890|2023-04-13|ACTIVE|
|5468748102|2024-09-11|ACTIVE|
|8328042768|2023-07-10|ACTIVE|
|8523798777|2024-04-23|ACTIVE|
|3286077338|2024-01-24|ACTIVE|
| 958054928|2024-10-15|ACTIVE|
+----------+----------+------+
only showing top 20 rows



                                                                                

In [20]:
#final_status.write.mode("overwrite").format("delta") \
    #.save("s3://your-bucket/processed/employee_leave_daily/")


Generate a daily table (at 7:00 UTC) showing currently active employees by designation and count.

Active Employees by Designation (Daily @ 7:00 UTC)
Input:

employee_timeframe_data (already processed and includes status)

Logic:

Filter records where status = 'Active'

Group by designation

Count employees


In [21]:
#employee_timeline = spark.read.format("delta").load("s3://your-bucket/employee_timeframe_delta/")

active_employees = df4.filter(col("status") == "Active") \
    .groupBy("designation") \
    .agg(count("emp_id").alias("active_emp_count"))\
    .sort("active_emp_count")

#active_employees.write.mode("overwrite").format("delta").save("s3://your-bucket/reports/active_employee_counts")

In [22]:
active_employees.show()

[Stage 38:>                                                         (0 + 8) / 8]

+-----------+----------------+
|designation|active_emp_count|
+-----------+----------------+
|          A|            2038|
|          B|            2471|
|          C|            3184|
|          D|            4771|
|          E|            9188|
|          F|           58006|
+-----------+----------------+



                                                                                

Potential Leave Abuse > 8% of Working Days (Daily @ 7:00 UTC)
Inputs:

employee_leave_data.csv

employee_leave_calendar_data.csv

Generate working days from tomorrow to end of year

Exclude:

Weekends

Public holidays

Cancelled leaves

Logic:

Calculate remaining working days this year.

Filter employee_leave_data:

status = ACTIVE

leave date in future

not weekend, not holiday

Count leaves per employee

If leave_count > 8% of working days, flag

In [23]:
from builtins import max

# ========== Step 0: Set target year ==========
target_year = 2023  # change to 2024, 2025 etc. as needed

today = datetime.utcnow().date()
start_date = max(today, datetime(target_year, 1, 1).date())  # don't go back in time
end_date = datetime(target_year, 12, 31).date()

current_date_str = start_date.strftime('%Y-%m-%d')
end_of_year_str = end_date.strftime('%Y-%m-%d')
run_date_str = today.strftime('%Y-%m-%d')

In [24]:
from pyspark.sql.functions import (
    col, lit, to_date, dayofweek, explode, sequence, countDistinct, expr, year
)
from pyspark.sql.types import DateType
from datetime import datetime

# ========== Step 1: Setup Dates ==========
# today = datetime.utcnow().date()
today = datetime(2024,4,25) # when i explicitly gave the date
current_date_str = today.strftime('%Y-%m-%d')
end_of_year_str = f"{today.year}-12-31"
run_date_str = today.strftime('%Y-%m-%d')



print("üìÖ Running for date range:", current_date_str, "to", end_of_year_str)

# ========== Step 2: Prepare Leave Data ==========
leave_data_final = leave_data_updated.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

active_leaves = leave_data_final.filter(
    (col("status") == "ACTIVE") &
    (col("date") >= lit(current_date_str))
).dropDuplicates(["emp_id", "date"])

print("üü° Active leave records:", active_leaves.count())

# ========== Step 3: Prepare Holiday Data ==========
holidays = calendar_data \
    .withColumn("date", to_date("date")) \
    .filter(col("date") >= lit(current_date_str)) \
    .select("date").distinct() \
    .withColumn("holiday", lit(1))

# ========== Step 4: Generate Working Days ==========
date_range = spark.createDataFrame([()]).select(
    explode(sequence(to_date(lit(current_date_str)), to_date(lit(end_of_year_str)))).alias("date")
)

working_days = date_range \
    .withColumn("day_of_week", dayofweek("date")) \
    .filter(~col("day_of_week").isin([1, 7])) \
    .drop("day_of_week") \
    .join(holidays, on="date", how="left_anti") \
    .persist()

total_working_days = working_days.count()
print("üìÜ Total working days left:", total_working_days)

if total_working_days == 0:
    print("‚ö†Ô∏è No valid working days found. Exiting early.")
else:
    # ========== Step 5: Join Leaves with Working Days ==========
    leaves_on_working_days = active_leaves.join(working_days, on="date", how="inner") \
        .select("emp_id", "date").distinct()

    print("üìå Leaves on working days:", leaves_on_working_days.count())

    # ========== Step 6: Count Leaves and Flag ==========
    emp_leave_counts = leaves_on_working_days.groupBy("emp_id") \
        .agg(countDistinct("date").alias("leaves_taken"))

    flagged = emp_leave_counts.withColumn(
        "leave_percent",
        (col("leaves_taken") / lit(total_working_days)) * 100
    ).withColumn(
        "flagged", expr("CASE WHEN leave_percent > 8 THEN 'Yes' ELSE 'No' END")
    ).withColumn("run_date", lit(run_date_str))

    # Show result
    flagged.show()

    # ========== Step 7: Save to S3 (optional) ==========
    # flagged.write.mode("overwrite") \
    #     .format("delta") \
    #     .partitionBy("run_date") \
    #     .save("s3://your-bucket/reports/daily_leave_abuse")

üìÖ Running for date range: 2024-04-25 to 2024-12-31


                                                                                

üü° Active leave records: 1404293


                                                                                

üìÜ Total working days left: 176


                                                                                

üìå Leaves on working days: 978178




+----------+------------+------------------+-------+----------+
|    emp_id|leaves_taken|     leave_percent|flagged|  run_date|
+----------+------------+------------------+-------+----------+
|2624934405|           9| 5.113636363636364|     No|2024-04-25|
|6865520457|           9| 5.113636363636364|     No|2024-04-25|
|4070419776|          12|6.8181818181818175|     No|2024-04-25|
|1991864484|          13| 7.386363636363637|     No|2024-04-25|
| 993324297|          14| 7.954545454545454|     No|2024-04-25|
|2857000361|          15| 8.522727272727272|    Yes|2024-04-25|
|5168383199|          11|              6.25|     No|2024-04-25|
|4197767131|          12|6.8181818181818175|     No|2024-04-25|
|7194063534|          10| 5.681818181818182|     No|2024-04-25|
|2343017910|           9| 5.113636363636364|     No|2024-04-25|
|1821916857|           7| 3.977272727272727|     No|2024-04-25|
|7359053082|          10| 5.681818181818182|     No|2024-04-25|
| 678878907|          11|              6

                                                                                

In [25]:
count_flagged = flagged.filter(col("flagged") == "Yes").count()
print(f"‚úÖ Employees flagged (leave > 8%): {count_flagged}")



[Stage 92:>                                                         (0 + 6) / 6]

‚úÖ Employees flagged (leave > 8%): 6515


                                                                                

In [26]:
employee_flagged = flagged.filter(col("emp_id") == '154225493')
employee_flagged.show()

[Stage 101:>                                                        (0 + 1) / 1]

+---------+------------+------------------+-------+----------+
|   emp_id|leaves_taken|     leave_percent|flagged|  run_date|
+---------+------------+------------------+-------+----------+
|154225493|           6|3.4090909090909087|     No|2024-04-25|
+---------+------------+------------------+-------+----------+



                                                                                

In [27]:
print("Working Days Count:", working_days.count())
print(holidays)

Working Days Count: 176
DataFrame[date: date, holiday: int]


Leave Quota Overuse >80% (Monthly on 1st @ 7:00 UTC)
Inputs:

employee_leave_data.csv

employee_leave_quota_data.csv

Logic:

For current year, count leaves where status = ACTIVE

Join with leave quota data

If (leave_taken / quota) > 0.8, flag

Group by manager (placeholder or you can skip the file-per-manager if no manager field is present)



In [28]:
# from pyspark.sql.functions import col, to_date, year, countDistinct, lit, expr
# from datetime import datetime
# import os

# # ========== Step 1: Setup ==========
# #today = datetime.utcnow().date()
# today = datetime(2024,1,1) # when i explicitly gave the date
# current_year = today.year
# run_date_str = today.strftime("%Y-%m-%d")
# # base_path = f"./monthly_leave_reports/run_date={run_date_str}"

# # Ensure output dir exists
# # os.makedirs(base_path, exist_ok=True)

# # ========== Step 2: Prepare Clean Data ==========
# leave_data_updated = leave_data_updated.withColumn("date", to_date("date"))

# # Filter ACTIVE leaves for current year (no deduping needed)
# valid_leaves = leave_data_updated.filter(
#     (col("status") == "ACTIVE") &
#     (year(col("date")) == current_year)
# )

# # Count unique leave days per employee
# leaves_taken = valid_leaves.groupBy("emp_id") \
#     .agg(countDistinct("date").alias("leaves_taken"))

# # Join with leave quota and calculate leave percentage
# leave_usage = leaves_taken.join(quota_data, on="emp_id", how="inner") \
#     .filter(col("year") == lit(current_year)) \
#     .withColumn("leave_percent", (col("leaves_taken") / col("leave_quota")) * 100) \
#     .withColumn("flagged", expr("CASE WHEN leave_percent > 80 THEN 'Yes' ELSE 'No' END")) \
#     .filter(col("flagged") == "Yes") \
#     .withColumn("run_date", lit(run_date_str))

# leave_usage.show()

# # ========== Step 3: Write Report Per Employee (no duplicates) ==========
# # flagged_emps = leave_usage.select("emp_id").rdd.map(lambda r: r["emp_id"]).collect()

# # for emp_id in flagged_emps:
# #     emp_dir = os.path.join(base_path, f"emp_id={emp_id}")
# #     os.makedirs(emp_dir, exist_ok=True)
    
# #     file_path = os.path.join(emp_dir, "report.txt")
    
# #     # Skip if already exists (idempotency)
# #     if os.path.exists(file_path):
# #         print(f"üü° Report already exists for emp_id={emp_id}. Skipping.")
# #         continue

# #     # Filter employee-specific data and write as text
# #     emp_df = leave_usage.filter(col("emp_id") == emp_id)
# #     emp_rows = emp_df.collect()

# #     with open(file_path, "w") as f:
# #         for row in emp_rows:
# #             f.write(str(row.asDict()) + "\n")

# #     print(f"‚úÖ Written report for emp_id={emp_id}")

# # print("üéâ All reports generated.")


In [29]:
from pyspark.sql.functions import col, to_date, year, countDistinct, lit, expr
from datetime import datetime
from dateutil.relativedelta import relativedelta
import os

# ========== Step 1: Setup Reporting Dates ==========
# Simulate: Run on the 1st of any month
#today = datetime.utcnow().date()
today = datetime(2024, 11, 1).date()  # ‚Üê change to the 1st of any month you're testing

# Calculate end of previous month as cutoff
report_cutoff = today - relativedelta(days=1)  # e.g., 2024-04-30
report_cutoff_str = report_cutoff.strftime("%Y-%m-%d")
run_date_str = today.strftime("%Y-%m-%d")
current_year = today.year

print(f"üìä Reporting for period: Jan 1 to {report_cutoff_str}")

# ========== Step 2: Prepare Clean Leave Data ==========
leave_data_updated = leave_data_updated.withColumn("date", to_date("date"))

# Filter ACTIVE leaves in the current year up to the cutoff
valid_leaves = leave_data_updated.filter(
    (col("status") == "ACTIVE") &
    (year(col("date")) == current_year) &
    (col("date") <= lit(report_cutoff_str))
)

# Generate date range Jan 1 ‚Üí report_cutoff
date_range = spark.createDataFrame([()]).select(
    explode(sequence(
        to_date(lit(f"{current_year}-01-01")),
        to_date(lit(report_cutoff_str))
    )).alias("date")
)

# Remove weekends
working_days = date_range.withColumn("dow", dayofweek("date")) \
    .filter(~col("dow").isin([1, 7])) \
    .drop("dow")

# Join holiday calendar and remove holiday dates
holidays = calendar_data.withColumn("date", to_date("date")).select("date").distinct()
working_days = working_days.join(holidays, on="date", how="left_anti")

# Join with valid leave data
leaves_on_working_days = valid_leaves.join(working_days, on="date", how="inner")

# Count real working leaves
leaves_taken = leaves_on_working_days.groupBy("emp_id") \
    .agg(countDistinct("date").alias("leaves_taken"))

# ========== Step 3: Join with Quota & Flag ==========
leave_usage = leaves_taken.join(quota_data, on="emp_id", how="inner") \
    .filter(col("year") == lit(current_year)) \
    .withColumn("leave_percent", (col("leaves_taken") / col("leave_quota")) * 100) \
    .withColumn("flagged", expr("CASE WHEN leave_percent > 80 THEN 'Yes' ELSE 'No' END")) \
    .filter(col("flagged") == "Yes") \
    .withColumn("run_date", lit(run_date_str))

leave_usage.show()

# ========== Step 4: Write Report Per Employee (no duplicates) ==========
# base_path = f"./monthly_leave_reports/run_date={run_date_str}"
# os.makedirs(base_path, exist_ok=True)

# flagged_emps = leave_usage.select("emp_id").rdd.map(lambda r: r["emp_id"]).collect()

# for emp_id in flagged_emps:
#     emp_dir = os.path.join(base_path, f"emp_id={emp_id}")
#     os.makedirs(emp_dir, exist_ok=True)

#     file_path = os.path.join(emp_dir, "report.txt")

#     # Skip if already exists (idempotency)
#     if os.path.exists(file_path):
#         print(f"üü° Report already exists for emp_id={emp_id}. Skipping.")
#         continue

#     # Write this employee‚Äôs report
#     emp_df = leave_usage.filter(col("emp_id") == emp_id)
#     rows = emp_df.collect()

#     with open(file_path, "w") as f:
#         for row in rows:
#             f.write(str(row.asDict()) + "\n")

#     print(f"‚úÖ Report written for emp_id={emp_id}")

# print("üéâ Monthly leave quota reports completed.")


üìä Reporting for period: Jan 1 to 2024-10-31


[Stage 128:>                                                        (0 + 1) / 1]

+----------+------------+-----------+----+-----------------+-------+----------+
|    emp_id|leaves_taken|leave_quota|year|    leave_percent|flagged|  run_date|
+----------+------------+-----------+----+-----------------+-------+----------+
|2451985258|          23|         24|2024|95.83333333333334|    Yes|2024-11-01|
|7099944869|          18|         22|2024|81.81818181818183|    Yes|2024-11-01|
|4399264584|          21|         24|2024|             87.5|    Yes|2024-11-01|
|2238865106|          18|         21|2024|85.71428571428571|    Yes|2024-11-01|
|3186300822|          17|         20|2024|             85.0|    Yes|2024-11-01|
| 715022541|          18|         20|2024|             90.0|    Yes|2024-11-01|
|3799324173|          17|         21|2024|80.95238095238095|    Yes|2024-11-01|
|9320403989|          20|         22|2024| 90.9090909090909|    Yes|2024-11-01|
|7430004905|          19|         22|2024|86.36363636363636|    Yes|2024-11-01|
|1486627765|          19|         22|202

                                                                                

In [30]:
employee_leaves = leaves_taken.filter(col("emp_id") == '1226091381')
employee_leaves.show()



+----------+------------+
|    emp_id|leaves_taken|
+----------+------------+
|1226091381|          14|
+----------+------------+



                                                                                

In [31]:
leave_usage.show()



+----------+------------+-----------+----+-----------------+-------+----------+
|    emp_id|leaves_taken|leave_quota|year|    leave_percent|flagged|  run_date|
+----------+------------+-----------+----+-----------------+-------+----------+
|2451985258|          23|         24|2024|95.83333333333334|    Yes|2024-11-01|
|7099944869|          18|         22|2024|81.81818181818183|    Yes|2024-11-01|
|4399264584|          21|         24|2024|             87.5|    Yes|2024-11-01|
|2238865106|          18|         21|2024|85.71428571428571|    Yes|2024-11-01|
|3186300822|          17|         20|2024|             85.0|    Yes|2024-11-01|
| 715022541|          18|         20|2024|             90.0|    Yes|2024-11-01|
|3799324173|          17|         21|2024|80.95238095238095|    Yes|2024-11-01|
|9320403989|          20|         22|2024| 90.9090909090909|    Yes|2024-11-01|
|7430004905|          19|         22|2024|86.36363636363636|    Yes|2024-11-01|
|1486627765|          19|         22|202

                                                                                

In [32]:
flagged_count = leave_usage.count()
print(f"üîç Total flagged employees: {flagged_count}")



üîç Total flagged employees: 2439


                                                                                

In [33]:
###### from pyspark.sql.functions import col, to_date, year, countDistinct, lit, expr
# from datetime import datetime
# import boto3

# # ========== Step 1: Setup ==========
# today = datetime.utcnow().date()
# current_year = today.year
# run_date_str = today.strftime("%Y-%m-%d")

# bucket = "your-bucket"  # ‚Üê replace with your S3 bucket name
# s3_prefix_base = f"monthly_leave_reports/run_date={run_date_str}"

# s3 = boto3.client("s3")

# def file_exists_in_s3(bucket, key_prefix):
#     """Check if a file already exists in S3 for the given prefix (per employee)."""
#     response = s3.list_objects_v2(Bucket=bucket, Prefix=key_prefix)
#     return "Contents" in response

# # ========== Step 2: Prepare Clean Data ==========
# # Ensure 'date' column is in proper DateType
# employee_leaves_df = employee_leaves_df.withColumn("date", to_date("date"))

# # Filter ACTIVE leaves within the current year (no deduping needed)
# valid_leaves = employee_leaves_df.filter(
#     (col("status") == "ACTIVE") &
#     (year(col("date")) == current_year)
# )

# # Count leaves taken per employee
# leaves_taken = valid_leaves.groupBy("emp_id") \
#     .agg(countDistinct("date").alias("leaves_taken"))

# # Join with leave quota and compute leave percent
# leave_usage = leaves_taken.join(employee_leaves_quota_df, on="emp_id", how="inner") \
#     .filter(col("year") == lit(current_year)) \
#     .withColumn("leave_percent", (col("leaves_taken") / col("leave_quota")) * 100) \
#     .withColumn("flagged", expr("CASE WHEN leave_percent > 80 THEN 'Yes' ELSE 'No' END")) \
#     .filter(col("flagged") == "Yes") \
#     .withColumn("run_date", lit(run_date_str))

# # ========== Step 3: Write One Report per Employee (idempotent) ==========
# flagged_employees = leave_usage.select("emp_id").rdd.map(lambda row: row["emp_id"]).collect()

# for emp_id in flagged_employees:
#     s3_key_prefix = f"{s3_prefix_base}/emp_id={emp_id}/"
    
#     if file_exists_in_s3(bucket, s3_key_prefix):
#         print(f"üü° Skipping emp_id={emp_id}, report already exists.")
#         continue

#     emp_df = leave_usage.filter(col("emp_id") == emp_id)

#     # Save as text file (1 file per emp_id)
#     emp_df.coalesce(1).write.mode("overwrite") \
#         .format("text") \
#         .option("header", True) \
#         .save(f"s3://{bucket}/{s3_key_prefix}")

#     print(f"‚úÖ Report written for emp_id={emp_id}")

# print("üéâ Monthly leave quota reports completed successfully.")