#### A Paramter code block so that you can feed in a different value from ADF pipeline

In [0]:
dbutils.widgets.removeAll()


In [0]:
dbutils.widgets.text("FileName", "contacts_20221022_0938_29_Contacts.csv") #'feed parameter value in from ADF or synapse pipeline'

In [0]:
dataFileName = dbutils.widgets.get("FileName")

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import datetime as dt
from delta import configure_spark_with_delta_pip
import delta
from delta.tables import *

#### Make a database called "deltathis" 
if using Synapse it will appear under your Lake database under Data/Workspace (as long as your cluster is running)

In [0]:
db_name = "deltaondbricks"

if db_name not in [db.name for db in spark.catalog.listDatabases()]:
    spark.sql(f"CREATE DATABASE {db_name}")
else:
    print(f"Database '{db_name}' already exists.")

Database 'deltaondbricks' already exists.


#### Make a table on the database
It has a well defined structure with data types

In [0]:
if spark.catalog.tableExists('{0}.contacts'.format(db_name)):
    print("Table exists. Proceeding with the code.")
else:
    # Create the live contacts table
    # Define the schema
    print("making the contacts table")
    schema = StructType([
        StructField("ConstituentID", StringType()),
        StructField("FirstName", StringType()),
        StructField("KeyName", StringType()),
        StructField("MiddleName", StringType()),
        StructField("MaidenName", StringType()),
        StructField("Nickname", StringType()),
        StructField("Birthdate", DateType()),
        StructField("Gender", StringType()),
        StructField("LookupID", StringType()),
        StructField("Title1", StringType()),
        StructField("Title2", StringType()),
        StructField("Suffix1", StringType()),
        StructField("Suffix2", StringType()),
        StructField("DeceasedDate", DateType()),
        StructField("IsDeceased", StringType()),
        StructField("Status", StringType()),
        StructField("InactiveReason", StringType()),
        StructField("GivesAnonymously", StringType()),
        StructField("SpouseID", StringType()),
        StructField("Area", StringType()),
        StructField("AreaManager", StringType()),
        StructField("Patch", StringType()),
        StructField("PatchManager", StringType()),
        StructField("DateAdded", TimestampType()),
        StructField("DateChanged", TimestampType()),
        StructField("PrimaryEmail", StringType()),
        StructField("PrimaryPostcode", StringType()),
        StructField("Name", StringType()),
        StructField("Delete", StringType()),
        StructField("YYYY", IntegerType()),
        StructField("MM", IntegerType()),
        StructField("DD", IntegerType()),
        StructField("HH", IntegerType())
    ])

    # Create an empty DataFrame with the schema
    df = spark.createDataFrame([], schema)

    # Write the DataFrame to a Delta Lake table
    df.write.partitionBy("YYYY", "MM").format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("{0}.contacts".format(db_name))



Table exists. Proceeding with the code.


In [0]:
#df = spark.read.load('abfss://deltademo@stguksclintonpmsft2.dfs.core.windows.net/contacts-deltathis/{0}'.format(dataFileName), format='csv', header=True)
df = spark.read.load('/mnt/deltademo/contacts-deltathis/{0}'.format(dataFileName), format='csv', header=True)

dataFileName = dataFileName.split('_',2)
df = df \
    .withColumn("YYYY",lit(dataFileName[1][0:4])) \
    .withColumn("MM",lit(dataFileName[1][4:6])) \
    .withColumn("DD",lit(dataFileName[1][6:8])) \
    .withColumn("HH",lit(dataFileName[2][0:2]))
display(df)

ConstituentID,FirstName,KeyName,MiddleName,MaidenName,Nickname,Birthdate,Gender,LookupID,Title1,Title2,Suffix1,Suffix2,DeceasedDate,IsDeceased,Status,InactiveReason,GivesAnonymously,SpouseID,Area,AreaManager,Patch,PatchManager,DateAdded,DateChanged,PrimaryEmail,PrimaryPostcode,Name,Delete,YYYY,MM,DD,HH
00985d29-24a2-4710-8021-00201b4c518c,Adriel,Wilson,,,,02/10/1978,Female,8-19456944,Dr,,,,,No,Active,,No,,,,,,2022-10-21 09:39:01,2022-10-22 09:39:01,,NR9 4HT,Adriel Wilson,No,2022,10,22,9
1234a4ea-7126-4064-8aa2-00607be1cfdb,Rolands,Gaspar,,,,22/10/1978,Female,8-12987664,Mrs,,,,,No,Active,,No,,,,,,2022-10-21 09:39:01,2022-10-22 09:39:01,,IG3 9TA,Rolands Gaspar,No,2022,10,22,9
00b4a5af-c37b-4fe4-9dbb-005c4a1894f7,Beau,White,,,,22/10/1979,Male,8-18761139,Mr,,,,,No,Active,,No,,Northern Ireland and Wales,,,,2022-10-21 09:39:01,2022-10-22 09:39:01,BeauTheMan@manomano.com,L23 0TE,Beau White,No,2022,10,22,9


