In [0]:
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,array,ArrayType,DateType,TimestampType
import hashlib
from pyspark.sql.functions import udf

In [0]:
# Get Parameter from Azure Data Factory
#STORAGE_ACCOUNT=dbutils.widgets.get("STORAGE_ACCOUNT")
#ADLS_KEY=dbutils.widgets.get("ADLS_KEY")
#BRONZE_LAYER_NAMESPACE=dbutils.widgets.get("BRONZE_LAYER_NAMESPACE")
#SILVER_LAYER_NAMESPACE=dbutils.widgets.get("SILVER_LAYER_NAMESPACE")
#STORE_SALES_FOLDER=dbutils.widgets.get("STORE_SALES_FOLDER")
#ADLS_FOLDER=dbutils.widgets.get("ADLS_FOLDER")
#TABLE_LIST=dbutils.widgets.get("TABLE_LIST")

STORAGE_ACCOUNT="traininglakehouse"
ADLS_KEY="+QpX+DXYC6qJXcAIXmUsWk+WYhV+43WaqVM+Nxi9SO3UQSJ+VDwXiSJ1AnFmif8KwiQudrKsA80gG+x0A+1BTQ=="
BRONZE_LAYER_NAMESPACE="bronze"
SILVER_LAYER_NAMESPACE="silver"
STORE_SALES_FOLDER="sales"
ADLS_FOLDER="2021/04/28/18"
TABLE_LIST="products,store_customers,store_orders"

In [0]:
# Set the Azure key in the Spark config
spark.conf.set("fs.azure.account.key."+STORAGE_ACCOUNT+".blob.core.windows.net", ADLS_KEY)

In [0]:
CUSTOMERS_SCHEMA =[
    ('customer_id', IntegerType()),
    ('customer_name', StringType()),
    ('address', StringType()),
    ('city', StringType()),
    ('postalcode', StringType()),
    ('country', StringType()),
    ('phone', StringType()),
    ('email', StringType()),
    ('credit_card', StringType()),
    ('updated_at', TimestampType())
]

ORDERS_SCHEMA =[
    ('order_number', IntegerType()),
    ('customer_id', IntegerType()),
    ('product_id', IntegerType()),
    ('order_date', StringType()),
    ('units', IntegerType()),
    ('sale_price', StringType()),
    ('currency', StringType()),
    ('order_mode', StringType()),
    ('updated_at', TimestampType())
]

PRODUCTS_SCHEMA =[
    ('product_code', StringType()),
    ('product_name', StringType()),
    ('product_category', StringType()),
    ('updated_at', TimestampType())
]



In [0]:
def gen_blank_df(spark, schema_struct):
    fields = [StructField(*field) for field in schema_struct]
    schema = StructType(fields)
    df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
    return df

def mask_value(column):
  mask_value = hashlib.sha256(column.encode()).hexdigest()
  return mask_value

mask_udf = udf(mask_value, StringType())

