Part 1: Synthetic Data Generation (Faker)

In [None]:
import builtins
import csv
import os
import random
from datetime import datetime, timedelta
from getpass import getpass

import numpy as np
from faker import Faker


# --- Reproducibility -------------------------------------------------------
# One seed drives Faker, the stdlib `random` module and NumPy's legacy global
# generator, so a fresh kernel always produces the same names and draws.
SEED = 42
fake = Faker()
Faker.seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

# --- Dataset size ----------------------------------------------------------
NUM_RECORDS = 100_000  # rows written to the CSV
NUM_USERS = 75         # distinct employees in the roster

departments = ["HR", "IT", "Finance", "Sales", "Operations"]

# Build the employee roster: ids run from 1 to NUM_USERS, each with a fake
# name and a randomly assigned department. The name is drawn before the
# department for every user, which keeps the seeded output stable.
users = []
for user_id in range(1, NUM_USERS + 1):
    users.append({
        "user_id": user_id,
        "user_name": fake.name(),
        "department": random.choice(departments),
    })

# First calendar day that generated activity can fall on.
start_date = datetime(2024, 1, 1)


# Write NUM_RECORDS synthetic attendance rows to employee_logs.csv.
#
# Every row pairs a randomly chosen user with a random date in the 181-day
# window that starts at start_date. About 10% of rows are leave records (empty
# check-in/check-out, zero idle time); the others check in 0-90 minutes after
# the 09:00 expected time, stay 7-9 whole hours and get 0-2 idle hours.
#
# NOTE(review): user and date are drawn independently for each row, so one
# user can receive several rows for the same date (including a leave row next
# to attendance rows). Confirm this is acceptable for the downstream metrics.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

# The encoding is pinned so the file content does not depend on the platform's
# default text encoding.
with open("employee_logs.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)

    # Header row; the column order must match the data rows written below.
    writer.writerow([
        "user_id",
        "user_name",
        "department",
        "activity_date",
        "expected_check_in",
        "actual_check_in",
        "actual_check_out",
        "idle_hours",
        "is_on_leave"
    ])

    for _ in range(NUM_RECORDS):
        # The order of the random draws is part of the seeded, reproducible
        # output: user, date offset, leave flag, then the attendance details.
        user = random.choice(users)

        date = start_date + timedelta(days=random.randint(0, 180))

        expected_check_in = datetime(
            date.year, date.month, date.day, 9, 0, 0
        )

        is_on_leave = random.random() < 0.1

        if is_on_leave:
            # Leave rows carry empty timestamp fields and no idle time.
            check_in_text = ""
            check_out_text = ""
            idle_hours = 0.0
        else:
            late_minutes = random.randint(0, 90)
            actual_check_in = expected_check_in + timedelta(minutes=late_minutes)

            work_hours = random.randint(7, 9)
            actual_check_out = actual_check_in + timedelta(hours=work_hours)

            # builtins.round is called explicitly, presumably so the value is
            # rounded by Python's own round even if another `round` has been
            # imported into the notebook namespace.
            idle_hours = builtins.round(random.uniform(0, 2), 2)

            # Format here, where the values are known to be datetimes, rather
            # than testing for empty strings at write time.
            check_in_text = actual_check_in.strftime(TIMESTAMP_FORMAT)
            check_out_text = actual_check_out.strftime(TIMESTAMP_FORMAT)

        writer.writerow([
            user["user_id"],
            user["user_name"],
            user["department"],
            date.strftime("%Y-%m-%d"),
            expected_check_in.strftime(TIMESTAMP_FORMAT),
            check_in_text,
            check_out_text,
            idle_hours,
            is_on_leave
        ])

print("Data generation complete ✅")

Data generation complete ✅


Part 2: Data Ingestion, Transformation, and Aggregation with PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start (or reuse) a Spark session; the PostgreSQL JDBC driver is requested
# through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Employee_ETL")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")
    .getOrCreate()
)

print("Spark started successfully")

# Ingest the generated CSV, letting Spark infer the column types.
df = spark.read.csv("employee_logs.csv", header=True, inferSchema=True)

print("Initial count:", df.count())
df.printSchema()

