In [0]:
#importing necessary packages
from pyspark.sql import*
from pyspark.sql.functions import*
from pyspark.sql.types import*

In [0]:
# Authenticate to the ADLS Gen2 storage account with its access key.
# The key is fetched from a Databricks secret scope instead of being embedded
# in the notebook, where anyone with read access (or access to version control)
# could see it.
# Prerequisite: a secret scope "genlake9" holding the key under
# "storage-account-key" (placeholder names; adjust to your workspace).
# NOTE(review): the key that was previously committed here should be rotated.
spark.conf.set(
    "fs.azure.account.key.genlake9.dfs.core.windows.net",
    dbutils.secrets.get(scope="genlake9", key="storage-account-key"),
)

In [0]:
# Root URIs of the bronze and silver containers in the genlake9 storage account
bronze_path, silver_path = (
    f"abfss://{container}@genlake9.dfs.core.windows.net/"
    for container in ("bronze", "silver")
)

In [0]:
# Read the three bronze-layer Parquet datasets from the bronze container
salary_bronze = spark.read.parquet(bronze_path + "salary_data_bronze.parquet")
employee_bronze = spark.read.parquet(bronze_path + "employee_data_bronze.parquet")
department_bronze = spark.read.parquet(bronze_path + "department_data_bronze.parquet")

In [0]:
# Cast every join key to a 32-bit integer so the three tables join on
# matching types.

# salary_bronze carries both keys
salary_bronze = (
    salary_bronze
    .withColumn("employee_id", col("employee_id").cast("int"))
    .withColumn("department_id", col("department_id").cast("int"))
)

# employee_bronze is joined on employee_id
employee_bronze = employee_bronze.withColumn(
    "employee_id", col("employee_id").cast("int")
)

