In [0]:
from delta.tables import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, array, ArrayType, DateType, TimestampType, FloatType

In [0]:
# Notebook configuration: storage account, container names, folder layout and
# the bronze-layer path of the first hourly extract.
#
# NOTE(review): the storage account name and access key are hard-coded in the
# notebook source (masked here). Credentials should not live in source; consider
# reading them from a secret store (e.g. a Databricks secret scope). The scope and
# key names are deployment-specific, so the literals are left untouched.
STORAGE_ACCOUNT="*****************************"
ADLS_KEY="************************************"
BRONZE_LAYER_NAMESPACE="bronze"
SCRATCH_LAYER_NAMESPACE="scratch"
STORE_SALES_FOLDER="sales"
TABLE="store_orders"
ADLS_FOLDER="2021/04/28/18"
# The source folder is literally named "[dbo].[<table>]"; the brackets are
# backslash-escaped in the path (presumably so the path globbing done by the
# reader treats them as plain characters -- confirm). Raw strings keep those
# backslashes explicitly; the previous "\[" / "\]" spellings relied on Python
# passing unrecognised escape sequences through, which is deprecated and emits
# a SyntaxWarning on recent interpreters. The resulting string is unchanged.
BRONZE_TABLE_PATH = (
    "wasbs://" + BRONZE_LAYER_NAMESPACE + "@" + STORAGE_ACCOUNT + ".blob.core.windows.net/"
    + STORE_SALES_FOLDER + r"/\[dbo\].\[" + TABLE + r"\]/" + ADLS_FOLDER
)
# Give the Spark session the access key for the storage account's blob endpoint,
# so the wasbs:// paths built from STORAGE_ACCOUNT can be opened.
spark.conf.set(
    "".join(["fs.azure.account.key.", STORAGE_ACCOUNT, ".blob.core.windows.net"]),
    ADLS_KEY,
)


In [0]:
# Column layout applied to the store_orders CSV extracts: one (name, Spark type)
# pair per column, in file order.
# NOTE(review): order_date is read as a string although DateType is imported --
# confirm whether that is intentional.
ORDERS_SCHEMA = [
    ("order_number", IntegerType()),
    ("customer_id", IntegerType()),
    ("product_id", IntegerType()),
    ("order_date", StringType()),
    ("units", IntegerType()),
    ("sale_price", FloatType()),
    ("currency", StringType()),
    ("order_mode", StringType()),
    ("updated_at", TimestampType()),
]

# Build one StructField per (name, type) pair and wrap them in the StructType
# that the later hourly reads reuse as `schema`.
fields = [StructField(column_name, column_type) for column_name, column_type in ORDERS_SCHEMA]
schema = StructType(fields)

# Load the first hourly extract with the explicit schema and preview it.
df_read_data_hour1 = spark.read.schema(schema).csv(BRONZE_TABLE_PATH)
display(df_read_data_hour1)


In [0]:
%sql
-- Remove any earlier registration of the table so a later cell can recreate it.
-- IF EXISTS keeps this cell from failing on a first run (or after a workspace
-- reset), when store_orders has not been created yet.
DROP TABLE IF EXISTS store_orders

In [0]:
# Column the Delta table is partitioned by, and the table's location in the
# scratch container.
PARTITION_COLUMN = "currency"
DELTA_TABLE_WRITE_PATH = "".join([
    "wasbs://", SCRATCH_LAYER_NAMESPACE, "@", STORAGE_ACCOUNT,
    ".blob.core.windows.net/", STORE_SALES_FOLDER, "/", TABLE,
])

# Write the first hourly extract as a Delta table at that path and register it
# in the metastore under TABLE.
# NOTE(review): no save mode is set, so the writer's default (error if the
# target already exists) applies. If files from an earlier run are still at
# DELTA_TABLE_WRITE_PATH this call may fail on re-run -- confirm, and either
# clear the path first or choose a mode deliberately.
orders_writer = (
    df_read_data_hour1.write
    .format("delta")
    .option("path", DELTA_TABLE_WRITE_PATH)
    .partitionBy(PARTITION_COLUMN)
)
orders_writer.saveAsTable(TABLE)


