# Running a Delta Live Tables (DLT) Pipeline

## Downloading data

In [0]:
import os
import urllib.request  # `import urllib` alone does not load the `request` submodule

# Location of the raw CSV. NOTE(review): despite the catalog/schema/volume
# naming, these paths live under the DBFS root (dbfs:/<catalog>/...), not under
# a Unity Catalog volume (/Volumes/<catalog>/<schema>/<volume>/) — confirm this
# is intended. The pipeline cell builds the same path shape, so change both together.
my_catalog = "main_catalog"
my_schema = "ny_baby_names"
my_volume = "baby_names_data"

file_path = f"/{my_catalog}/{my_schema}/{my_volume}/"
dbutils.fs.mkdirs(file_path)

download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

# The same directory as `file_path`, addressed through the local /dbfs mount so
# that plain Python file APIs (os, urllib) can write into it.
local_dbfs_path = f"/dbfs{file_path}"

os.makedirs(local_dbfs_path, exist_ok=True)
print(f"Directory created: {local_dbfs_path}")

download_path = os.path.join(local_dbfs_path, filename)
urllib.request.urlretrieve(download_url, download_path)
print(f"File downloaded to: {download_path}")



#

## Pipeline to Declare Materialized Views and Streaming Tables

In [0]:
import dlt
from pyspark.sql.functions import *

# Storage coordinates come from the pipeline's configuration (spark.conf); each
# key is read in order and spark.conf.get raises if a key is not set.
my_catalog, my_schema, my_volume = (
    spark.conf.get(conf_key) for conf_key in ('my_catalog', 'my_schema', 'my_volume')
)

# Directory Auto Loader watches for new raw CSV files.
data_path = f"/{my_catalog}/{my_schema}/{my_volume}/"


# Bronze layer that ingests raw data from a cloud storage location
@dlt.table(
    comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names():
    """Incrementally load the raw baby-name CSV files from ``data_path``.

    Uses Auto Loader (``cloudFiles``) with a header row. The result is a
    streaming DataFrame in which the ``First Name`` column is renamed to
    ``First_Name``, the name the downstream silver table selects on.
    """
    raw = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        # Auto Loader infers every CSV column as a string unless
        # cloudFiles.inferColumnTypes is enabled. `inferSchema` is the batch
        # reader option and does not drive Auto Loader's type inference.
        .option('cloudFiles.inferColumnTypes', True)
        .option('inferSchema', True)
        .option('header', True)
        .load(data_path)
    )
    # Replace the space in the column name with an underscore so later layers
    # can refer to it as a plain identifier.
    return raw.withColumnRenamed("First Name", "First_Name")

# Silver layer with data validation and column renaming
@dlt.table(
    comment="New York popular baby first name data cleaned and prepared for analysis."
)
# Warn-only expectation: violating rows are kept but counted against "valid_first_name".
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
# Hard expectation: a row failing "valid_count" stops the pipeline update.
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_cleaned():
    """Build the silver table from the bronze ``baby_names`` dataset.

    Renames ``Year`` to ``Year_of_Birth`` and keeps only the
    ``Year_of_Birth``, ``First_Name`` and ``Count`` columns.
    """
    bronze = dlt.read('baby_names')
    with_birth_year = bronze.withColumnRenamed("Year", "Year_of_Birth")
    return with_birth_year.select("Year_of_Birth", "First_Name", "Count")

# Gold layer with data analytics and data transformation.
@dlt.table(
    comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
    """Return the ten first names with the highest total ``Count`` in 2021.

    Filters the silver table to ``Year_of_Birth == 2021``, sums ``Count`` per
    ``First_Name`` into ``Total_Count``, and keeps the top ten in descending
    order. NOTE(review): names tied at the 10th place are cut arbitrarily.
    """
    # Use an explicit namespace. The cell-level
    # `from pyspark.sql.functions import *` shadows the builtin `sum`, and a bare
    # `sum("Count")` would fail if that wildcard import were ever narrowed.
    from pyspark.sql import functions as F

    return (
        dlt.read('baby_names_cleaned')
        .filter(F.col("Year_of_Birth") == 2021)
        .groupBy("First_Name")
        .agg(F.sum("Count").alias("Total_Count"))
        .orderBy(F.col("Total_Count").desc())
        .limit(10)
    )


In [0]:
# Preview each layer the pipeline produced, labelled so the outputs are
# distinguishable. Tables are addressed as `<schema>.<table>`, i.e. resolved
# against the notebook's current catalog; NOTE(review): assumes the pipeline
# publishes into `my_schema` there — confirm against the pipeline target.
print("Bronze Layer: ")
display(spark.table(f"{my_schema}.baby_names").limit(10))
print("Silver Layer: ")
display(spark.table(f"{my_schema}.baby_names_cleaned").limit(10))
# The gold table is already capped at 10 rows by its definition.
print("Gold Layer: ")
display(spark.table(f"{my_schema}.top_baby_names_2021"))