In [17]:
# Load the raw API table from the assessment lakehouse and preview it.
df_api = spark.table("MF_LH_Assessment_21921.raw_api_data")
display(df_api)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e7d25d81-db64-467f-ad1b-ad768a38e290)

In [18]:
# Load the raw CRM table from the assessment lakehouse and preview it.
df_crm = spark.table("MF_LH_Assessment_21921.raw_crm_data")
display(df_crm)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bb24a657-9df6-4524-9624-d566900b8fa8)

In [19]:
# Load the raw ERP table from the assessment lakehouse and preview it.
df_erp = spark.table("MF_LH_Assessment_21921.raw_erp_data")
display(df_erp)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8724345b-133c-4dc9-9204-dedf6c2eaa73)

In [20]:
# Data cleanup: stamp audit columns on every source and drop records that are
# missing the values the silver tables require.
from pyspark.sql.functions import when, lit, col, current_timestamp, input_file_name, trim


def _with_audit_columns(df):
    """Return `df` with CreatedTS and ModifiedTS both set to the current timestamp."""
    load_ts = current_timestamp()
    return df.withColumn("CreatedTS", load_ts).withColumn("ModifiedTS", load_ts)


def _is_present(column_name):
    """Column predicate: true when `column_name` is non-null and not blank.

    The value is cast to string before the blank test so the predicate works
    whether the raw column is a string or a numeric type. Comparing a numeric
    column directly with "" makes Spark coerce "" to a number (null, or an
    error under ANSI mode), which would make the filter drop every row.
    Whitespace-only strings are treated as blank.
    """
    value = col(column_name)
    return value.isNotNull() & (trim(value.cast("string")) != "")


df_api = _with_audit_columns(df_api)
df_crm = _with_audit_columns(df_crm)
df_erp = _with_audit_columns(df_erp)

# Drop API records whose latitude or longitude is null or blank.
df_api = df_api.filter(_is_present("latitude") & _is_present("longitude"))

# Drop CRM records whose amount is missing (null) or not positive.
df_crm = df_crm.filter(col("amount") > 0)

# Drop ERP records whose price is missing (null) or not positive.
df_erp = df_erp.filter(col("price") > 0)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 22, Finished, Available, Finished)

In [21]:
# Target schema for api_silver (crm_silver and erp_silver follow in the next
# cells). createIfNotExists leaves an existing table as-is, so editing a column
# type here does not migrate a table that was already created.
# NOTE(review): CreatedTS/ModifiedTS are filled from current_timestamp() in the
# cleanup step but declared DateType, so the time of day is dropped on write;
# TimestampType may have been intended — confirm before changing.
from delta.tables import *
from pyspark.sql.types import *

(
    DeltaTable.createIfNotExists(spark)
    .tableName("api_silver")
    .addColumn("Id", StringType())
    .addColumn("Name", StringType())
    .addColumn("latitude", FloatType())
    .addColumn("longitude", FloatType())
    .addColumn("CreatedTS", DateType())
    .addColumn("ModifiedTS", DateType())
    .execute()
)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 23, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7de6f1eb0520>

In [22]:
# Target schema for crm_silver (left unchanged if the table already exists).
# NOTE(review): CreatedTS/ModifiedTS are DateType although they are populated
# from current_timestamp(); confirm whether TimestampType was meant.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("crm_silver")
    .addColumn("customer_id", StringType())
    .addColumn("customer_name", StringType())
    .addColumn("contact_email", StringType())
    .addColumn("phone_number", StringType())
    .addColumn("purchase_date", DateType())
    .addColumn("amount", FloatType())
    .addColumn("CreatedTS", DateType())
    .addColumn("ModifiedTS", DateType())
    .execute()
)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 24, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7de6f1eb18a0>

In [23]:
# Target schema for erp_silver (left unchanged if the table already exists).
# NOTE(review): stock_quantity is stored as FloatType; an integral type may be
# intended. CreatedTS/ModifiedTS are DateType although populated from
# current_timestamp(). Confirm both before changing.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("erp_silver")
    .addColumn("product_id", StringType())
    .addColumn("product_name", StringType())
    .addColumn("category", StringType())
    .addColumn("price", FloatType())
    .addColumn("stock_quantity", FloatType())
    .addColumn("CreatedTS", DateType())
    .addColumn("ModifiedTS", DateType())
    .execute()
)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 25, Finished, Available, Finished)