In [0]:
# List what was written under the Delta table's storage location.
delta_table_listing = dbutils.fs.ls(DELTA_TABLE_WRITE_PATH)
display(delta_table_listing)

In [0]:
# Read the Delta table directly from its storage path (rather than by table
# name) and print a sample of rows.
orders_from_path = spark.read.format("delta").load(DELTA_TABLE_WRITE_PATH)
orders_from_path.show()

In [0]:
%sql
-- Query the table by its registered name: every row, every column.
SELECT
  *
FROM store_orders;


In [0]:
%sql
-- Show the table's commit history as it stands right after the initial write.
DESCRIBE HISTORY store_orders;


In [0]:
%sql
-- Look at order 5 before it is modified by the UPDATE that follows.
SELECT *
FROM store_orders
WHERE order_number = 5;

In [0]:
%sql
-- Change the sale price of order 5 in place.
UPDATE store_orders
SET sale_price = 90.50
WHERE order_number = 5;

In [0]:
%sql
-- Read order 5 back to confirm the new sale price.
SELECT *
FROM store_orders
WHERE order_number = 5;


In [0]:
%sql
-- Show the commit history again; the UPDATE on order 5 is expected to appear as a new entry.
DESCRIBE HISTORY store_orders;


In [0]:
%sql
-- Time travel: read order 5 as it was in table version 0.
SELECT *
FROM store_orders VERSION AS OF 0
WHERE order_number = 5;

In [0]:
%sql
-- Delete order 5, then query for it again to confirm it is gone from the current table.
DELETE FROM store_orders
WHERE order_number = 5;

SELECT *
FROM store_orders
WHERE order_number = 5;


In [0]:
%sql
-- Show the commit history again; the DELETE of order 5 is expected to appear as a new entry.
DESCRIBE HISTORY store_orders;

In [0]:
%sql
-- Roll the table back to version 1, then read order 5 from the CURRENT table
-- to confirm the restore took effect.
-- The follow-up query previously read VERSION AS OF 0: a time-travel snapshot
-- returns the same rows whether or not the RESTORE succeeded, so it could not
-- verify anything about the restore.
RESTORE TABLE store_orders TO VERSION AS OF 1;
SELECT * FROM store_orders WHERE order_number=5;

In [0]:
%sql
-- Show the commit history again; the RESTORE is expected to appear as a new entry.
DESCRIBE HISTORY store_orders;


In [0]:
%sql
-- Row count before the next hourly extract is merged in.
SELECT
  count(*)
FROM store_orders;

In [0]:
# Point the bronze path at the 19:00 folder and load that hourly extract with
# the shared `schema`.
# The source folder is literally named "[dbo].[<table>]" and its brackets are
# backslash-escaped in the path. Raw strings keep those backslashes explicitly;
# the previous "\[" / "\]" spellings relied on Python passing unrecognised
# escape sequences through, which is deprecated and emits a SyntaxWarning on
# recent interpreters. The resulting path string is unchanged.
ADLS_FOLDER = "2021/04/28/19"
BRONZE_TABLE_PATH = (
    "wasbs://" + BRONZE_LAYER_NAMESPACE + "@" + STORAGE_ACCOUNT + ".blob.core.windows.net/"
    + STORE_SALES_FOLDER + r"/\[dbo\].\[" + TABLE + r"\]/" + ADLS_FOLDER
)
df_read_data_hour2 = spark.read.csv(BRONZE_TABLE_PATH, schema=schema)
display(df_read_data_hour2)