# department_bronze is joined on department_id
department_bronze = department_bronze.withColumn(
    "department_id", col("department_id").cast("int")
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Latest-salary window: one partition per employee name, newest salary first.
# salary_id is a secondary sort key so that two records sharing the same
# salary_date always resolve to the same row; row_number() over a tied
# ordering is otherwise free to pick a different row on every run.
# NOTE(review): partitioning by employee_name merges distinct employees who
# share a name -- confirm names are unique, or partition by employee_id.
windowSpec = Window.partitionBy("employee_name").orderBy(
    col("salary_date").desc(),
    col("salary_id").desc(),
)

# Join each salary record to its employee and department, keep only rows that
# have a salary amount, a job title and a department name, then project the
# silver schema with explicit types.
# The filter runs BEFORE the select because it refers to the s./e./d. join
# aliases, which are no longer part of the DataFrame's output once the select
# has renamed the columns.
salary_joined = (
    salary_bronze.alias("s")
    .join(employee_bronze.alias("e"), col("s.employee_id") == col("e.employee_id"), "left")
    .join(department_bronze.alias("d"), col("s.department_id") == col("d.department_id"), "left")
    .filter(
        (col("s.salary_amount").isNotNull())
        & (col("e.job_title").isNotNull())
        & (col("d.department_name").isNotNull())
    )
    .select(
        col("s.salary_id").cast(IntegerType()).alias("salary_id"),
        col("e.employee_name").cast(StringType()).alias("employee_name"),
        col("e.job_title").cast(StringType()).alias("job_title"),
        col("d.department_name").cast(StringType()).alias("department_name"),
        col("s.salary_amount").cast(FloatType()).alias("salary_amount"),
        # total_salary_value is currently a copy of salary_amount
        col("s.salary_amount").cast(FloatType()).alias("total_salary_value"),
        col("s.salary_date").cast(DateType()).alias("salary_date"),
        current_date().alias("processed_date"),
    )
)

# Silver layer: keep only the first-ranked (most recent) row of each partition
salary_silver = (
    salary_joined
    .withColumn("row_num", row_number().over(windowSpec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)


In [0]:
# Render the silver salary table in the notebook's interactive viewer
display(salary_silver)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:434)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:466)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:757)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:508)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:613)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:636)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:49)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:293)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(Attr

In [0]:
# Inspect the silver salary table: sample rows, schema, then the total row count
salary_silver.show(n=20, truncate=True)
salary_silver.printSchema()
row_total = salary_silver.count()
print("Count:", row_total)


+---------+--------------+---------+---------------+-------------+------------------+-----------+--------------+
|salary_id| employee_name|job_title|department_name|salary_amount|total_salary_value|salary_date|processed_date|
+---------+--------------+---------+---------------+-------------+------------------+-----------+--------------+
|   166085|Employee 10000|  Support|  Department 22|     76747.46|          76747.46| 2023-10-24|    2025-04-05|
|     6139|Employee 10001|  Manager|   Department 3|     82605.91|          82605.91| 2023-09-09|    2025-04-05|
|   132045|Employee 10003|Developer|  Department 42|     48891.18|          48891.18| 2023-12-15|    2025-04-05|
|   108407|Employee 10009|  Manager|  Department 38|    107554.92|         107554.92| 2023-11-01|    2025-04-05|
|    65905|Employee 10014|Developer|  Department 22|      90311.7|           90311.7| 2023-11-22|    2025-04-05|
|    64111|Employee 10017|  Support|  Department 30|     31186.87|          31186.87| 2023-11-18

In [0]:
# Show which bronze container root the notebook is pointing at
print(f"Bronze path: {bronze_path}")


Bronze path: abfss://bronze@genlake9.dfs.core.windows.net/


In [0]:
# Step 1: Read the bronze-layer Parquet files
employee_bronze = spark.read.parquet(f"{bronze_path}employee_data_bronze.parquet")
department_bronze = spark.read.parquet(f"{bronze_path}department_data_bronze.parquet")

# Step 2: Select needed columns WITH IDs (the IDs are the deduplication keys)
employee_silver_temp = employee_bronze.select("employee_id", "employee_name", "job_title")
department_silver_temp = department_bronze.select("department_id", "department_name")

# Step 3: Deduplicate -- keep a single row per ID
employee_silver_temp = employee_silver_temp.dropDuplicates(["employee_id"])
department_silver_temp = department_silver_temp.dropDuplicates(["department_id"])

# Step 4: Drop ID columns for the final Silver tables.
# These are built from the deduplicated frames of Step 3; selecting from the
# raw bronze frames here would discard the deduplication and let duplicate
# rows through to silver.
employee_silver = employee_silver_temp.select(col("employee_name"), col("job_title"))
department_silver = department_silver_temp.select(col("department_name"))

# Step 5: Write Silver Layer to Parquet files (existing output is replaced).
# salary_silver comes from the salary transformation cell earlier in the notebook.
employee_silver.write.mode("overwrite").parquet(f"{silver_path}employee_silver.parquet")
department_silver.write.mode("overwrite").parquet(f"{silver_path}department_silver.parquet")
salary_silver.write.mode("overwrite").parquet(f"{silver_path}salary_silver.parquet")

print("Bronze to Silver transformation completed. Parquet files written.")


Bronze to Silver transformation completed. Parquet files written.


In [0]:
# Render both dimension tables in the notebook's interactive viewer
display(employee_silver)
display(department_silver)

employee_name,job_title
Employee 1,Manager
Employee 2,Manager
Employee 3,HR
Employee 4,Analyst
Employee 5,Support
Employee 6,Developer
Employee 7,Analyst
Employee 8,Developer
Employee 9,Support
Employee 10,Support


department_name
Department 1
Department 2
Department 3
Department 4
Department 5
Department 6
Department 7
Department 8
Department 9
Department 10


In [0]:
# JDBC connection settings for the silver database on Azure Database for MySQL.
# The password is fetched from a Databricks secret scope rather than written in
# the notebook, where it would be visible to every reader and in version control.
# Prerequisite: a secret scope "vaishnavi9-mysql" with the key "password"
# (placeholder names; adjust to your workspace).
# NOTE(review): the password that was previously committed here should be rotated.
jdbc_url = "jdbc:mysql://vaishnavi9.mysql.database.azure.com:3306/silver_db"
connection_properties = {
    "user": "vaishnavi9",
    "password": dbutils.secrets.get(scope="vaishnavi9-mysql", key="password"),
    "driver": "com.mysql.cj.jdbc.Driver",
}


In [0]:
# Print sample rows and the schema of the silver salary table
salary_silver.show(n=20, truncate=True)
salary_silver.printSchema()

+---------+--------------+---------+---------------+-------------+------------------+-----------+--------------+
|salary_id| employee_name|job_title|department_name|salary_amount|total_salary_value|salary_date|processed_date|
+---------+--------------+---------+---------------+-------------+------------------+-----------+--------------+
|   166085|Employee 10000|  Support|  Department 22|     76747.46|          76747.46| 2023-10-24|    2025-04-05|
|     6139|Employee 10001|  Manager|   Department 3|     82605.91|          82605.91| 2023-09-09|    2025-04-05|
|   132045|Employee 10003|Developer|  Department 42|     48891.18|          48891.18| 2023-12-15|    2025-04-05|
|   108407|Employee 10009|  Manager|  Department 38|    107554.92|         107554.92| 2023-11-01|    2025-04-05|
|    65905|Employee 10014|Developer|  Department 22|      90311.7|           90311.7| 2023-11-22|    2025-04-05|
|    64111|Employee 10017|  Support|  Department 30|     31186.87|          31186.87| 2023-11-18

In [0]:
# Write the silver salary table to MySQL as silver_db.salary_silver, replacing
# any existing table. The URL, user, password and driver come from jdbc_url /
# connection_properties (set in the JDBC configuration cell) so the credentials
# are not repeated in plaintext here.
(
    salary_silver.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "salary_silver")
    .options(**connection_properties)  # supplies user, password, driver
    .mode("overwrite")
    .save()
)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Write employee_silver table to MySQL (silver_db.employee_silver), replacing
# any existing table. The data is split into 10 partitions before the write.
# The connection is forced onto SSL via URL parameters appended to jdbc_url;
# user, password and driver come from connection_properties (set in the JDBC
# configuration cell) so the credentials are not repeated in plaintext here.
(
    employee_silver
    .repartition(10)
    .write
    .format("jdbc")
    .option("url", f"{jdbc_url}?useSSL=true&requireSSL=true")
    .option("dbtable", "employee_silver")
    .options(**connection_properties)  # supplies user, password, driver
    .mode("overwrite")
    .save()
)