#### Land the data first from the source
You should land the data in your delta lake. Call this landing area anything you would like. 
You can call it raw, raw_delta or cleansedstaging. 

An important piece to note about writing to the "path" in the code below is that it physically writes to the Storage account at the same time as writing to the Lake Database which is in memory on the spark compute

In [0]:
## To write as an External Table at the same time you can set the Path. See the code below and the code commented out at the end of the last line
## If you do not specify anything then the table becomes a "Managed Table" inside Databricks on the cluster. 
#pathcleansed = '/mnt/delta-contacts/cleansedstaging_bricks'
nextdaydata = df
nextdaydata.write.format("delta").saveAsTable('{0}.staging_contacts'.format(db_name), mode='overwrite') #,path=pathcleansed)

#### Use the Merge function from Delta Lake
Here you take the landed data in cleansedstaging and you do a merge statement so that your final table (in this demo, the final table for demonstration purposes is cleansed).

The matching condition is on the ConstituentID, this is the primary key associated with every "contact". Its their unique ID.

If the record (row) has a Delete value of Yes, then it will be deleted otherwise if its new, its an insert and if it exists it will be an update

In [0]:
from pyspark.sql.functions import *

dfex = DeltaTable.forName(spark, 'deltaondbricks.contacts')
dfnew = DeltaTable.forName(spark, 'deltaondbricks.staging_contacts')

dfnew = dfnew.toDF()
dfnew = dfnew \
      .withColumn("Birthdate", to_date("Birthdate", "dd/MM/yyyy")) \
      .withColumn("DeceasedDate", to_date("DeceasedDate","dd/MM/yyyy")) \
      .withColumn("DateAdded", to_timestamp("DateAdded", "dd/MM/yyyy HH:mm")) \
      .withColumn("DateChanged", to_timestamp("DateChanged", "dd/MM/yyyy HH:mm"))
dfex.alias('dfex').merge(dfnew.alias('dfnew'), 'dfex.ConstituentID = dfnew.ConstituentID') \
    .whenMatchedDelete(condition="dfnew.Delete = 'Yes'") \
    .whenMatchedUpdate(set={
    "ConstituentID": "dfnew.ConstituentID",
    "FirstName": "dfnew.FirstName",
    "KeyName": "dfnew.KeyName",
    "MiddleName": "dfnew.MiddleName",
    "MaidenName": "dfnew.MaidenName",
    "Nickname": "dfnew.Nickname",
    "Birthdate": "dfnew.Birthdate",
    "Gender": "dfnew.Gender",
    "LookupID": "dfnew.LookupID",
    "Title1": "dfnew.Title1",
    "Title2": "dfnew.Title2",
    "Suffix1": "dfnew.Suffix1",
    "Suffix2": "dfnew.Suffix2",
    "DeceasedDate": "dfnew.DeceasedDate",
    "IsDeceased": "dfnew.IsDeceased",
    "Status": "dfnew.Status",
    "InactiveReason": "dfnew.InactiveReason",
    "GivesAnonymously": "dfnew.GivesAnonymously",
    "SpouseID": "dfnew.SpouseID",
    "Area": "dfnew.Area",
    "AreaManager": "dfnew.AreaManager",
    "Patch": "dfnew.Patch",
    "PatchManager": "dfnew.PatchManager",
    "DateAdded": "dfnew.DateAdded",
    "DateChanged": "dfnew.DateChanged",
    "PrimaryEmail": "dfnew.PrimaryEmail",
    "PrimaryPostcode": "dfnew.PrimaryPostcode",
    "Name": "dfnew.Name",
    "Delete": "dfnew.Delete",
    "YYYY": "dfnew.YYYY",
    "MM": "dfnew.MM",
    "DD": "dfnew.DD",
    "HH": "dfnew.HH"
}) \
    .whenNotMatchedInsert(values={
    "ConstituentID": "dfnew.ConstituentID",
    "FirstName": "dfnew.FirstName",
    "KeyName": "dfnew.KeyName",
    "MiddleName": "dfnew.MiddleName",
    "MaidenName": "dfnew.MaidenName",
    "Nickname": "dfnew.Nickname",
    "Birthdate": "dfnew.Birthdate",
    "Gender": "dfnew.Gender",
    "LookupID": "dfnew.LookupID",
    "Title1": "dfnew.Title1",
    "Title2": "dfnew.Title2",
    "Suffix1": "dfnew.Suffix1",
    "Suffix2": "dfnew.Suffix2",
    "DeceasedDate": "dfnew.DeceasedDate",
    "IsDeceased": "dfnew.IsDeceased",
    "Status": "dfnew.Status",
    "InactiveReason": "dfnew.InactiveReason",
    "GivesAnonymously": "dfnew.GivesAnonymously",
    "SpouseID": "dfnew.SpouseID",
    "Area": "dfnew.Area",
    "AreaManager": "dfnew.AreaManager",
    "Patch": "dfnew.Patch",
    "PatchManager": "dfnew.PatchManager",
    "DateAdded": "dfnew.DateAdded",
    "DateChanged": "dfnew.DateChanged",
    "PrimaryEmail": "dfnew.PrimaryEmail",
    "PrimaryPostcode": "dfnew.PrimaryPostcode",
    "Name": "dfnew.Name",
    "Delete": "dfnew.Delete",
    "YYYY": "dfnew.YYYY",
    "MM": "dfnew.MM",
    "DD": "dfnew.DD",
    "HH": "dfnew.HH"
}) \
    .execute()