In [0]:
for table in TABLE_LIST.split(","):
  
  table_path="wasbs://"+SILVER_LAYER_NAMESPACE+"@"+STORAGE_ACCOUNT+".blob.core.windows.net/"+STORE_SALES_FOLDER+"/"+table
  bronze_table_path="wasbs://"+BRONZE_LAYER_NAMESPACE+"@"+STORAGE_ACCOUNT+".blob.core.windows.net/"+STORE_SALES_FOLDER+"/\[dbo\].\["+table+"\]/"+ADLS_FOLDER
  #print(bronze_table_path)
  #print(table_path)
  
  if table=="store_customers":
    partition_col="country"
    TABLE_SCHEMA=CUSTOMERS_SCHEMA
  elif table=="store_orders":
    partition_col="currency"
    TABLE_SCHEMA=ORDERS_SCHEMA
  elif table=="products":
    partition_col="product_category"
    TABLE_SCHEMA=PRODUCTS_SCHEMA
    
    
  fields = [StructField(*field) for field in TABLE_SCHEMA]
  schema = StructType(fields)
  try:
    deltaTable = DeltaTable.forPath(spark, table_path)
  except:
    spark.sql("DROP TABLE IF EXISTS "+ table)
    df = gen_blank_df(spark,TABLE_SCHEMA)
    df.write.format("delta").option("path", table_path).partitionBy(partition_col).saveAsTable(table)
    deltaTable = DeltaTable.forPath(spark, table_path)
  
  # Read store customers
  df_table_incremental = spark.read.csv(bronze_table_path, schema=schema )
  display(df_table_incremental)
  
  if table=="store_customers":
    #df_store_customers_incremental.show(truncate=False)
    df_table_masked = df_table_incremental.withColumn('phone_masked',mask_udf('phone')).drop('phone').withColumnRenamed('phone_masked', 'phone')
    df_table_masked = df_table_masked.withColumn('credit_card_masked',mask_udf('credit_card')).drop('credit_card').withColumnRenamed('credit_card_masked', 'credit_card')
    df_table_masked = df_table_masked.withColumn('credit_card_masked',mask_udf('credit_card')).drop('credit_card').withColumnRenamed('credit_card_masked', 'credit_card')
    df_table_masked = df_table_masked.withColumn('address_masked',mask_udf('address')).drop('address').withColumnRenamed('address_masked', 'address')
    
    deltaTable.alias("store_customers").merge(
    df_table_masked.alias("store_customers_new"),
                    "store_customers.email = store_customers_new.email") \
                    .whenMatchedUpdate(set = {"customer_id": 	  "store_customers_new.customer_id", 	\
                                              "customer_name":    "store_customers_new.customer_name",  \
                                              "address":          "store_customers_new.address",        \
                                              "city":             "store_customers_new.city",           \
                                              "postalcode":       "store_customers_new.postalcode",     \
                                              "country":          "store_customers_new.country",        \
                                              "phone":            "store_customers_new.phone",          \
                                              "email":            "store_customers_new.email",          \
                                              "credit_card":      "store_customers_new.credit_card",    \
                                              "updated_at":       "store_customers_new.updated_at" } )  \
                    .whenNotMatchedInsert(values =                                                      \
                       {                                                    
                                              "customer_id": 	  "store_customers_new.customer_id", 	\
                                              "customer_name":    "store_customers_new.customer_name",  \
                                              "address":          "store_customers_new.address",        \
                                              "city":             "store_customers_new.city",           \
                                              "postalcode":       "store_customers_new.postalcode",     \
                                              "country":          "store_customers_new.country",        \
                                              "phone":            "store_customers_new.phone",          \
                                              "email":            "store_customers_new.email",          \
                                              "credit_card":      "store_customers_new.credit_card",    \
                                              "updated_at":       "store_customers_new.updated_at"      \
                       }                                                                                \
                     ).execute()
  elif table=="store_orders":
    df_table_masked = df_table_incremental
    deltaTable.alias("store_orders").merge(
    df_table_masked.alias("store_orders_new"),
                    "store_orders.order_number = store_orders_new.order_number")                     \
                    .whenMatchedUpdate(set = {"order_number": 	  "store_orders_new.order_number", 	 \
                                              "customer_id":      "store_orders_new.customer_id",    \
                                              "product_id":       "store_orders_new.product_id",     \
                                              "order_date":       "store_orders_new.order_date",     \
                                              "units":            "store_orders_new.units",          \
                                              "sale_price":       "store_orders_new.sale_price",     \
                                              "currency":         "store_orders_new.currency",       \
                                              "order_mode":       "store_orders_new.order_mode",     \
                                              "updated_at":       "store_orders_new.updated_at" } )  \
                    .whenNotMatchedInsert(values =                                                   \
                       {                                                    
                                              "order_number": 	  "store_orders_new.order_number", 	 \
                                              "customer_id":      "store_orders_new.customer_id",    \
                                              "product_id":       "store_orders_new.product_id",     \
                                              "order_date":       "store_orders_new.order_date",     \
                                              "units":            "store_orders_new.units",          \
                                              "sale_price":       "store_orders_new.sale_price",     \
                                              "currency":         "store_orders_new.currency",       \
                                              "order_mode":       "store_orders_new.order_mode",     \
                                              "updated_at":       "store_orders_new.updated_at"      \
                       }                                                                             \
                     ).execute()
    deltaTable.delete("order_mode = 'DELETE'")
  elif table=="products":
    df_table_masked = df_table_incremental
    deltaTable.alias("products").merge(
    df_table_masked.alias("products_new"),
                    "products.product_code = products_new.product_code")                           \
                    .whenMatchedUpdate(set = {"product_code": 	  "products_new.product_code", 	   \
                                              "product_name":     "products_new.product_name",     \
                                              "product_category": "products_new.product_category", \
                                              "updated_at":       "products_new.updated_at" } )    \
                    .whenNotMatchedInsert(values =                                                 \
                       {                                                    
                                              "product_code": 	  "products_new.product_code", 	   \
                                              "product_name":     "products_new.product_name",     \
                                              "product_category": "products_new.product_category", \
                                              "updated_at":       "products_new.updated_at"        \
                       }                                                                           \
                     ).execute()  
  #deltaTable.generate("symlink_format_manifest")
  display(df_table_masked)

