In [0]:
%run "/Workspace/src (1)/assignment_1/source_to_bronze/util.ipynb"

In [0]:
from pyspark.sql.types import *

# Explicit schema for the employee CSV, built from (column name, Spark type)
# pairs in file order. Every column is declared nullable.
employee_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("EmployeeID", StringType()),
            ("EmpName", StringType()),
            ("DepartmentID", StringType()),
            ("CountryID", StringType()),
            ("Salary", DoubleType()),
            ("Age", IntegerType()),
        )
    ]
)

In [0]:

# All three read methods below load the same source file, so the path is
# defined once instead of being repeated per call.
employee_csv_path = "/Volumes/different_files_format/default/data/csv_files/Employee-Q1.csv"

# Read using custom schema (Method 1 – Function from utils)
# NOTE(review): read_csv_with_schema is not defined in this notebook; presumably
# it comes from the util notebook loaded via %run — confirm which read options
# (e.g. header handling) it applies.
employee_df_1 = read_csv_with_schema(employee_csv_path, employee_schema)

# Read using Spark API directly (Method 2 – Direct read)
# Fix: the original statement was missing the closing parenthesis of .csv(...),
# which made the whole cell fail with a SyntaxError.
employee_df_2 = (
    spark.read
    .option("header", True)
    .schema(employee_schema)
    .csv(employee_csv_path)
)

# Read using SparkSession + Options (Method 3 – More detailed)
employee_df_3 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(employee_schema)
    .load(employee_csv_path)
)

# results
display(employee_df_1)
display(employee_df_2)
display(employee_df_3)

In [0]:
from pyspark.sql.functions import udf
import re

# CamelCase to snake_case converter
def camel_to_snake(name: str) -> str:
    """Convert a CamelCase / PascalCase identifier to snake_case.

    Examples: ``EmployeeID`` -> ``employee_id``, ``EmpName`` -> ``emp_name``,
    ``HTTPResponseCode`` -> ``http_response_code``.

    Names that already contain underscores are left with single underscores
    (``Employee_Name`` -> ``employee_name``); the previous pattern matched the
    underscore itself as the "preceding character" and produced
    ``employee__name``.

    Args:
        name: The identifier to convert.

    Returns:
        The lower-cased, underscore-separated form of ``name``.
    """
    # Pass 1: split before a capitalised word ("XxYyy" -> "Xx_Yyy"). The
    # preceding character may be anything '.' used to match except '_', so an
    # existing separator is never doubled.
    s1 = re.sub(r'([^_\n])([A-Z][a-z]+)', r'\1_\2', name)
    # Pass 2: split a lowercase letter or digit from a following capital
    # ("eID" -> "e_ID"), then lower-case the whole result.
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

# UDF to apply CamelCase → snake_case
def camel_to_snake_func(col_name):
    """Null-safe wrapper around ``camel_to_snake`` for use as a Spark UDF.

    Spark hands SQL NULL values to Python UDFs as ``None``; passing ``None``
    to the regex-based converter would raise ``TypeError`` and fail the task,
    so nulls are passed through unchanged.

    Args:
        col_name: A string value to convert, or ``None``.

    Returns:
        The snake_case form of ``col_name``, or ``None`` when the input is
        ``None``.
    """
    if col_name is None:
        return None
    return camel_to_snake(col_name)

# Column-level UDF (string in, string out) wrapping the function above.
camel_to_snake_udf = udf(camel_to_snake_func, StringType())

In [0]:
old_columns = employee_df_1.columns

# Derive the snake_case names with the plain Python helper camel_to_snake.
# Column names are handled as an ordinary Python list here, so the Spark UDF
# wrapper is not used for this step.
new_columns = [camel_to_snake(column_name) for column_name in old_columns]

# Apply renaming in a single positional pass. toDF assigns the new names by
# column position, so one rename can never be affected by the result of an
# earlier one, and the plan gets one projection instead of one per column.
employee_df_1 = employee_df_1.toDF(*new_columns)

display(employee_df_1)

In [0]:
from pyspark.sql.functions import current_date

# Add a load_date column holding the current date.
employee_df_1 = employee_df_1.withColumn("load_date", current_date())

# Make sure the Employee_info database exists.
# NOTE(review): the write below targets a storage path rather than a table in
# this database — confirm whether registering a table was intended.
spark.sql("CREATE DATABASE IF NOT EXISTS Employee_info")

# Write the DataFrame in Delta format, overwriting existing data and schema
# at the target path.
(
    employee_df_1.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/Volumes/workspace/default/databricks_assignment/employee_info/dim_employee")
)

In [0]:
# Load the Delta data stored at the dim_employee path and display it.
df = spark.read.load(
    "/Volumes/workspace/default/databricks_assignment/employee_info/dim_employee",
    format="delta",
)
display(df)