## Silver to Gold Layer
This notebook defines Delta Live Tables (DLT) transformations that read Delta files stored in Azure Data Lake Storage (ADLS) Gen2, transform the data, and save the results. The purpose of each cell is as follows:

In [0]:
# Import the libraries and functions used in the subsequent cells.
import dlt
from pyspark.sql.functions import col, when, count
from pyspark.sql.types import IntegerType

In [0]:
# Silver-container location of the netflix_cast Delta table.
silver_path_netflix_cast = (
    "abfss://silver@storageforproject.dfs.core.windows.net/netflix_cast"
)

@dlt.table(comment="Read Delta files from silver container (batch)")
def netflix_cast():
    """Batch-read the netflix_cast Delta table from its silver path."""
    delta_reader = spark.read.format("delta")
    return delta_reader.load(silver_path_netflix_cast)

@dlt.table(comment="Transforms the data by removing duplicate rows")
@dlt.expect("valid_show_id", "show_id IS NOT NULL")
def netflix_cast_transformed():
    """Return the ``netflix_cast`` dataset with duplicate rows removed.

    The ``valid_show_id`` rule is declared with ``dlt.expect``; per the DLT
    documentation that variant records violations but keeps the rows.
    """
    # dropDuplicates() with no arguments compares all columns.
    return dlt.read("netflix_cast").dropDuplicates()

In [0]:
# Silver-container location of the netflix_category Delta table.
silver_path_netflix_category = (
    "abfss://silver@storageforproject.dfs.core.windows.net/netflix_category"
)


@dlt.table(comment="Read Delta files from silver container (batch)")
def netflix_category():
    """Batch-read the netflix_category Delta table from its silver path."""
    return spark.read.format("delta").load(silver_path_netflix_category)

@dlt.table(comment="Transforms the data by removing duplicate rows and ensures that the show_id column is not null. ")
@dlt.expect_or_drop("valid_show_id", "show_id IS NOT NULL")
def netflix_category_transformed():
    """Return ``netflix_category`` de-duplicated, without null ``show_id`` rows.

    The table comment promises that ``show_id`` is never null. A plain
    ``dlt.expect`` only records violations and keeps the offending rows, so
    the rule is declared with ``dlt.expect_or_drop`` to enforce the contract.
    """
    # dropDuplicates() with no arguments compares all columns.
    return dlt.read("netflix_category").dropDuplicates()

In [0]:
# Silver-container location of the netflix_countries Delta table.
silver_path_netflix_countries = (
    "abfss://silver@storageforproject.dfs.core.windows.net/netflix_countries"
)

@dlt.table(comment="Read netflix_countries files from silver container (batch)")
def netflix_countries():
    """Batch-read the netflix_countries Delta table from its silver path."""
    delta_reader = spark.read.format("delta")
    return delta_reader.load(silver_path_netflix_countries)

@dlt.table(comment="Transforms the data by removing duplicate rows and ensures that the show_id column is not null. ")
@dlt.expect_or_drop("valid_show_id", "show_id IS NOT NULL")
def netflix_countries_transformed():
    """Return ``netflix_countries`` de-duplicated, without null ``show_id`` rows.

    The table comment promises that ``show_id`` is never null. A plain
    ``dlt.expect`` only records violations and keeps the offending rows, so
    the rule is declared with ``dlt.expect_or_drop`` to enforce the contract.
    """
    # dropDuplicates() with no arguments compares all columns.
    return dlt.read("netflix_countries").dropDuplicates()

In [0]:
# Silver-container location of the netflix_directors Delta table.
silver_path_netflix_directors = (
    "abfss://silver@storageforproject.dfs.core.windows.net/netflix_directors"
)

@dlt.table(comment="Read netflix_directors files from silver container (batch)")
def netflix_directors():
    """Batch-read the netflix_directors Delta table from its silver path."""
    delta_reader = spark.read.format("delta")
    return delta_reader.load(silver_path_netflix_directors)

