# Address Data

## Process the Addresses Data -- Using Python Implementation
- Ingest the data into the data lakehouse - bronze_address
- Perform the data quality checks and transform the data as required - silver_address_clean
- Apply the changes to the address data as an SCD Type 2 dimension - silver_address_scd2

### 0. Import the modules

In [0]:
# NOTE: run interactively, `import dlt` fails with ModuleNotFoundError (see this
# cell's captured output); the module is expected to be provided when the
# notebook is executed as part of a Delta Live Tables pipeline.
import dlt
# PySpark's column-expression helpers live in `pyspark.sql.functions`;
# there is no importable `spark.sql.functions` module.
from pyspark.sql import functions as F

---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
File <command-8662675776394380>, line 1
----> 1 import dlt

ModuleNotFoundError: No module named 'dlt'

### 1. Ingest the data into the data lakehouse - Bronze Addresses

In [0]:
# SQL equivalent of the PySpark definition below (kept for reference only):
# CREATE OR REFRESH STREAMING TABLE AddressesBronzeTable
# COMMENT 'Raw Addresses data ingested from the source system operational data'
# TBLPROPERTIES ('quality' = 'bronze')
# AS
# SELECT
#   *,
#   _metadata.file_path AS input_file_path,
#   current_timestamp() AS ingest_timestamp
# FROM STREAM cloud_files(
#     '/Volumes/ecommercedataset/landinglayer/landingdata/Addresses',
#     'csv',
#     map('cloudFiles.inferColumnTypes', 'true')
#   );

# PySpark version
@dlt.table(
    name="AddressesBronzeTable",
    comment="Raw Addresses data ingested from the source system operational data",
    table_properties={"quality": "bronze"},
)
def CreateAddressesBronzeTable():
    """Incrementally ingest the raw Addresses CSV files with Auto Loader.

    Returns:
        A streaming DataFrame containing every column read from the CSV
        files, plus ``input_file_path`` (the source file of each row, taken
        from the ``_metadata`` column) and ``ingest_timestamp`` (the
        ``current_timestamp()`` at processing time).
    """
    # The opening parenthesis must be on the same line as `return`;
    # otherwise Python returns None and the reader below is never built.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/ecommercedataset/landinglayer/landingdata/Addresses")
        .select(
            "*",
            F.col("_metadata.file_path").alias("input_file_path"),
            F.current_timestamp().alias("ingest_timestamp"),
        )
    )


message
"This Delta Live Tables query is syntactically valid, but you must create a pipeline in order to define and populate your table."


### 2. Specify the expectations for the Addresses table
> Silver addresses Clean (DQ Rules)
1. Fail if customer_id is NULL
2. Drop records with address_line_1 as NULL
3. Warn if postcode is not 5 digits 

> Silver addresses Clean (Transformations)
1. CAST created_date to DATE

- Silver_Addresses is an SCD Type 2 dimension table with customer_id as its primary key. Changes to the address data are applied in created_date order, and a history row is kept for each address added for the customer.

In [0]:
# SQL equivalent of the PySpark definition below (kept for reference only):
# CREATE OR REFRESH STREAMING TABLE AddressesSilverClean
# (
#   -- expectations
#   CONSTRAINT valid_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE,
#   CONSTRAINT valid_address EXPECT (address_line_1 IS NOT NULL) ON VIOLATION DROP ROW,
#   CONSTRAINT valid_postcode EXPECT (LENGTH(postcode) = 5)
# )
# COMMENT 'Table created with Expectations and Data quality checks'
# TBLPROPERTIES ('quality' = 'silver')
# AS
# SELECT
#   customer_id,
#   address_line_1,
#   city,
#   state,
#   postcode,
#   country,
#   CAST(created_date AS DATE) AS created_date
# FROM STREAM(LIVE.AddressesBronzeTable);

# PySpark version
@dlt.table(
    # Must match the `source` used by the apply-changes step.
    name="AddressesSilverClean",
    comment="Table created with Expectations and Data quality checks",
    table_properties={"quality": "silver"},
)
# Expectations are attributes of the `dlt` module; bare names are undefined.
@dlt.expect_or_fail("valid_customer_id", "customer_id is not null")  # fail the update
@dlt.expect_or_drop("valid_address", "address_line_1 is not null")  # drop the row
@dlt.expect("valid_postcode", "LENGTH(postcode) = 5")  # warn only, row is kept
def CustomerSilverClean():
    """Build the cleaned Addresses silver table from the bronze stream.

    Data-quality rules (declared by the decorators above):
      * valid_customer_id - a NULL customer_id fails the pipeline update.
      * valid_address - rows with a NULL address_line_1 are dropped.
      * valid_postcode - rows whose postcode is not 5 characters long are
        kept, and the violation is recorded.

    Returns:
        A streaming DataFrame with the selected address columns and
        ``created_date`` cast to DATE.
    """
    # NOTE(review): the bronze reader sets cloudFiles.inferColumnTypes, so a
    # numeric-looking postcode may be inferred as an integer and lose leading
    # zeros, making LENGTH(postcode) < 5 — confirm the inferred type.
    return (
        # `spark.readStream` is a property, not a callable; read the upstream
        # table of this pipeline by the name it was published under.
        dlt.read_stream("AddressesBronzeTable")
        .select(
            "customer_id",
            "address_line_1",
            "city",
            "state",
            "postcode",
            "country",
            F.col("created_date").cast("date").alias("created_date"),
        )
    )

### 3. Apply changes to the Addresses data - Silver Addresses

In [0]:
# The SCD target must exist as a streaming table before changes can be
# applied into it.
# SQL equivalent (kept for reference only):
# CREATE OR REFRESH STREAMING TABLE ecommercedataset.silverlayer.AddressesSilver
# COMMENT 'SCD Type 2 Addresses Data'
# TBLPROPERTIES ('quality' = 'silver');

# PySpark version
dlt.create_streaming_table(
    name="ecommercedataset.silverlayer.AddressesSilver",
    comment="SCD Type 2 Addresses Data",
    # Table properties are a key -> value mapping (dict), not a set.
    table_properties={"quality": "silver"},
)

In [0]:
# Apply the cleaned address changes to the streaming table created above,
# keyed on customer_id and ordered by created_date, stored as SCD Type 2.
# SQL equivalent (kept for reference only):
# APPLY CHANGES INTO ecommercedataset.silverlayer.AddressesSilver
# FROM STREAM(LIVE.AddressesSilverClean)
# KEYS (customer_id)
# SEQUENCE BY created_date
# STORED AS SCD TYPE 2;

# PySpark version: `dlt.apply_changes` is a plain function call, not a
# decorator (a decorator must be followed by a `def`).
# NOTE(review): created_date is cast to DATE upstream, so two changes for the
# same customer on the same day tie on the sequence column — confirm this is
# acceptable or sequence by a finer-grained column.
dlt.apply_changes(
    target="ecommercedataset.silverlayer.AddressesSilver",
    source="AddressesSilverClean",
    keys=["customer_id"],
    sequence_by="created_date",
    stored_as_scd_type=2,
)