<a href="https://colab.research.google.com/github/Yashwanth-1406/PYSPARK-EMPLOYEE-ETL/blob/main/ETL_PROCESS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL Process

ETL = Extract → Transform → Load

ETL is the backbone of most data pipelines, in service-based companies in India and everywhere else. It is how raw data becomes clean, usable data for analytics, dashboards, and reports.

---

## 1. **Extract (E)**

* **What it is**: Pulling data from every place it lives.
* **Sources**:

  * Databases (MySQL, Oracle, Postgres)
  * Files (CSV, Excel, JSON, logs)
  * APIs or streaming sources (Kafka, app logs)
* **What can go wrong**:

  * Different formats across sources
  * Missing data
  * Large volumes
  * Slow refresh rates
* **Goal**: Bring all the data together in one place so it can be worked on.

---

## 2. **Transform (T)**

* **What it is**: Cleaning, standardizing, enriching, and structuring the data.
* **Key steps**:

  * **Cleaning** → Fill or flag missing values, remove duplicates, correct wrong formats.
  * **Standardizing** → Make values uniform (dates, phone numbers, names).
  * **Joining** → Combine multiple tables (e.g., Employee + Department + Salary).
  * **Deriving new columns** → Add useful information, such as Tax = 10% of salary, or an employee category.
  * **Aggregations** → Calculate sums, averages, min, max, counts, etc.
  * **Complex handling** → Flatten JSON, explode arrays, collect lists, apply window functions and UDFs.
* **Goal**: Turn messy raw data into **structured, trustworthy, ready-to-use data**.

---

## 3. **Load (L)**

* **What it is**: Saving the cleaned and structured data in a place where others can use it.
* **Destinations**:

  * Relational (SQL) and NoSQL databases
  * Data warehouses (Snowflake, BigQuery, Redshift)
  * Cloud storage (AWS S3, Azure Blob Storage, Google Cloud Storage)
  * Parquet/CSV/JSON files for downstream analytics
* **Goal**: Make sure data is **accessible, fast to read, and reliable**.


---

## 4. **Advanced Concepts Often Used in ETL**

In practice, even in large service-based companies, ETL involves more than cleaning and saving:

* **Aggregations & Metrics** → Department-level salaries, counts, averages.
* **Window Functions** → Rank employees by salary, running totals, moving averages.
* **UDFs (User Defined Functions)** → Custom business logic, like category labels.
* **Partitioning** → Split data into chunks by a column's values (e.g., by department or date) so queries can skip the chunks they don't need.
* **Streaming ETL** → Continuous updates like live attendance or transactions.
* **Monitoring & Logging** → Track failures, row counts, performance, alert on errors.

---

## **End-to-End Picture**

1. **Extract** → Pull raw data from everywhere
2. **Transform** → Clean, fix, standardize, enrich, aggregate
3. **Load** → Save cleaned data for use by analysts, dashboards, or ML




In [22]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EmployeeETL") \
    .getOrCreate()


In [23]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# `spark` comes from the previous cell; calling SparkSession.builder...getOrCreate()
# again would just return that same session, so it is not repeated here.

# Employee schema: JoinDate stays a string on purpose, so malformed dates survive
# extraction and can be parsed (and caught) during the Transform step
schema_emp = StructType([
    StructField("EmpID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Salary", DoubleType(), True),
    StructField("DeptID", StringType(), True),
    StructField("Phone", StringType(), True),
    StructField("JoinDate", StringType(), True)
])
schema_dept = StructType([
    StructField("DeptID", StringType(), True),
    StructField("DeptName", StringType(), True),
    StructField("Location", StringType(), True)
])


In [24]:
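# Sample employees with deliberate data-quality problems:
# E3 has missing Age/Salary and a dashed phone number, E4 has no phone,
# E5 has an unparseable JoinDate and a DeptID (D5) with no matching department
data_emp = [
    ("E1", "Alice", 30, 60000.0, "D1", "9876543210", "2020-01-10"),
    ("E2", "Bob", 40, 75000.0, "D2", "9876500000", "2019-03-15"),
    ("E3", "Cathy", None, None, "D3", "123-456-7890", "2021-05-20"),
    ("E4", "David", 28, 90000.0, "D2", None, "2022-07-25"),
    ("E5", "Eva", 35, 80000.0, "D5", "9999999999", "invalid_date"),
]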

data_dept = [
    ("D1", "HR", "Delhi"),
    ("D2", "IT", "Mumbai"),
    ("D3", "Finance", "Bangalore"),
    ("D4", "Marketing", "Chennai")
]


df_emp = spark.createDataFrame(data_emp, schema_emp)
df_dept = spark.createDataFrame(data_dept, schema_dept)

print("=== Raw Employees ===")
df_emp.show(truncate=False)

print("=== Raw Departments ===")
df_dept.show(truncate=False)


=== Raw Employees ===
+-----+-----+----+-------+------+------------+------------+
|EmpID|Name |Age |Salary |DeptID|Phone       |JoinDate    |
+-----+-----+----+-------+------+------------+------------+
|E1   |Alice|30  |60000.0|D1    |9876543210  |2020-01-10  |
|E2   |Bob  |40  |75000.0|D2    |9876500000  |2019-03-15  |
|E3   |Cathy|NULL|NULL   |D3    |123-456-7890|2021-05-20  |
|E4   |David|28  |90000.0|D2    |NULL        |2022-07-25  |
|E5   |Eva  |35  |80000.0|D5    |9999999999  |invalid_date|
+-----+-----+----+-------+------+------------+------------+

=== Raw Departments ===
+------+---------+---------+
|DeptID|DeptName |Location |
+------+---------+---------+
|D1    |HR       |Delhi    |
|D2    |IT       |Mumbai   |
|D3    |Finance  |Bangalore|
|D4    |Marketing|Chennai  |
+------+---------+---------+



In [25]:
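from pyspark.sql.functions import col, when, regexp_replace, to_date, lit

# Mean salary over non-null values (avg ignores NULLs); used to impute missing salaries
avg_salary = df_emp.selectExpr("avg(Salary)").first()[0]

# Cleaning: replace a missing Salary with the mean
df_emp_clean = df_emp.withColumn(
    "Salary",
    when(col("Salary").isNull(), avg_salary).otherwise(col("Salary"))
)
# Cleaning: mark a missing Phone explicitly instead of leaving it NULL
df_emp_clean = df_emp_clean.withColumn(
    "Phone",
    when(col("Phone").isNull(), lit("Unknown")).otherwise(col("Phone"))
)
# Standardizing: strip dashes so phone numbers are plain digits (or "Unknown")
df_emp_clean = df_emp_clean.withColumn("Phone", regexp_replace("Phone", "-", ""))
# Standardizing: parse JoinDate. With ANSI mode off, unparseable values such as
# "invalid_date" become NULL; with ANSI mode on (the Spark 4.x default) this may raise an error
df_emp_clean = df_emp_clean.withColumn("JoinDate", to_date("JoinDate", "yyyy-MM-dd"))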


In [26]:
df_joined = df_emp_clean.join(df_dept, on="DeptID", how="left")

print("=== After Join ===")
df_joined.show(truncate=False)


=== After Join ===
+------+-----+-----+----+-------+----------+----------+--------+---------+
|DeptID|EmpID|Name |Age |Salary |Phone     |JoinDate  |DeptName|Location |
+------+-----+-----+----+-------+----------+----------+--------+---------+
|D1    |E1   |Alice|30  |60000.0|9876543210|2020-01-10|HR      |Delhi    |
|D2    |E2   |Bob  |40  |75000.0|9876500000|2019-03-15|IT      |Mumbai   |
|D5    |E5   |Eva  |35  |80000.0|9999999999|NULL      |NULL    |NULL     |
|D3    |E3   |Cathy|NULL|76250.0|1234567890|2021-05-20|Finance |Bangalore|
|D2    |E4   |David|28  |90000.0|Unknown   |2022-07-25|IT      |Mumbai   |
+------+-----+-----+----+-------+----------+----------+--------+---------+



In [27]:
# Import the module under an alias: `from ... import max, min` would shadow Python's built-ins
from pyspark.sql import functions as F

df_agg = df_joined.groupBy("DeptName").agg(
    F.avg("Salary").alias("AvgSalary"),
    F.max("Salary").alias("MaxSalary"),
    F.min("Salary").alias("MinSalary"),
    F.count("EmpID").alias("EmployeeCount")
)

print("=== Department Salary Stats ===")
df_agg.show(truncate=False)


=== Department Salary Stats ===
+--------+---------+---------+---------+-------------+
|DeptName|AvgSalary|MaxSalary|MinSalary|EmployeeCount|
+--------+---------+---------+---------+-------------+
|HR      |60000.0  |60000.0  |60000.0  |1            |
|NULL    |80000.0  |80000.0  |80000.0  |1            |
|Finance |76250.0  |76250.0  |76250.0  |1            |
|IT      |82500.0  |90000.0  |75000.0  |2            |
+--------+---------+---------+---------+-------------+



In [28]:
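from pyspark.sql.functions import collect_list, explode

# collect_list: gather every employee name in a department into one array column
df_collected = df_joined.groupBy("DeptName").agg(
    collect_list("Name").alias("Employees")
)

# Small sample of array-valued data (EmpID -> list of skills)
df_skills = spark.createDataFrame([
    ("E1", ["Python", "Java"]),
    ("E2", ["C++", "Go"]),
    ("E3", ["Excel"]),
], ["EmpID", "Skills"])

# explode: one output row per array element, i.e. one row per (EmpID, Skill)
df_skills_exploded = df_skills.withColumn("Skill", explode("Skills"))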


In [29]:
# Load: write both outputs as Parquet; "overwrite" replaces files from any earlier run
df_joined.write.mode("overwrite").parquet("CleanedEmployees")
df_agg.write.mode("overwrite").parquet("DeptAggregates")


In [30]:
# Read the output back to confirm the load worked
df_loaded = spark.read.parquet("CleanedEmployees")
df_loaded.show()


+------+-----+-----+----+-------+----------+----------+--------+---------+
|DeptID|EmpID| Name| Age| Salary|     Phone|  JoinDate|DeptName| Location|
+------+-----+-----+----+-------+----------+----------+--------+---------+
|    D1|   E1|Alice|  30|60000.0|9876543210|2020-01-10|      HR|    Delhi|
|    D2|   E2|  Bob|  40|75000.0|9876500000|2019-03-15|      IT|   Mumbai|
|    D5|   E5|  Eva|  35|80000.0|9999999999|      NULL|    NULL|     NULL|
|    D3|   E3|Cathy|NULL|76250.0|1234567890|2021-05-20| Finance|Bangalore|
|    D2|   E4|David|  28|90000.0|   Unknown|2022-07-25|      IT|   Mumbai|
+------+-----+-----+----+-------+----------+----------+--------+---------+

