### Purpose
1- Read data from bronze  
2- Clean the data and remove duplicates  
3- Adapt the data format or structure where needed (e.g. rename a column)  
4- Save the processed data to a Delta table, and save the rejected (wrong) data to a separate Delta table for a data analyst to review.  
5- Check the table version history and maintain the Delta tables  

In [None]:
# Notebook parameters passed in by the Azure pipeline run.
# The defaults are only used when the notebook is run interactively.
_widgetDefaults = {
    "_pipeline_run_id": "0478ce36-b895-48a0-8a08-1b10430247ca",
    "_processing_date": "21-05-2024",
}
for _widgetName, _widgetDefault in _widgetDefaults.items():
    dbutils.widgets.text(_widgetName, _widgetDefault)

_pipeline_run_id = dbutils.widgets.get("_pipeline_run_id")
# Parsed later with the pattern dd-MM-yyyy.
bronze_processing_date = dbutils.widgets.get("_processing_date")

print(_pipeline_run_id)
print(bronze_processing_date)

In [None]:
# Give Spark the storage-account key so it can read and write the Data Lake (ADLS Gen2)
# through abfss:// paths. Both values come from the "dataLakeScope" secret scope.
accountName = dbutils.secrets.get(scope="dataLakeScope", key="accountName")
accountKey = dbutils.secrets.get(scope="dataLakeScope", key="accountKey")

sparkProperty = "fs.azure.account.key.{}.dfs.core.windows.net".format(accountName)
spark.conf.set(sparkProperty, accountKey)

In [None]:
# Data Lake locations (one container per medallion layer) for the bronze source
# and the two silver outputs: accepted rows and rejected rows.
bronzeSource = f'abfss://bronze@{accountName}.dfs.core.windows.net/nybabynames'
silverTarget, silverErrors = (
    f'abfss://silver@{accountName}.dfs.core.windows.net/{folder}'
    for folder in ("nybabynames", "nybabynameserrors")
)

# Metastore table names, in schema.table form.
bronze_table_name = "bronze.new_york_baby_names"
silver_table_name = "silver.new_york_baby_names"
silver_errors_table_name = "silver.new_york_baby_names_errors"

In [None]:
# Read data from Data Lake
from pyspark.sql.functions import *

# Keep only the bronze rows that were loaded on the requested processing date.
# Both sides are compared as DATE values; the widget value is expected as dd-MM-yyyy.
# The predicate is built with Column functions rather than by interpolating the
# widget value into a SQL string, so the parameter never becomes SQL text.
condition = to_date(col("_processing_date")) == to_date(lit(bronze_processing_date), "dd-MM-yyyy")
gridDataBronze = spark.read.table(bronze_table_name).filter(condition)

# printSchema() prints the schema itself. It must be called; passing the bound
# method to display() does not print the schema.
gridDataBronze.printSchema()



In [None]:
from pyspark.sql.functions import *

# Correct the data structure: the bronze column "name_count" becomes "count".
# Note: withColumnRenamed is a no-op (no error) if "name_count" is not in the schema.
gridDataBronze = gridDataBronze.withColumnRenamed("name_count", "count")

# Show the resulting schema. printSchema() must be called for anything to be printed.
gridDataBronze.printSchema()

In [None]:
from pyspark.sql.functions import *
from pyspark.sql import *

# Data quality: keep only rows whose key columns and count are present and whose count is positive.
gridCleanDF = gridDataBronze.filter(
    "year IS NOT NULL AND first_name IS NOT NULL AND county IS NOT NULL "
    "AND sex IS NOT NULL AND count IS NOT NULL AND count > 0"
)

# De-duplication: keep exactly one row per (year, first_name, county, sex).
# Within each key, rows are ordered by _input_file_modification_date descending (the row from
# the most recently modified file comes first). Ties are broken by the smaller "count".
gridDataWindowSpec = Window.partitionBy("year", "first_name", "county", "sex").orderBy(
    col("_input_file_modification_date").desc(), "count"
)
findLatestDF = (
    gridCleanDF.withColumn("row_number", row_number().over(gridDataWindowSpec))
    .filter("row_number == 1")
    .drop("row_number")
)

# Wrong data: every bronze row that was not kept. This covers rows failing the quality
# filter and duplicates discarded by the window above.
# exceptAll (EXCEPT ALL) preserves multiplicity. subtract (EXCEPT DISTINCT) would drop an
# exact duplicate of a kept row entirely and collapse identical rejected rows into one, so
# those rows would never reach the error table.
gridDataErrorDF = gridDataBronze.exceptAll(findLatestDF)

gridDataDf = findLatestDF

In [None]:
# Persist the rejected rows to a separate Delta location so a data analyst can review them.
from delta.tables import *

# Append when a Delta table already exists at silverErrors; otherwise create it with overwrite.
errorWriteMode = "append" if DeltaTable.isDeltaTable(spark, silverErrors) else "overwrite"
gridDataErrorDF.write.format("delta").mode(errorWriteMode).save(silverErrors)

In [None]:
from delta.tables import *

# Upsert the de-duplicated rows (gridDataDf) into the silver Delta table at silverTarget.
# Rows match on the natural key. A matched row is only overwritten when the incoming row comes
# from a more recently modified input file. Unmatched rows are inserted with all columns.
_silverMergeKeys = ("year", "first_name", "county", "sex")
_silverUpdatedColumns = (
    "count",
    "_processing_date",
    "_pipeline_run_id",
    "_input_filename",
    "_input_file_modification_date",
)

