In [None]:
# Storage account plus the containers Auto Loader reads from (source) and the
# merged Delta output is written to (sink).
adlsAccountName = "<EnterStorageAccountNameHere>"
sourceAdlsContainerName, sinkAdlsContainerName = "bronze", "silver"

# Source and sink use the same folder layout inside their respective containers.
sourceAdlsFolderName = sinkAdlsFolderName = "CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address"

In [None]:
# Enable direct abfss:// access to the storage account using the account key
# kept in the Databricks secret scope.
spark.conf.set(
    f"fs.azure.account.key.{adlsAccountName}.dfs.core.windows.net",
    dbutils.secrets.get(key="Adls2-KeySecret", scope="<EnterDatabrickSecretScopeHere>"),
)

In [None]:
# Sanity check: list the root of the source container over abfss://.
dbutils.fs.ls(f"abfss://{sourceAdlsContainerName}@{adlsAccountName}.dfs.core.windows.net/")

In [None]:
# Pull the Azure identifiers and service-principal credentials from the secret
# scope, in the same order as the names on the left-hand side.
(SubscriptionID, DirectoryID, ServicePrincipalAppID,
 ServicePrincipalSecret, ResourceGroup, BlobConnectionKey) = (
    dbutils.secrets.get("<EnterDatabrickSecretScopeHere>", secretKey)
    for secretKey in (
        "SubscriptionID",
        "DirectoryID",
        "ServicePrincipalAppID",
        "AppSecret",
        "ResourceGroup",
        "Adls2-KeySecret",
    )
)

In [None]:
# OAuth client-credentials settings used as extra_configs when mounting the
# containers with the service principal.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": ServicePrincipalAppID,
    "fs.azure.account.oauth2.client.secret": ServicePrincipalSecret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{DirectoryID}/oauth2/token",
}

In [None]:
# Drop any previous /mnt/source and /mnt/sink mounts so the next cell can mount
# them with the current configs. dbutils.fs.unmount errors on a path that is not
# mounted, so only unmount mount points that actually exist. This keeps the cell
# safe on a fresh workspace and on Restart & Run All.
existingMountPoints = {mountInfo.mountPoint for mountInfo in dbutils.fs.mounts()}
for mountPoint in ("/mnt/source", "/mnt/sink"):
  if mountPoint in existingMountPoints:
    dbutils.fs.unmount(mountPoint)

In [None]:
# Mount the source and sink containers using the service-principal OAuth configs.
dbutils.fs.mount(
    source=f"abfss://{sourceAdlsContainerName}@{adlsAccountName}.dfs.core.windows.net/",
    mount_point="/mnt/source",
    extra_configs=configs,
)

dbutils.fs.mount(
    source=f"abfss://{sinkAdlsContainerName}@{adlsAccountName}.dfs.core.windows.net/",
    mount_point="/mnt/sink",
    extra_configs=configs,
)

In [None]:
# Confirm the mount points are visible.
dbutils.fs.ls("/mnt/")

## Initialize Auto Loader

In [None]:
from pyspark.sql.types import *
import json

# Spark StructType JSON for the Address CSV extracts. Every column is nullable
# with empty metadata; only AddressID is typed as integer, and all other columns
# (including ModifiedDate) are read as strings. Compact separators reproduce the
# exact JSON text StructType.fromJson is given.
jschema = json.dumps(
    {
        "fields": [
            {"metadata": {}, "name": columnName, "nullable": True, "type": columnType}
            for columnName, columnType in [
                ("AddressID", "integer"),
                ("AddressLine1", "string"),
                ("AddressLine2", "string"),
                ("City", "string"),
                ("StateProvince", "string"),
                ("CountryRegion", "string"),
                ("PostalCode", "string"),
                ("rowguid", "string"),
                ("ModifiedDate", "string"),
            ]
        ],
        "type": "struct",
    },
    separators=(",", ":"),
)

# Parse the JSON description into the StructType handed to readStream.schema().
schema = StructType.fromJson(
    json.loads(jschema)
)

In [None]:
# Auto Loader options: CSV input in file-notification mode, plus the
# subscription / tenant / service-principal / resource-group values passed
# alongside notification mode.
cloudfile = {
  "cloudFiles.format": "csv",
  "cloudFiles.useNotifications": "true",
  "cloudFiles.subscriptionId": SubscriptionID,
  "cloudFiles.tenantId": DirectoryID,
  "cloudFiles.clientId": ServicePrincipalAppID,
  "cloudFiles.clientSecret": ServicePrincipalSecret,
  "cloudFiles.resourceGroup": ResourceGroup,
}

## Build Streaming DataFrame

In [None]:
# Streaming DataFrame over the mounted source folder, read with the explicit
# Address schema and the Auto Loader options defined above.
filePath = f"/mnt/source/{sourceAdlsFolderName}/"
df = (
    spark.readStream
    .format("cloudFiles")
    .options(**cloudfile)
    .schema(schema)
    .option("Header", True)
    # NOTE(review): the schema location lives inside the source folder being
    # loaded; consider moving it next to the sink checkpoint instead.
    .option("cloudFiles.schemaLocation", filePath + "_schema")
    .load(filePath)
)

In [None]:
from pyspark.sql.functions import input_file_name, count

# Row count per ingested source file (display() on a streaming DataFrame keeps updating).
filesdf = (
    df.select("*", input_file_name().alias("file"))
      .groupBy("file")
      .agg(count("*"))
)
display(filesdf)

