In [0]:
%run /Workspace/Ncpl/Project7_connection

In [0]:
sql_df=connect_sql("CustomerOrders")

In [0]:
display(sql_df)

### Data Cleaning

#### 1. Remove duplicates

In [0]:
df_drop_duplicates=sql_df.drop_duplicates(subset=["customer_id","customer_name"])

In [0]:
display(df_drop_duplicates)

#### 2. Keep only the required columns

In [0]:
# Business columns stored in the SCD1 target, in the same order as its DDL.
selected_columns = [
    "order_id",
    "customer_id",
    "customer_name",
    "product_id",
    "product_name",
    "category",
    "quantity",
    "price",
    "order_date",
    "status",
    "shipping_address",
]
df_select = df_drop_duplicates.select(*selected_columns)

#### 3. Fill null values

In [0]:
from pyspark.sql.functions import *
# Column mean of price, used to impute missing prices in the fillna step below.
# avg() skips nulls and returns NULL when there is no non-null price at all
# (empty source, or every price missing); float(None) would then raise a
# TypeError, so fall back to 0.0 in that case.
# NOTE(review): 0.0 is only a placeholder for the "no known prices" case --
# confirm that is an acceptable imputed price.
avg_price_value = df_select.agg(avg("price")).first()[0]
avg_price = float(avg_price_value) if avg_price_value is not None else 0.0
display(avg_price)

In [0]:
df_fillna=df_select.fillna({"customer_name":"Unknown","price":avg_price,"status":"Shipped","shipping_address":"Unknown"})

In [0]:
display(df_fillna)

#### 4. Keep only shipped orders

In [0]:
filtered_df=df_fillna.filter(df_fillna.status=="Shipped")

In [0]:
display(filtered_df)

In [0]:
df_hashkey=filtered_df.withColumn("hash_key",crc32(concat(*filtered_df.columns)))

In [0]:
display(df_hashkey)

In [0]:
%sql
-- SCD type-1 target table: the MERGE below keys on order_id and overwrites the
-- matched row in place when the source row's hash changes.
-- hashkey holds the crc32 row fingerprint (BIGINT); createdby/createddate and
-- updatedby/updateddate are audit columns written by that MERGE.
-- USING DELTA is explicit so DeltaTable.forPath(...) on this location does not
-- depend on the workspace's default table format.
CREATE TABLE IF NOT EXISTS customer_data_shipped_scd1 (
    order_id INT,
    customer_id INT,
    customer_name String,
    product_id INT,
    product_name String,
    category String,
    quantity INT,
    price DECIMAL(10,2),
    order_date DATE,
    status String,
    shipping_address String,
    hashkey bigint,
    createdby string,
    createddate TIMESTAMP,
    updatedby string,
    updateddate TIMESTAMP
) USING DELTA
LOCATION '/mnt/deltatables/customer_data_shipped_scd1';

In [0]:
from delta.tables import DeltaTable

# Handle on the SCD1 Delta table created above; the diff and MERGE cells below
# read from and write to it through `deltatbl`.
TARGET_TABLE_PATH = "/mnt/deltatables/customer_data_shipped_scd1"
deltatbl = DeltaTable.forPath(spark, TARGET_TABLE_PATH)
deltatbl.toDF().show()

In [0]:
df_src = (
    df_hashkey.alias("src")
    .join(
        deltatbl.toDF().alias("tgt"),
        (col("src.order_id") == col("tgt.order_id")),  # Join only on id
        "left"
    )
    .filter( (col("tgt.order_id").isNull()) | (col("src.hash_key") != col("tgt.hashkey")) )  
    .select("src.*"))


In [0]:
display(df_src)

In [0]:
# Columns copied verbatim from the source row on both update and insert (same
# name on both sides), then the hash column, which is hash_key in the DataFrame
# but hashkey in the table.
_scd1_copied_columns = [
    "order_id",
    "customer_id",
    "customer_name",
    "product_id",
    "product_name",
    "category",
    "quantity",
    "price",
    "order_date",
    "status",
    "shipping_address",
]
_scd1_value_map = {f"tgt.{name}": f"src.{name}" for name in _scd1_copied_columns}
_scd1_value_map["tgt.hashkey"] = "src.hash_key"

# SCD type-1 upsert keyed on order_id. df_src only holds new or changed orders,
# so every matched row is overwritten in place and stamped with update audit
# values; unmatched rows are inserted with both create and update audit values.
update_set = {
    **_scd1_value_map,
    "tgt.updatedby": lit('Aniket-updated'),
    "tgt.updateddate": current_timestamp(),
}
insert_values = {
    **_scd1_value_map,
    "tgt.createdby": lit('Aniket'),
    "tgt.createddate": current_timestamp(),
    "tgt.updatedby": lit('Aniket'),
    "tgt.updateddate": current_timestamp(),
}

(
    deltatbl.alias("tgt")
    .merge(df_src.alias("src"), "tgt.order_id = src.order_id")
    .whenMatchedUpdate(set=update_set)
    .whenNotMatchedInsert(values=insert_values)
    .execute()
)

In [0]:
%sql
-- Current contents of the SCD1 target after the MERGE.
Select * from customer_data_shipped_scd1

In [0]:
%sql
-- Commit history of the Delta table (versions, operations, timestamps); use it
-- to choose a version for the time-travel query below.
describe history customer_data_shipped_scd1

In [0]:
%sql
-- Time travel: the table as of version 7.
-- NOTE(review): version 7 is specific to this table's past history; on a freshly
-- created table it does not exist and this cell fails under Run All. Pick the
-- version from DESCRIBE HISTORY above.
Select * from customer_data_shipped_scd1@v7

In [0]:
%sql
-- Table properties of the SCD1 target.
SHOW TBLPROPERTIES customer_data_shipped_scd1;