if not DeltaTable.isDeltaTable(spark, silverTarget):
    # First load: there is nothing to merge into yet, so create the table from this batch.
    gridDataDf.write.mode("overwrite").format("delta").save(silverTarget)
else:
    silverMergeCondition = " and ".join(
        f"target.{key} = src.{key}" for key in _silverMergeKeys
    )
    (
        DeltaTable.forPath(spark, silverTarget)
        .alias("target")
        .merge(source=gridDataDf.alias("src"), condition=silverMergeCondition)
        .whenMatchedUpdate(
            condition="target._input_file_modification_date < src._input_file_modification_date",
            set={column: f"src.{column}" for column in _silverUpdatedColumns},
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

In [None]:
# Create the silver schema and register the silver Delta locations as external tables.
# Each statement is a no-op when the schema/table already exists.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
spark.sql(f"CREATE EXTERNAL TABLE IF NOT EXISTS {silver_table_name} USING delta LOCATION '{silverTarget}'")
# Register the rejected-rows location under silver_errors_table_name as well, so analysts
# can query it by name instead of by storage path.
spark.sql(f"CREATE EXTERNAL TABLE IF NOT EXISTS {silver_errors_table_name} USING delta LOCATION '{silverErrors}'")

# Note: spark.sql is used (rather than a %sql cell) so the table names and paths can be
# filled in with f-strings.

In [None]:
%sql
-- Optional, not needed by the pipeline: inspect the silver table's metadata as a learning exercise.
-- Fields of interest in the output:
--   Location: the path in the storage account
--   Provider: the storage format (delta)
DESCRIBE TABLE EXTENDED silver.new_york_baby_names

In [None]:
%sql
-- This is not necessary from a pipeline perspective; it involves showing the transaction log on the delta version as a learning experience.
-- One row per table version, with selected entries of the operationMetrics map:
-- rows written, and rows inserted/updated/deleted by MERGE.
-- Metrics that do not apply to an operation come back as NULL.

SELECT version, operationMetrics, operationMetrics.numOutputRows, operationMetrics.numTargetRowsInserted, operationMetrics.numTargetRowsUpdated, operationMetrics.numTargetRowsDeleted
FROM (DESCRIBE HISTORY silver.new_york_baby_names)

In [None]:
%sql

-- Check your result for testing. Do not do this in production!
-- Manual sanity check (kept commented out): the 10 first names with the highest total count
-- across all years, counties and sexes in the silver table. Uncomment to run.
-- SELECT first_name, sum(count) as cnt
-- FROM silver.new_york_baby_names
-- GROUP BY (first_name)
-- ORDER BY cnt DESC
-- LIMIT 10





In [None]:
#  Maintenance for the silver Delta tables
#
# Two commands keep a Delta table performant:
# 1. OPTIMIZE (optimize().executeCompaction()): compacts many small data files into fewer, larger ones.
# 2. VACUUM (vacuum()): deletes data files that are no longer referenced by the table and are older
#    than the retention threshold. This reduces storage overhead, but it limits how far back
#    time travel can go.
#
# Databricks recommends frequently running the OPTIMIZE command to compact small files.
# OPTIMIZE does not remove the old files; VACUUM does (https://learn.microsoft.com/en-us/azure/databricks/delta/vacuum).
# https://learn.microsoft.com/en-us/azure/databricks/delta/best-practices#--compact-files
#
# On Azure, predictive optimization (https://learn.microsoft.com/en-us/azure/databricks/optimizations/predictive-optimization#what-operations-does-predictive-optimization-run)
# could run these automatically, but it has prerequisites such as a premium plan and managed tables
# (https://learn.microsoft.com/en-us/azure/databricks/optimizations/predictive-optimization#prerequisites-for-predictive-optimization).
from delta.tables import *

# Run OPTIMIZE + VACUUM on a table at most once per this many days.
MAINTENANCE_INTERVAL_DAYS = 30


def _maintain_delta_table(delta_table, interval_days=MAINTENANCE_INTERVAL_DAYS):
    """Compact and vacuum *delta_table* unless it was vacuumed in the last *interval_days* days.

    Returns True if maintenance ran, False if it was skipped.
    """
    # history() returns one row per commit. Filter by commit timestamp: history(n) would only
    # look at the last n commits, not the last n days.
    # NOTE(review): "VACUUM START" commits are only written when vacuum logging is enabled on
    # this workspace. Otherwise no prior run is ever found and maintenance runs on every execution.
    recently_vacuumed = (
        delta_table.history()
        .filter(
            "operation = 'VACUUM START' AND "
            f"timestamp >= current_timestamp() - INTERVAL {int(interval_days)} DAYS"
        )
        .limit(1)
        .count()
        > 0
    )
    if recently_vacuumed:
        return False

    # optimize() only returns a builder; executeCompaction() actually runs OPTIMIZE.
    delta_table.optimize().executeCompaction()
    # Without an argument, vacuum() uses the table's retention threshold (7 days by default).
    delta_table.vacuum()
    return True


# Silver table, addressed by its metastore name.
gridDataDelta = DeltaTable.forName(spark, silver_table_name)
_maintain_delta_table(gridDataDelta)

# Error table, addressed by storage path: DeltaTable.isDeltaTable() expects a path, not a table name.
if DeltaTable.isDeltaTable(spark, silverErrors):
    gridDataDelta = DeltaTable.forPath(spark, silverErrors)
    _maintain_delta_table(gridDataDelta)