file,count(1)
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:48:44.487.csv,450
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:48:53.08.csv,451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T01:30:41.2.csv,450
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:36:43.23.csv,451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:42:08.65.csv,451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-29T00:39:23.16.csv,451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:00:32.207.csv,450


In [None]:
from delta.tables import *
def upsertToDelta(microBatchOutputDF, batchId, tableName="saleslt.address",
                  keyColumn="AddressID", orderColumn="ModifiedDate"):
  """Merge one Auto Loader micro-batch into the target Delta table.

  Written for DataStreamWriter.foreachBatch, which calls it with the
  micro-batch DataFrame and its batch id.

  The extracts feeding this stream contain the whole Address table each time
  (the per-file counts above are ~450 rows per file). A single micro-batch can
  therefore hold the same key several times. Delta MERGE then either fails
  ("multiple source rows matched") or, for keys not yet in the target, inserts
  every copy. So the batch is first reduced to one row per key, keeping the row
  with the greatest orderColumn value.

  Args:
    microBatchOutputDF: rows delivered for this micro-batch.
    batchId: micro-batch id supplied by Structured Streaming (not used here).
    tableName: metastore name of the target Delta table; it must already exist.
    keyColumn: column identifying a row in both source and target.
    orderColumn: column used to choose the latest row per key.
      NOTE(review): ModifiedDate is read as a string, so "latest" relies on its
      "yyyy-MM-dd HH:mm:ss.fffffff" text sorting chronologically. Rows tied on
      this column are picked arbitrarily; presumably they are identical copies.
  """
  from pyspark.sql import Window
  from pyspark.sql import functions as F

  # Keep only the newest row per key so MERGE sees at most one source row per target row.
  latestFirst = Window.partitionBy(keyColumn).orderBy(F.col(orderColumn).desc())
  dedupedDF = (microBatchOutputDF
               .withColumn("_rowRank", F.row_number().over(latestFirst))
               .filter(F.col("_rowRank") == 1)
               .drop("_rowRank"))

  deltadf = DeltaTable.forName(spark, tableName)

  (deltadf.alias("t")
   .merge(dedupedDF.alias("s"), f"s.{keyColumn} = t.{keyColumn}")
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute()
  )

In [None]:
# Start the stream; each micro-batch is MERGEd into saleslt.address by upsertToDelta.
#
# upsertToDelta resolves its target with DeltaTable.forName, so the table must
# exist before the first micro-batch. Previously, that required commenting out
# foreachBatch/queryName on the first run, because the table was only
# registered in a later cell. Instead, create an empty Delta table with the
# stream's schema at the sink path and register it here. Both steps are no-ops
# once the table exists (save mode "ignore", IF NOT EXISTS), so re-running the
# notebook is safe.
sinkDataPath = "/mnt/sink/"+sinkAdlsFolderName+"/AutoLoader/data/"
spark.sql("CREATE DATABASE IF NOT EXISTS saleslt")
spark.createDataFrame([], df.schema).write.format("delta").mode("ignore").save(sinkDataPath)
spark.sql("CREATE TABLE IF NOT EXISTS saleslt.address USING DELTA LOCATION '" + sinkDataPath + "'")

streamQuery = (df.writeStream
 .format("delta")
 .outputMode("append")
 .foreachBatch(upsertToDelta)
 .queryName("c-changeLoader-merge")
 #.trigger(once=True)  # uncomment to process the available files once and stop
 .option("checkpointLocation", "/mnt/sink/"+sinkAdlsFolderName+"/AutoLoader/_checkpoint")
 .start(sinkDataPath)
)

In [None]:
# Optional ad-hoc check: uncomment to read the sink Delta folder directly by path
# (bypassing the metastore table) and print a few rows.
#dfsql = spark.read.format("delta").load("/mnt/sink/"+sinkAdlsFolderName+"/AutoLoader/data/")
#dfsql.show()

In [None]:
# Register the Delta folder at the sink path as saleslt.address
# (both statements are IF NOT EXISTS, so re-running is harmless).
spark.sql("CREATE DATABASE IF NOT EXISTS saleslt")
spark.sql(
    f"CREATE TABLE IF NOT EXISTS saleslt.address USING DELTA LOCATION '/mnt/sink/{sinkAdlsFolderName}/AutoLoader/data/'"
)

In [None]:
%sql

-- Spot-check merged rows for a single city.
SELECT *
FROM saleslt.address
WHERE City = 'Everett'

AddressID,AddressLine1,AddressLine2,City,StateProvince,CountryRegion,PostalCode,rowguid,ModifiedDate
867,48995 Evergreen Wy.,,Everett,Washington,United States,98201,6cd4e374-0ef8-4cee-ac60-9dbf1f4e0007,2007-09-01 00:00:00.0000000
871,2502 Evergreen Ste E,,Everett,Washington,United States,98201,3ae9003f-f806-41ad-98a5-3c4a86514bc3,2005-07-01 00:00:00.0000000
897,705 SE Mall Parkway,,Everett,Washington,United States,98201,77a34652-5d9b-40b6-a15a-56d7ee4056f4,2006-08-01 00:00:00.0000000
11385,13833 55th Drive SE,,Everett,WA,USA,98208,29805efe-60fc-4df9-a299-8c1bff87191a,2021-07-28 20:29:02.0000000