In [0]:
# Upsert the second hourly extract into the Delta table, keyed on order_number:
#   * a target row whose order_number matches an incoming row is overwritten
#     with the incoming row's values;
#   * an incoming row with no matching order_number is inserted.
# Both clauses copy every column one-to-one from the incoming row, so the
# update-all / insert-all builders replace the two hand-written nine-column
# mappings (which had to be kept in sync with each other and with the schema)
# and the backslash continuations that were unnecessary inside parentheses.
# NOTE(review): relies on df_read_data_hour2 having exactly the table's columns
# (both are read with the same `schema`) and on order_number being unique
# within the extract -- confirm against the source data.
deltaTable = DeltaTable.forPath(spark, DELTA_TABLE_WRITE_PATH)
(
    deltaTable.alias("store_orders")
    .merge(
        df_read_data_hour2.alias("store_orders_new"),
        "store_orders.order_number = store_orders_new.order_number",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
%sql
-- Row count after the hourly extract has been merged in.
SELECT
  count(*)
FROM store_orders;


In [0]:
%sql
-- Show the commit history again; the merge of the hourly extract is expected to appear as a new entry.
DESCRIBE HISTORY store_orders;


In [0]:
# Point the bronze path at the 20:00 folder and load that hourly extract with
# the shared `schema`.
# The source folder is literally named "[dbo].[<table>]" and its brackets are
# backslash-escaped in the path. Raw strings keep those backslashes explicitly;
# the previous "\[" / "\]" spellings relied on Python passing unrecognised
# escape sequences through, which is deprecated and emits a SyntaxWarning on
# recent interpreters. The resulting path string is unchanged.
ADLS_FOLDER = "2021/04/28/20"
BRONZE_TABLE_PATH = (
    "wasbs://" + BRONZE_LAYER_NAMESPACE + "@" + STORAGE_ACCOUNT + ".blob.core.windows.net/"
    + STORE_SALES_FOLDER + r"/\[dbo\].\[" + TABLE + r"\]/" + ADLS_FOLDER
)
df_read_data_hour3 = spark.read.csv(BRONZE_TABLE_PATH, schema=schema)
display(df_read_data_hour3)

In [0]:
%sql
-- Look at a handful of specific orders before the next extract is merged.
SELECT *
FROM store_orders
WHERE order_number IN (500, 1254, 1501, 2234, 2345);

In [0]:
# Upsert the third hourly extract into the Delta table, keyed on order_number:
#   * a target row whose order_number matches an incoming row is overwritten
#     with the incoming row's values;
#   * an incoming row with no matching order_number is inserted.
# Both clauses copy every column one-to-one from the incoming row, so the
# update-all / insert-all builders replace the two hand-written nine-column
# mappings (which had to be kept in sync with each other and with the schema)
# and the backslash continuations that were unnecessary inside parentheses.
# NOTE(review): relies on df_read_data_hour3 having exactly the table's columns
# (both are read with the same `schema`) and on order_number being unique
# within the extract -- confirm against the source data.
deltaTable = DeltaTable.forPath(spark, DELTA_TABLE_WRITE_PATH)
(
    deltaTable.alias("store_orders")
    .merge(
        df_read_data_hour3.alias("store_orders_new"),
        "store_orders.order_number = store_orders_new.order_number",
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
%sql
-- Re-read the same handful of orders after the merge to see what changed.
SELECT *
FROM store_orders
WHERE order_number IN (500, 1254, 1501, 2234, 2345);

In [0]:
%sql
-- Focus on order 500 alone.
SELECT *
FROM store_orders
WHERE order_number = 500;

In [0]:
%sql
-- Show the full commit history after the latest merge.
DESCRIBE HISTORY store_orders;

In [0]:
%sql
-- From the commit history, keep only the version, the operation and the
-- isolation level each commit ran under.
SELECT
  version,
  operation,
  isolationLevel
FROM (DESCRIBE HISTORY store_orders);

In [0]:
%sql
-- Set the table's Delta isolation level property to Serializable.
ALTER TABLE store_orders
SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

In [0]:
%sql
-- Make one more change (order 500's sale price) so that a commit is written
-- after the isolation-level property was set.
UPDATE store_orders
SET sale_price = 100.00
WHERE order_number = 500;

In [0]:
%sql
-- List version, operation and isolation level per commit again, to compare the
-- newest commits with the earlier ones.
SELECT
  version,
  operation,
  isolationLevel
FROM (DESCRIBE HISTORY store_orders);