# Make sure the temporal columns carry date/timestamp types regardless of
# what schema inference produced.
df = df.withColumn("activity_date", F.to_date("activity_date"))
for ts_column in ("expected_check_in", "actual_check_in", "actual_check_out"):
    df = df.withColumn(ts_column, F.to_timestamp(ts_column))

# Drop rows that are identical in every column.
df = df.dropDuplicates()

# Reusable column expressions for the derived metrics.
on_leave = F.col("is_on_leave")
presence_hours = (
    F.unix_timestamp("actual_check_out") - F.unix_timestamp("actual_check_in")
) / 3600
delay_hours = (
    F.unix_timestamp("actual_check_in") - F.unix_timestamp("expected_check_in")
) / 3600
arrived_late = F.col("actual_check_in") > F.col("expected_check_in")

# working_hours: time between check-in and check-out minus idle time.
# late_hours: delay after the expected check-in.
# Rows on leave (and on-time rows, for late_hours) get 0 rather than null, so
# they are counted in the averages computed below.
df = df.withColumn(
    "working_hours",
    F.when(~on_leave, presence_hours - F.col("idle_hours")).otherwise(0),
)
df = df.withColumn(
    "late_hours",
    F.when(arrived_late, delay_hours).otherwise(0),
)

# Per-employee summary: one output row per (user_id, user_name, department).
leave_flag = F.when(on_leave, 1).otherwise(0)
late_flag = F.when(F.col("late_hours") > 0, 1).otherwise(0)

user_metrics = (
    df.groupBy("user_id", "user_name", "department")
    .agg(
        F.avg("working_hours").alias("avg_working_hours"),
        F.avg("late_hours").alias("avg_late_hours"),
        F.sum("idle_hours").alias("total_idle_hours"),
        F.sum(leave_flag).alias("total_leaves"),
        F.sum(late_flag).alias("late_arrival_count"),
    )
)

user_metrics.show(5)



Spark started successfully


                                                                                

Initial count: 100000
root
 |-- user_id: integer (nullable = true)
 |-- user_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- activity_date: date (nullable = true)
 |-- expected_check_in: timestamp (nullable = true)
 |-- actual_check_in: timestamp (nullable = true)
 |-- actual_check_out: timestamp (nullable = true)
 |-- idle_hours: double (nullable = true)
 |-- is_on_leave: boolean (nullable = true)





+-------+--------------+----------+-----------------+------------------+------------------+------------+------------------+
|user_id|     user_name|department|avg_working_hours|    avg_late_hours|  total_idle_hours|total_leaves|late_arrival_count|
+-------+--------------+----------+-----------------+------------------+------------------+------------+------------------+
|     36|Jeffrey Chavez|        HR|6.484290271132376|0.7076555023923442|1173.6999999999996|          87|              1153|
|     63|   Connor West|     Sales|6.447621998450814|0.7061192873741283|1207.1200000000003|          99|              1179|
|     50|   Eric Carney|Operations|6.498134796238244|0.7099268547544413|           1204.38|          95|              1174|
|     70| William Baker|        IT|6.500922953451041|0.7244248261102193|1200.8499999999995|          88|              1144|
|     25|Matthew Foster|     Sales|6.581120491174214|0.6984139166027115|1201.8000000000002|          86|              1205|
+-------

                                                                                

In [5]:
# Persist the per-user metrics to PostgreSQL over JDBC.
jdbc_url = "jdbc:postgresql://localhost:5432/etl_db"

# Credentials are never stored in the notebook: the user name and password are
# read from the PGUSER / PGPASSWORD environment variables. If no password is
# set, it is requested interactively so it does not end up in the saved
# notebook or its outputs.
connection_properties = {
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD") or getpass("PostgreSQL password: "),
    "driver": "org.postgresql.Driver"
}

# mode="overwrite" replaces any existing employee_user_metrics table, which
# makes re-running this cell safe.
user_metrics.write.jdbc(
    url=jdbc_url,
    table="employee_user_metrics",
    mode="overwrite",
    properties=connection_properties
)

print("Data written to PostgreSQL successfully")

Data written to PostgreSQL successfully
