### Process Addresses data in PySpark
1. Ingest the data into the data lakehouse : bronze_addresses
2. Perform data quality checks and transform the data as required : silver_addresses_clean
3. Apply changes to the Addresses data (SCD Type 2) : silver_addresses
4. A DLT notebook cannot mix languages, so magic commands such as %sql are not available and everything here is written in Python

### 1. Ingest the data into the data lakehouse : bronze_addresses

In [0]:
import dlt
import pyspark.sql.functions as F

In [0]:

# In Python, Auto Loader is invoked with .format("cloudFiles")
# In SQL, the equivalent is the cloud_files() function

In [0]:
# CREATING DELTA LIVE TABLES in Python
@dlt.table(
    name="bronze_addresses",
    table_properties={'quality' : 'bronze'},
    comment="Raw addresses data ingested from the source system"
)
def bronze_addresses():
  return (
    spark.readStream.format("cloudFiles")
                    .option("cloudFiles.format", "csv")
                    .option("cloudFiles.inferColumnTypes", True)
                    .load("/Volumes/circuitbox/landing/operational_data/addresses/")
                    .select(
                            "*",  # keep every source column (must be a string in Python)
                            F.col("_metadata.file_path").alias("file_path"),  # source file of each row
                            F.current_timestamp().alias("ingest_timestamp")   # time the row was ingested
                    )
  )

### 2. Perform data quality checks and transform the data as required : silver_addresses_clean

In [0]:
@dlt.table(
    name="silver_addresses_clean",
    table_properties={'quality' : 'silver'},
    comment="Cleaned addresses data ingested from the bronze_addresses table",
)
@dlt.expect_or_fail("valid_customer_id", "customer_id IS NOT NULL")
@dlt.expect_or_drop("valid_address_line_1", "address_line_1 IS NOT NULL")
@dlt.expect("valid_postcode", "LENGTH(postcode) = 5")
def silver_addresses_clean():
  return (
          spark.readStream.table("bronze_addresses")
               .select(
                     "customer_id",
                     "address_line_1",
                     "city",
                     "state",
                     "postcode",
                     F.col("created_date").cast("date")                     
               )
  )
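
The three decorators above behave differently when a row violates its constraint: `expect_or_fail` stops the update, `expect_or_drop` discards the row, and `expect` keeps the row and only records the violation in the pipeline metrics. The following is a minimal plain-Python sketch of those semantics, not the DLT API; the function `apply_expectations` and the sample rows are hypothetical, and it assumes `postcode` is always present as a string.

```python
# Plain-Python illustration of the three expectation modes (NOT the DLT API).
# apply_expectations and sample_rows are hypothetical names used for illustration.

def apply_expectations(rows):
    """Return (kept_rows, warning_count) after applying the three rules."""
    kept = []
    warnings = 0
    for row in rows:
        # expect_or_fail: a single violation aborts the whole update
        if row["customer_id"] is None:
            raise ValueError("valid_customer_id violated")
        # expect_or_drop: the violating row is silently discarded
        if row["address_line_1"] is None:
            continue
        # expect: the row is kept, the violation is only counted
        if len(row["postcode"]) != 5:
            warnings += 1
        kept.append(row)
    return kept, warnings


sample_rows = [
    {"customer_id": 1, "address_line_1": "1 Main St", "postcode": "12345"},
    {"customer_id": 2, "address_line_1": None, "postcode": "54321"},
    {"customer_id": 3, "address_line_1": "9 Oak Ave", "postcode": "123"},
]

kept_rows, warning_count = apply_expectations(sample_rows)
```

With these sample rows, the second row is dropped and the third is kept but counted as a warning. In the real pipeline the counts show up in the DLT event log and data quality metrics rather than as a return value.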
         

### 3. Apply changes to the Addresses data (SCD Type 2) : silver_addresses

##### To create the SCD Type 2 table, I am using the APPLY CHANGES API

In [0]:
# The APPLY CHANGES API does not create the target table, so we create it up front

dlt.create_streaming_table(
     name = "silver_addresses",
     comment = "SCD Type 2 addresses data",
     table_properties = {'quality' : 'silver'}
)

In [0]:
# Using APPLY CHANGES API to make the table SCD Type 2

dlt.apply_changes(
    target = "silver_addresses",
    source = "silver_addresses_clean",
    keys = ["customer_id"],
    sequence_by = "created_date",
    stored_as_scd_type = 2
)
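
To make the effect of SCD Type 2 concrete: each incoming change closes the currently open version of that key and appends a new open version, with the validity window stored in the `__START_AT` and `__END_AT` columns that DLT adds to the target. Below is a minimal plain-Python sketch of that idea, not the APPLY CHANGES implementation. The function `apply_scd2` and the sample data are hypothetical, and it assumes changes are already ordered by `created_date` (the real API handles out-of-order records itself).

```python
# Plain-Python illustration of SCD Type 2 bookkeeping (NOT the DLT implementation).
# apply_scd2 and the sample changes are hypothetical names used for illustration.

def apply_scd2(history, change, key="customer_id", seq="created_date"):
    """Close the open version for this key, then append the new version."""
    for row in history:
        if row[key] == change[key] and row["__END_AT"] is None:
            row["__END_AT"] = change[seq]  # the old version stops being current
    history.append({**change, "__START_AT": change[seq], "__END_AT": None})
    return history


changes = [
    {"customer_id": 1, "city": "Austin", "created_date": "2024-01-01"},
    {"customer_id": 1, "city": "Dallas", "created_date": "2024-06-01"},
    {"customer_id": 2, "city": "Boston", "created_date": "2024-03-01"},
]

history = []
# Assumption: process changes in sequence order, mirroring sequence_by
for change in sorted(changes, key=lambda c: c["created_date"]):
    apply_scd2(history, change)

# Rows with __END_AT set to None are the current version of each customer
current_rows = [row for row in history if row["__END_AT"] is None]
```

Customer 1 ends up with two rows: the Austin version closed on the date the Dallas record arrived, and the Dallas version still open. Querying `silver_addresses` with `__END_AT IS NULL` gives the same "current rows only" view.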