In [0]:
# Write department_silver table to MySQL (silver_db.department_silver),
# replacing any existing table. user, password and driver come from
# connection_properties (set in the JDBC configuration cell) so the
# credentials are not repeated in plaintext here.
(
    department_silver.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "department_silver")
    .options(**connection_properties)  # supplies user, password, driver
    .mode("overwrite")
    .save()
)

print(" Employee and Department tables written successfully to silver_db.")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum, avg, current_date
from pyspark.sql.types import FloatType


In [0]:
# Obtain the Spark session for the gold-layer stage; getOrCreate() hands back
# the active session when one already exists.
spark = (
    SparkSession.builder
    .appName("Gold Layer Processing")
    .getOrCreate()
)


In [0]:
# Step 2: MySQL connection properties
# jdbc_url points at the silver database (source), gold_jdbc_url at the gold
# database (target); both live on the same Azure MySQL server.
jdbc_url = "jdbc:mysql://vaishnavi9.mysql.database.azure.com:3306/silver_db"
gold_jdbc_url = "jdbc:mysql://vaishnavi9.mysql.database.azure.com:3306/gold_db"

# The password is fetched from a Databricks secret scope rather than written in
# the notebook, where it would be visible to every reader and in version control.
# Prerequisite: a secret scope "vaishnavi9-mysql" with the key "password"
# (placeholder names; adjust to your workspace).
# NOTE(review): the password that was previously committed here should be rotated.
connection_properties = {
    "user": "vaishnavi9",
    "password": dbutils.secrets.get(scope="vaishnavi9-mysql", key="password"),
    "driver": "com.mysql.cj.jdbc.Driver",
}

In [0]:
# Step 3: Load the salary_silver table from the silver MySQL database.
# Note that this rebinds the salary_silver name to the JDBC-backed DataFrame.
salary_silver = spark.read.jdbc(
    jdbc_url,
    "salary_silver",
    properties=connection_properties,
)

In [0]:
# Step 4: Perform aggregations for gold layer
# One row per department: the summed total_salary_value, the mean
# salary_amount, and the date the report was produced.
# The aggregates are kept as 64-bit doubles. Casting them to a 32-bit float
# (about 7 significant digits) rounds department totals in the millions to
# whole currency units or worse.
salary_gold = (
    salary_silver.groupBy("department_name")
    .agg(
        _sum("total_salary_value").cast("double").alias("total_salary_value"),
        avg("salary_amount").cast("double").alias("avg_salary"),
    )
    .withColumn("report_date", current_date())
)


In [0]:
# Step 5: Write Gold table to MySQL (gold_db.salary_gold), replacing any
# existing table. user, password and driver come from connection_properties
# (set in the Step 2 cell) so the credentials are not repeated in plaintext here.
(
    salary_gold.write
    .format("jdbc")
    .option("url", gold_jdbc_url)
    .option("dbtable", "salary_gold")
    .options(**connection_properties)  # supplies user, password, driver
    .mode("overwrite")
    .save()
)
print(" Gold layer table 'salary_gold' created successfully in gold_db.")

 Gold layer table 'salary_gold' created successfully in gold_db.
