In [0]:
from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import col, when, round, rand, floor

In [0]:
# Load the three bronze-layer Delta tables. They all live under one root, so the
# environment/location can be changed in a single place.
BRONZE_ROOT = "/Volumes/workspace/default/etl_path/bronze_layer"

df_customers = spark.read.format("delta").load(f"{BRONZE_ROOT}/customers")
df_employees = spark.read.format("delta").load(f"{BRONZE_ROOT}/employees")
df_sales = spark.read.format("delta").load(f"{BRONZE_ROOT}/sales")

In [0]:
# Inspect the bronze schemas (customers, sales, employees) before transforming.
# NOTE(review): customer_id is a string in customers but an integer in sales
# (see the schema output) — reconcile the types before joining them downstream.
for bronze_df in (df_customers, df_sales, df_employees):
    bronze_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: string (nullable = true)
 |-- created_at: timestamp (nullable = true)

root
 |-- sale_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- sale_date: timestamp (nullable = true)
 |-- payment_method: string (nullable = true)

root
 |-- employee_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- position: string (nullable = true)
 |-- email: string (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- yoe: integer (nullable = true)



In [0]:
# Bucket employees by years of experience (yoe). `when` branches are tried in
# order, so each branch only needs its lower bound:
#   yoe > 5       -> "Senior"
#   3 < yoe <= 5  -> "Mid-Level"
#   0 < yoe <= 3  -> "Junior"
#   otherwise     -> "Regular"  (yoe <= 0, and also null yoe, since a
#                                comparison against null is never true)
# NOTE(review): confirm that zero/null yoe should really map to "Regular".
yoe = col("yoe")
experience_bucket = (
    when(yoe > 5, "Senior")
    .when(yoe > 3, "Mid-Level")
    .when(yoe > 0, "Junior")
    .otherwise("Regular")
)
df_employees = df_employees.withColumn("employee_category", experience_bucket)

In [0]:
# From here on the bronze "amount" column is treated as the per-unit price;
# the per-sale total is derived later as quantity * price.
df_sales = df_sales.withColumnRenamed(existing="amount", new="price")

In [0]:
# Synthetic augmentation: bronze sales has no quantity column, so each row gets
# a random integer quantity in [1, MAX_QUANTITY].
# The seed makes the silver layer reproducible; with an unseeded rand() every
# rerun of the notebook writes different quantities (and therefore totals).
# NOTE(review): Spark's seeded rand() is only stable for the same partitioning
# of the input data — confirm that is acceptable for downstream consumers.
MAX_QUANTITY = 12
QUANTITY_SEED = 42

df_sales = df_sales.withColumn(
    "quantity",
    (floor(rand(seed=QUANTITY_SEED) * MAX_QUANTITY) + 1).cast("int"),
)

In [0]:
# Place the new "quantity" column directly after "price".
SALES_COLUMNS_WITH_QUANTITY = [
    "sale_id",
    "customer_id",
    "employee_id",
    "product",
    "category",
    "price",
    "quantity",
    "sale_date",
    "payment_method",
]
df_sales = df_sales.select(*SALES_COLUMNS_WITH_QUANTITY)

In [0]:
# Per-sale total = quantity * unit price, rounded to 2 decimal places.
line_total = col("quantity") * col("price")
df_sales = df_sales.withColumn("total", round(line_total, 2))

In [0]:
# Final silver column order for sales: the derived "total" sits right after
# the "price" and "quantity" columns it is computed from.
SILVER_SALES_COLUMNS = [
    "sale_id",
    "customer_id",
    "employee_id",
    "product",
    "category",
    "price",
    "quantity",
    "total",
    "sale_date",
    "payment_method",
]
df_sales = df_sales.select(*SILVER_SALES_COLUMNS)

In [0]:
# Persist the transformed DataFrames as silver-layer Delta tables under one root.
SILVER_ROOT = "/Volumes/workspace/default/etl_path/silver_layer"


def _write_silver(df, table: str) -> None:
    """Overwrite the silver Delta table `table` with `df`, replacing its schema.

    The silver tables are rebuilt from scratch on every run, so the stored schema
    must follow the DataFrame. `overwriteSchema` lets new columns (such as
    employees' `employee_category`) land on a table that already exists, instead
    of failing the overwrite with a schema mismatch. It also drops columns the
    DataFrame no longer has, rather than keeping them as `mergeSchema` would.

    Args:
        df: DataFrame to persist.
        table: sub-directory name of the table under SILVER_ROOT.
    """
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(f"{SILVER_ROOT}/{table}")
    )


_write_silver(df_customers, "customers")
_write_silver(df_employees, "employees")
_write_silver(df_sales, "sales")