@dlt.table(comment="Transforms the data by removing duplicate rows and ensures that the show_id column is not null. ")
@dlt.expect_or_drop("valid_show_id", "show_id IS NOT NULL")
def netflix_directors_transformed():
    """Return ``netflix_directors`` de-duplicated, without null ``show_id`` rows.

    The table comment promises that ``show_id`` is never null. A plain
    ``dlt.expect`` only records violations and keeps the offending rows, so
    the rule is declared with ``dlt.expect_or_drop`` to enforce the contract.
    """
    # dropDuplicates() with no arguments compares all columns.
    return dlt.read("netflix_directors").dropDuplicates()

If the Silver data is static and only updated occasionally, a `batch` read makes sense: batch reads are straightforward and efficient for one-time processing. However, if the Silver layer is continuously updated with new data (as in a real-time pipeline), a `streaming` read is the better choice because it processes those updates incrementally.

In [0]:
# Silver-container location of the netflix_titles Delta table.
silver_path_netflix_titles = (
    "abfss://silver@storageforproject.dfs.core.windows.net/netflix_titles"
)

@dlt.table(comment="read netflix_titles files from silver container(stream)")
def stream_netflix_titles():
    """Open the netflix_titles Delta table at its silver path as a stream."""
    stream_reader = spark.readStream.format("delta")
    return stream_reader.load(silver_path_netflix_titles)

In [0]:
# Expectation rules (data quality checks).
# DLT expectations validate data; they do not transform it.
# Each entry maps an expectation name to the SQL condition a row must satisfy.
# The rules are applied to clean_netflix_titles via dlt.expect_all_or_drop,
# so rows with a null show_id or a null release_year are dropped there.
expectation_rules = {
    "valid_show_id": "show_id IS NOT NULL",
    "valid_release_year": "release_year IS NOT NULL",
}

In [0]:
@dlt.table(comment="cleaned netflix_titles table ")
@dlt.expect_all_or_drop(expectation_rules)
def clean_netflix_titles():
    """Build the cleaned netflix_titles dataset from ``stream_netflix_titles``.

    Steps, in order: drop duplicate rows, replace null ``description`` values
    with "No description", cast ``release_year`` to integer, and add
    ``new_flag`` (1 when ``type`` is "Movie", otherwise 0). The decorator
    drops rows that fail any rule in ``expectation_rules``.
    """
    description_or_default = when(
        col("description").isNull(), "No description"
    ).otherwise(col("description"))
    release_year_as_int = col("release_year").cast(IntegerType())
    movie_flag = when(col("type") == "Movie", 1).otherwise(0)

    deduplicated = dlt.read("stream_netflix_titles").dropDuplicates()
    return (
        deduplicated
        .withColumn("description", description_or_default)
        .withColumn("release_year", release_year_as_int)
        .withColumn("new_flag", movie_flag)
    )

When reading from a Delta table that is part of the same pipeline, using `dlt.read` is better for lineage and dependency management.

In [0]:
@dlt.view(comment="create a view")
@dlt.expect("valid_release_year", "release_year IS NOT NULL")
def netflix_title_view():
    """Expose ``clean_netflix_titles`` as a view with ``new_flag`` set.

    ``new_flag`` is 1 when ``type`` equals "Movie" and 0 otherwise.
    """
    is_movie = col("type") == "Movie"
    cleaned_titles = dlt.read("clean_netflix_titles")
    return cleaned_titles.withColumn("new_flag", when(is_movie, 1).otherwise(0))

In [0]:
@dlt.table(comment="final gold table")
@dlt.expect("valid_show_id", "show_id IS NOT NULL")
@dlt.expect_or_drop("valid_date_added", "date_added IS NOT NULL")
def netflix_titles_gold():
    """Return the final gold table built from ``clean_netflix_titles``.

    Duplicate rows are removed, and rows whose ``date_added`` is null are
    dropped by the ``valid_date_added`` expectation. That expectation name
    uses an underscore so it follows the ``valid_<column>`` pattern of the
    other expectations in this pipeline.
    """
    # dropDuplicates() with no arguments compares all columns.
    return dlt.read("clean_netflix_titles").dropDuplicates()