product_code,product_name,product_category,updated_at
1001,Watch,Wearables,2021-04-28T17:56:10.393+0000
1002,Vaccum,HomeAppliance,2021-04-28T17:56:10.393+0000
1003,Airconditioner,HomeAppliance,2021-04-28T17:56:10.393+0000
1004,Microwave,HomeAppliance,2021-04-28T17:56:10.393+0000
1005,DVD,HomeEntertainment,2021-04-28T17:56:10.393+0000
1006,TV,HomeEntertainment,2021-04-28T17:56:10.393+0000
1007,Keyboard,Computers,2021-04-28T17:56:10.393+0000
1008,Mice,Computers,2021-04-28T17:56:10.393+0000
1009,Monitor,Computers,2021-04-28T17:56:10.393+0000
1010,Mobile,Phones,2021-04-28T17:56:10.393+0000


product_code,product_name,product_category,updated_at
1001,Watch,Wearables,2021-04-28T17:56:10.393+0000
1002,Vaccum,HomeAppliance,2021-04-28T17:56:10.393+0000
1003,Airconditioner,HomeAppliance,2021-04-28T17:56:10.393+0000
1004,Microwave,HomeAppliance,2021-04-28T17:56:10.393+0000
1005,DVD,HomeEntertainment,2021-04-28T17:56:10.393+0000
1006,TV,HomeEntertainment,2021-04-28T17:56:10.393+0000
1007,Keyboard,Computers,2021-04-28T17:56:10.393+0000
1008,Mice,Computers,2021-04-28T17:56:10.393+0000
1009,Monitor,Computers,2021-04-28T17:56:10.393+0000
1010,Mobile,Phones,2021-04-28T17:56:10.393+0000


customer_id,customer_name,address,city,postalcode,country,phone,email,credit_card,updated_at
1,Ariel Hale,Ap #660-3260 Pellentesque St.,College,98362,United States,1-973-833-9836,amet.metus@Nullatinciduntneque.net,5124442517412973,2021-04-28T17:56:10.453+0000
2,Aubrey Norris,Ap #943-1347 Imperdiet Avenue,Coldstream,D10 5JV,United Kingdom,07672 321093,sollicitudin@enimmitempor.ca,5103696625359419,2021-04-28T17:56:10.453+0000
3,Bruno Hebert,8566 Nisi Avenue,Llangollen,CE2 4WW,United Kingdom,02794 010514,Donec.non@dapibusrutrum.com,5132188470727440,2021-04-28T17:56:10.453+0000
4,Ira Lucas,936-3011 Convallis Road,Shreveport,67365,United States,1-117-676-2784,nec@lectus.net,5164946381862809,2021-04-28T17:56:10.453+0000
5,Hannah Ferrell,"P.O. Box 755, 7941 Aenean St.",Watson Lake,C6Y 7M3,Canada,1 (867) 533-2852,nec@orciluctuset.co.uk,5256394502723692,2021-04-28T17:56:10.453+0000
6,May Bentley,"P.O. Box 507, 7752 A Street",Chilliwack,E5R 9R0,Canada,1 (581) 785-4024,egestas@Sedeueros.edu,5553254253392933,2021-04-28T17:56:10.453+0000
7,Tariq Singh,"P.O. Box 716, 3443 Metus. Ave",Silchar,968978,India,+91 6777750639,luctus.ut@euismod.com,5103138625325504,2021-04-28T17:56:10.453+0000
8,Serina Serrano,"Ap #203-1558 Ut, Av.",Auldearn,K52 0FY,United Kingdom,08200 538605,Phasellus.ornare@antedictum.co.uk,5154112146956780,2021-04-28T17:56:10.453+0000
9,Ishrat Raj,"P.O. Box 248, 7775 Commodo Rd.",Aizwal,630461,India,+91 7810721323,consectetuer@egestasSed.ca,5341883080575026,2021-04-28T17:56:10.453+0000
10,Nelle Frost,"P.O. Box 904, 8637 Duis Av.",Watson Lake,K9N 0V4,Canada,1 (905) 664-4248,sapien.imperdiet@Donecvitaeerat.org,5576329371730488,2021-04-28T17:56:10.453+0000


