In [0]:
# Reset notebook state so a re-run starts from a clean slate.
# Remove every widget registered on this notebook.
dbutils.widgets.removeAll()

# Drop parameter variables left over from a previous run.
# Each name is removed independently: a single `del a, b, c` statement stops at
# the first undefined name, which leaves every later variable in memory, and a
# bare `except` would hide that. `pop(..., None)` is a no-op for missing names.
for _stale_name in (
    "storage_account_name",
    "container_name",
    "sas_token",
    "environment",
    "storage_account_key",  # also drop the cached secret value, if any
):
    globals().pop(_stale_name, None)

print("Widgets and variables cleared!")

In [0]:
# Create widgets for parameters (these can be passed from Azure DevOps or Jobs)
dbutils.widgets.text("storage_account_name", "shared5005", "Storage Account Name")
dbutils.widgets.text("container_name", "emp", "Container Name")
dbutils.widgets.text("environment", "dev", "Environment (dev/stage/prod)")

# Get parameter values
storage_account_name = dbutils.widgets.get("storage_account_name")
container_name = dbutils.widgets.get("container_name")
environment = dbutils.widgets.get("environment")

# Fetch the storage account key from the secret scope.
# The value is deliberately never printed or logged: notebook output is stored
# with the notebook and job run, so echoing it would expose the credential.
storage_account_key = dbutils.secrets.get(scope="storage-secrets", key="storage-account-key")

# Configure Azure Storage Access Key for the ABFS (dfs) endpoint of this account
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

print(f"Environments: {environment}")
print(f"Storage access configured for: {storage_account_name}")

In [0]:
# Import required libraries
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Build the ABFS locations and the table name from the notebook parameters.
abfss_root = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"
raw_path = f"{abfss_root}/raw/employee/*/*.csv"
bronze_path = f"{abfss_root}/bronze/employee"
table_name = f"employee_bronze_{environment}"  # one table name per environment

for label, value in (
    ("Raw path", raw_path),
    ("Bronze path", bronze_path),
    ("Table name", table_name),
):
    print(f"{label}: {value}")

# Load every CSV matched by raw_path; the header row supplies the column names
# and the column types are inferred from the data.
print("Reading CSV files from raw/employee folder...")
csv_reader = spark.read.format("csv").options(
    header="true",
    inferSchema="true",
    recursiveFileLookup="true",
)
df_raw = csv_reader.load(raw_path)

# Report how many rows were read and preview the first five.
raw_record_count = df_raw.count()
print(f"Total records read: {raw_record_count}")
df_raw.show(5)

In [0]:
# Keep a single row per EmployeeID: number each employee's rows in descending
# LastModifiedDate order and retain only the row numbered 1.
print("Deduplicating records...")
window_spec = (
    Window
    .partitionBy("EmployeeID")
    .orderBy(col("LastModifiedDate").desc())
)
df_ranked = df_raw.withColumn("row_num", row_number().over(window_spec))
# The helper column is only needed for the filter, so drop it afterwards.
df_deduped = df_ranked.where(col("row_num") == 1).drop("row_num")
print(f"Records after deduplication: {df_deduped.count()}")

In [0]:
# Persist the deduplicated rows at bronze_path in Delta format, replacing any
# data already stored at that location.
print("Writing to bronze/employee folder as Delta...")
df_deduped.write.save(bronze_path, format="delta", mode="overwrite")

In [0]:
# Register the Delta files at bronze_path as a table.
# IF NOT EXISTS makes this a no-op when a table with this name is already
# registered; it does not replace an existing table definition.
#
# table_name and bronze_path are derived from widget parameters and are
# interpolated into the SQL text, so reject any value that could change the
# meaning of the statement instead of passing it through.
if not (table_name.isascii() and table_name.replace("_", "").isalnum()):
    raise ValueError(f"Unsafe table name: {table_name!r}")
if "'" in bronze_path or "\\" in bronze_path:
    raise ValueError(f"Unsafe bronze path: {bronze_path!r}")

print(f"Creating table {table_name}...")
spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{bronze_path}'")

print("Process completed successfully!")

In [0]:
# Verify the load by counting the rows of the table for the current environment.
# This uses table_name rather than a hard-coded "employee_bronze_dev", so the
# check targets the right table when the notebook runs for stage or prod.
bronze_row_count = spark.table(table_name).count()
print(f"Row count in {table_name}: {bronze_row_count}")