In [0]:
%sql
select * from deltaondbricks.contacts -- where FirstName like 'Abraham%'

ConstituentID,FirstName,KeyName,MiddleName,MaidenName,Nickname,Birthdate,Gender,LookupID,Title1,Title2,Suffix1,Suffix2,DeceasedDate,IsDeceased,Status,InactiveReason,GivesAnonymously,SpouseID,Area,AreaManager,Patch,PatchManager,DateAdded,DateChanged,PrimaryEmail,PrimaryPostcode,Name,Delete,YYYY,MM,DD,HH
007dcfaf-c408-4542-87ab-00b1100b9372,Jacob,Adams,,,,,Male,8-20785661,Mr,,,,,No,Active,,No,,,,,,,,4051ttt966@sky.com,L31 1GB,Jacob Adams,No,2022,10,21,9
007bc6fb-1034-475e-9cd0-0055ccd99966,Aaron,McKenna,,,,1981-03-22,Female,8-18864825,Mrs,,,,,No,Active,,No,,,,,,2022-10-20T09:38:00Z,2022-10-20T09:38:00Z,,NN4 5DQ,Aaron McKenna,No,2022,10,20,9
009ff12f-2d8a-415e-a79e-00be92248066,Adonis,Jenkins,,,,1981-05-04,Female,8-20835820,Ms,,,,,No,Active,,No,,,,,,2022-10-20T09:38:00Z,2022-10-20T09:38:00Z,,N11 2PN,Adonis Jenkins,No,2022,10,20,9
011acbe9-b5d0-491b-b5be-00c3ea89b111,Jade,Smith,John,,,1981-05-06,Male,8-12632758,Mr,,,,,No,Active,,No,,,,,,2022-10-20T09:38:00Z,2022-10-20T09:38:00Z,DDC3214026@gmail.com,GU10 4DY,Jade Smith,No,2022,10,20,9
00985d29-24a2-4710-8021-00201b4c518c,Adriel,Wilson,,,,1978-10-02,Female,8-19456944,Dr,,,,,No,Active,,No,,,,,,,,,NR9 4HT,Adriel Wilson,No,2022,10,22,9
1234a4ea-7126-4064-8aa2-00607be1cfdb,Rolands,Gaspar,,,,1978-10-22,Female,8-12987664,Mrs,,,,,No,Active,,No,,,,,,,,,IG3 9TA,Rolands Gaspar,No,2022,10,22,9
00b4a5af-c37b-4fe4-9dbb-005c4a1894f7,Beau,White,,,,1979-10-22,Male,8-18761139,Mr,,,,,No,Active,,No,,Northern Ireland and Wales,,,,,,BeauTheMan@manomano.com,L23 0TE,Beau White,No,2022,10,22,9


####  Clean up if you want
Use this code if you want to delete everything

In [0]:
%sql
-- DROP TABLE deltaondbricks.staging_contacts;
-- DROP TABLE deltaondbricks.contacts;
-- DROP DATABASE IF EXISTS deltaondbricks CASCADE;