customer_id,customer_name,city,postalcode,country,email,updated_at,phone,credit_card,address
1,Ariel Hale,College,98362,United States,amet.metus@Nullatinciduntneque.net,2021-04-28T17:56:10.453+0000,b5dd92eff3d588b487af68a21e83c757ef67f78beb722abdce9de250a141710f,7bece2f3ef6f41d2edc99dea952ce5fb214ea876f2d6de4d552a1b542a84cb74,bad715752fbd902604b6ac5a8bc6c08aaf1cb2bc885b679c2dcbc0b62e3ab461
2,Aubrey Norris,Coldstream,D10 5JV,United Kingdom,sollicitudin@enimmitempor.ca,2021-04-28T17:56:10.453+0000,fd4e9edb642956e26fe6974476cc370cd6db986959a1f04e3ec02b9c1e42673c,7349084c9cf354873646ef6b8262add48c50880a5488f9fca2e94e72b775ed33,c7581221e6ecafb90fc6fb3fb9496be53f8c70ce066734a92ddf8bd60aa77887
3,Bruno Hebert,Llangollen,CE2 4WW,United Kingdom,Donec.non@dapibusrutrum.com,2021-04-28T17:56:10.453+0000,0f9f96ffbad2f6792c547b3f4f1c410ed20100e30ff7cf7db8f3ef99aa445fef,b435d63e6fe49c7fdb6eb0af1a5c913cb75ab62822aa32232ee1d3f9c9a0c898,b574810a5bb2d8cbe50d122046c09d3405dc61713d288d5f7b8d71687867fe03
4,Ira Lucas,Shreveport,67365,United States,nec@lectus.net,2021-04-28T17:56:10.453+0000,20222bde4ed443685ffebc02a68a610c2e0862b535d7be167f1e2c28e28bb370,d3ef2e9ae006c9f49b51ae9d67a355393b3478be850a235853500079a6f8c78c,859574d23810fe99412a344dd1bcd97b550c27e026df7aeadf16650a3e37d653
5,Hannah Ferrell,Watson Lake,C6Y 7M3,Canada,nec@orciluctuset.co.uk,2021-04-28T17:56:10.453+0000,73c857e4eb80dde212251499a01bc3d2c930dc741871b4447df5274a4347490d,0940976aa4def8d6a038d642c5abf9fb05b7c092d7cf6a1a829a07e39671b191,65897909eb6c663ef29d355955450f72617c9902883f71ea143c90f081266c11
6,May Bentley,Chilliwack,E5R 9R0,Canada,egestas@Sedeueros.edu,2021-04-28T17:56:10.453+0000,004ef30cb39b1ede61ac5d0ee87b6f89d1f895c1e22167704f42769d642861ce,0c9c69f0e315de01f2074e5d77417be0ef26937447c6755061ca2ad9e11c4500,0500ea20e89af55a6fac6ef0bec4499e6fb414f407af39c7ede5099d985b2d62
7,Tariq Singh,Silchar,968978,India,luctus.ut@euismod.com,2021-04-28T17:56:10.453+0000,fae9b624b2a01a8097e2c6a66b7f37bf56038ff92c2177d48b383db2562fb156,b5159e6929ed1a9b9c8ca5b27bb3138c5170a6beabdf8d285ca77307a4619fa5,6cce8c6637137eca0ac4ecb87ee9c20f99bfeac73320def6c0111c5f96280d29
8,Serina Serrano,Auldearn,K52 0FY,United Kingdom,Phasellus.ornare@antedictum.co.uk,2021-04-28T17:56:10.453+0000,79cb4c7e194377a1020612d553813d860bbe31fef87b29e50df53de1b8f85aa6,fb7f92d3274a63e3cb66929cd59823aa96adb5983d60b007a22a1550978a18dc,e3cdcf9e9dabe3bec9fc75b8aff8f0b5b592b470ea9cef3488ec6e86de32424b
9,Ishrat Raj,Aizwal,630461,India,consectetuer@egestasSed.ca,2021-04-28T17:56:10.453+0000,9ccd97e2e0e3cd3aac4102f270029a4670432cfcb8ab135252d7be25b96ede84,4c024007c29fe893a685e4a96d054944a2d9387e59ad9ffa03a28626a469c87d,688e11f0551d2815676a767ce96b289885fe0d9293e307dd66d8f13eb04e4c11
10,Nelle Frost,Watson Lake,K9N 0V4,Canada,sapien.imperdiet@Donecvitaeerat.org,2021-04-28T17:56:10.453+0000,db693e6d5ab3a2d32d87b89dca3e1bfeb377338551a6fc68dc5d3d6595dd16fe,517bdccaf06ad0146c4b9cd334850e1aeda8818f7f8961f5900f359e0e6bcd03,258ee2462b43b6e4db568d480cc72d20ea3887b7b7798ff09fde540444f50461