<delta.tables.DeltaTable at 0x7de6f1eb0250>

In [24]:
# Insert new API rows into api_silver.
# A source row counts as already loaded when Id, Name, latitude and longitude
# all match a stored row. Only unmatched rows are inserted. Matched rows are
# identical on every stored data column, so there is nothing to update.
# NOTE(review): if Id is the business key, a changed Name or location for an
# existing Id is inserted as an additional row rather than updating the old
# one — confirm this is the intended history behaviour.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, 'Tables/api_silver')

# Drop exact duplicates in the source so a single run cannot insert the same
# silver row twice.
dfUpdates = df_api.dropDuplicates(["Id", "Name", "latitude", "longitude"])

# `<=>` is null-safe equality: with plain `=`, a null on either side never
# matches, so such rows would be re-inserted on every run.
# Source values are cast to the silver column types before comparing. The
# stored values went through that conversion on insert. Without it, e.g. a
# DOUBLE latitude compared with the stored FLOAT (widened back to double) can
# differ in the low bits and never match.
(
    deltaTable.alias('silver')
    .merge(
        dfUpdates.alias('updates'),
        "silver.Id <=> CAST(updates.Id AS STRING)"
        " AND silver.Name <=> CAST(updates.Name AS STRING)"
        " AND silver.latitude <=> CAST(updates.latitude AS FLOAT)"
        " AND silver.longitude <=> CAST(updates.longitude AS FLOAT)",
    )
    .whenNotMatchedInsert(values={
        "Id": "updates.Id",
        "Name": "updates.Name",
        "latitude": "updates.latitude",
        "longitude": "updates.longitude",
        "CreatedTS": "updates.CreatedTS",
        "ModifiedTS": "updates.ModifiedTS",
    })
    .execute()
)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 26, Finished, Available, Finished)

In [25]:
# Insert new CRM rows into crm_silver.
# A source row counts as already loaded when customer_id, customer_name,
# phone_number, purchase_date and amount all match a stored row. Only unmatched
# rows are inserted. contact_email is stored but not compared.
# `amount` must be written on insert: it is part of the match condition, so a
# row stored with a NULL amount would never match again and would be
# re-inserted on every run.
# NOTE(review): rows loaded before `amount` was included in the insert have
# amount = NULL and will not match; they may need a one-off cleanup.
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, 'Tables/crm_silver')

# Drop exact duplicates (on the stored columns) so a single run cannot insert
# the same silver row twice.
dfUpdates = df_crm.dropDuplicates(
    ["customer_id", "customer_name", "contact_email", "phone_number", "purchase_date", "amount"]
)

# `<=>` is null-safe equality: with plain `=`, a null on either side (e.g. a
# missing phone_number) never matches, so the row would be re-inserted every run.
# Source values are cast to the silver column types before comparing, matching
# the conversion applied when they were stored. Examples: a DOUBLE amount is
# compared at FLOAT precision, and a string/timestamp purchase_date is
# compared as a DATE.
(
    deltaTable.alias('silver')
    .merge(
        dfUpdates.alias('updates'),
        "silver.customer_id <=> CAST(updates.customer_id AS STRING)"
        " AND silver.customer_name <=> CAST(updates.customer_name AS STRING)"
        " AND silver.phone_number <=> CAST(updates.phone_number AS STRING)"
        " AND silver.purchase_date <=> CAST(updates.purchase_date AS DATE)"
        " AND silver.amount <=> CAST(updates.amount AS FLOAT)",
    )
    .whenNotMatchedInsert(values={
        "customer_id": "updates.customer_id",
        "customer_name": "updates.customer_name",
        "contact_email": "updates.contact_email",
        "phone_number": "updates.phone_number",
        "purchase_date": "updates.purchase_date",
        "amount": "updates.amount",
        "CreatedTS": "updates.CreatedTS",
        "ModifiedTS": "updates.ModifiedTS",
    })
    .execute()
)

StatementMeta(, 249b6a9c-73b4-415b-af81-dbe675a7dbc8, 27, Finished, Available, Finished)