order_number,customer_id,product_id,order_date,units,sale_price,currency,order_mode,updated_at
1,212,5,02/03/2019,10,11.6,USD,NEW,2021-04-28T17:56:11.053+0000
2,1940,10,06/24/2020,8,72.31,USD,NEW,2021-04-28T17:56:11.053+0000
3,60,6,02/11/2019,4,24.82,INR,NEW,2021-04-28T17:56:11.053+0000
4,2776,6,05/20/2018,4,20.91,USD,NEW,2021-04-28T17:56:11.053+0000
5,409,9,07/05/2019,5,98.41,INR,NEW,2021-04-28T17:56:11.053+0000
6,978,6,12/16/2020,1,6.9,USD,NEW,2021-04-28T17:56:11.053+0000
7,2904,6,01/04/2021,1,71.56,EURO,NEW,2021-04-28T17:56:11.053+0000
8,1269,3,08/11/2018,6,47.67,USD,NEW,2021-04-28T17:56:11.053+0000
9,2628,5,01/16/2017,1,59.05,EURO,NEW,2021-04-28T17:56:11.053+0000
10,1672,8,08/01/2020,3,43.42,USD,NEW,2021-04-28T17:56:11.053+0000


order_number,customer_id,product_id,order_date,units,sale_price,currency,order_mode,updated_at
1,212,5,02/03/2019,10,11.6,USD,NEW,2021-04-28T17:56:11.053+0000
2,1940,10,06/24/2020,8,72.31,USD,NEW,2021-04-28T17:56:11.053+0000
3,60,6,02/11/2019,4,24.82,INR,NEW,2021-04-28T17:56:11.053+0000
4,2776,6,05/20/2018,4,20.91,USD,NEW,2021-04-28T17:56:11.053+0000
5,409,9,07/05/2019,5,98.41,INR,NEW,2021-04-28T17:56:11.053+0000
6,978,6,12/16/2020,1,6.9,USD,NEW,2021-04-28T17:56:11.053+0000
7,2904,6,01/04/2021,1,71.56,EURO,NEW,2021-04-28T17:56:11.053+0000
8,1269,3,08/11/2018,6,47.67,USD,NEW,2021-04-28T17:56:11.053+0000
9,2628,5,01/16/2017,1,59.05,EURO,NEW,2021-04-28T17:56:11.053+0000
10,1672,8,08/01/2020,3,43.42,USD,NEW,2021-04-28T17:56:11.053+0000
