In [0]:
# Storage account that holds both the landing (source) and staging (sink) containers.
adlsAccountName = "synapsetiphackathon"

# Source: container + folder Auto Loader reads the raw files from.
sourceAdlsContainerName, sourceAdlsFolderName = "landingzone", "raw/customer_address"

# Sink: container + folder the stream writes its Delta output / checkpoint under.
sinkAdlsContainerName, sinkAdlsFolderName = "stagingzone", "rawdata/tiphack/customer_address"

In [0]:
# Register the storage-account access key (from the secret scope) in the Spark conf
# so abfss:// paths on this account can be read directly.
_accountKeyConf = f"fs.azure.account.key.{adlsAccountName}.dfs.core.windows.net"
spark.conf.set(_accountKeyConf, dbutils.secrets.get(scope="autoloaderphotonscope", key="Tip-adls2-secret"))

In [0]:
# Sanity check: list the root of the source container over abfss:// (before any mount exists).
dbutils.fs.ls(f"abfss://{sourceAdlsContainerName}@{adlsAccountName}.dfs.core.windows.net/")

In [0]:
# Azure / service-principal details, all read from the "autoloaderphotonscope" secret scope.
# They feed the OAuth mount configs and the Auto Loader notification options further down.
SubscriptionID         = dbutils.secrets.get(scope="autoloaderphotonscope", key="SubscriptionID")
DirectoryID            = dbutils.secrets.get(scope="autoloaderphotonscope", key="DirectoryID")
ServicePrincipalAppID  = dbutils.secrets.get(scope="autoloaderphotonscope", key="ServicePrincipalAppID")
ServicePrincipalSecret = dbutils.secrets.get(scope="autoloaderphotonscope", key="autoloaderphotonappsecret")
ResourceGroup          = dbutils.secrets.get(scope="autoloaderphotonscope", key="TipResourceGroup")
# NOTE(review): same secret as the account key set earlier; no visible cell reads
# BlobConnectionKey — confirm it is still needed.
BlobConnectionKey      = dbutils.secrets.get(scope="autoloaderphotonscope", key="Tip-adls2-secret")

In [0]:
# ABFS OAuth (client-credentials) settings used when mounting the containers
# with the service principal.
_tokenEndpoint = f"https://login.microsoftonline.com/{DirectoryID}/oauth2/token"
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": ServicePrincipalAppID,
  "fs.azure.account.oauth2.client.secret": ServicePrincipalSecret,
  "fs.azure.account.oauth2.client.endpoint": _tokenEndpoint,
}

In [0]:
# Remove existing mounts so the next cell can re-mount them with the current configs.
# dbutils.fs.unmount raises if the path is not mounted (e.g. a fresh workspace), so
# only unmount points that are actually mounted — keeps Restart & Run All working.
_mountedPoints = {m.mountPoint for m in dbutils.fs.mounts()}
for _mountPoint in ("/mnt/source", "/mnt/sink"):
  if _mountPoint in _mountedPoints:
    dbutils.fs.unmount(_mountPoint)

In [0]:
def _mountContainer(containerName, mountPoint):
  """Mount `containerName` of the storage account at `mountPoint` using the OAuth `configs`."""
  return dbutils.fs.mount(
    source=f"abfss://{containerName}@{adlsAccountName}.dfs.core.windows.net/",
    mount_point=mountPoint,
    extra_configs=configs,
  )

_mountContainer(sourceAdlsContainerName, "/mnt/source")
_mountContainer(sinkAdlsContainerName, "/mnt/sink")

In [0]:
# Confirm both mount points now appear under /mnt/.
dbutils.fs.ls("/mnt/")

In [0]:
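# File-listing snippet carried over from Synapse Spark; kept for reference only (not executed).
# NOTE(review): on Databricks, FileInfo.isDir / isFile appear to be methods, so they would likely need ().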
#source_files = dbutils.fs.ls ("/mnt/source/")
#for file in source_files:
    #print(file.name, file.isDir, file.isFile, file.path, file.size)

## Initialize Auto Loader

In [0]:
from pyspark.sql.types import *
import json

# customer_address columns in file order: (name, nullable, Spark type).
_caFields = [
  ("ca_address_sk",     False, "integer"),
  ("ca_address_id",     False, "string"),
  ("ca_street_number",  True,  "string"),
  ("ca_street_name",    True,  "string"),
  ("ca_street_type",    True,  "string"),
  ("ca_suite_number",   True,  "string"),
  ("ca_city",           True,  "string"),
  ("ca_county",         True,  "string"),
  ("ca_state",          True,  "string"),
  ("ca_zip",            True,  "string"),
  ("ca_country",        True,  "string"),
  ("ca_gmt_offset",     True,  "decimal(5,2)"),
  ("ca_location_type",  True,  "string"),
]

# Spark's JSON schema representation; compact separators keep the exact same text
# as a hand-written JSON string.
jschema = json.dumps(
  {
    "fields": [
      {"metadata": {}, "name": colName, "nullable": isNullable, "type": colType}
      for colName, isNullable, colType in _caFields
    ],
    "type": "struct",
  },
  separators=(",", ":"),
)

schema = StructType.fromJson(json.loads(jschema))

In [0]:
# Auto Loader (cloudFiles) options: CSV input with file-notification mode enabled,
# plus the subscription / tenant / service-principal / resource-group values passed
# alongside it.
cloudfile = dict([
  ("cloudFiles.subscriptionId", SubscriptionID),
  ("cloudFiles.format", "csv"),
  ("cloudFiles.tenantId", DirectoryID),
  ("cloudFiles.clientId", ServicePrincipalAppID),
  ("cloudFiles.clientSecret", ServicePrincipalSecret),
  ("cloudFiles.resourceGroup", ResourceGroup),
  ("cloudFiles.useNotifications", "true"),
])

## Build the Streaming DataFrame

In [0]:
filePath = f"/mnt/source/{sourceAdlsFolderName}/"

# Streaming DataFrame over the pipe-delimited, header-less files in the source folder,
# parsed with the explicit customer_address schema.
# NOTE(review): the schema location is inside the monitored input folder, so Auto Loader
# writes metadata into the landing zone; with an explicit .schema() it may be better
# placed next to the sink checkpoint — confirm.
df = (
  spark.readStream
    .format("cloudFiles")
    .options(**cloudfile)
    .option("cloudFiles.schemaLocation", filePath + "_schema")
    .option("Header", False)
    .option("delimiter", "|")
    .schema(schema)
    .load(filePath)
)

In [0]:
from pyspark.sql.functions import input_file_name, count
# Streaming aggregation: number of rows ingested per source file (quick completeness check).
_rowsWithSourceFile = df.withColumn("file", input_file_name())
filesdf = _rowsWithSourceFile.groupBy("file").agg(count("*"))
display(filesdf)

file,count(1)
/mnt/source/raw/customer_address/customer_address_3_24.dat,41667
/mnt/source/raw/customer_address/customer_address_11_24.dat,41667
/mnt/source/raw/customer_address/customer_address_7_24.dat,41667
/mnt/source/raw/customer_address/customer_address_14_24.dat,41667
/mnt/source/raw/customer_address/customer_address_5_24.dat,41667
/mnt/source/raw/customer_address/customer_address_13_24.dat,41667
/mnt/source/raw/customer_address/customer_address_8_24.dat,41667
/mnt/source/raw/customer_address/customer_address_4_24.dat,41667
/mnt/source/raw/customer_address/customer_address_17_24.dat,41666
/mnt/source/raw/customer_address/customer_address_22_24.dat,41666


In [0]:
from delta.tables import *
def upsertToDelta(microBatchOutputDF, batchId, targetTable="tip.customer_address", keyColumn="ca_address_sk"):
  """MERGE one streaming micro-batch into the customer_address Delta table.

  Meant for ``writeStream.foreachBatch(upsertToDelta)``: rows whose key already
  exists in the target are fully updated, all other rows are inserted.

  Args:
    microBatchOutputDF: the micro-batch DataFrame; must contain ``keyColumn`` and
      columns matching the target by name (``updateAll``/``insertAll`` map by name).
    batchId: micro-batch id supplied by foreachBatch (not used).
    targetTable: metastore name of the target Delta table (registered in this notebook
      as tip.customer_address over the sink Delta folder).
    keyColumn: column identifying a row in both source and target.
  """
  # Previously targeted saleslt.address on AddressID — neither exists for this
  # customer_address stream, so any merge would fail to resolve the key column.
  target = DeltaTable.forName(spark, targetTable)

  # Delta MERGE errors when several source rows match the same target row, so keep
  # one row per key. Which duplicate survives is not deterministic.
  source = microBatchOutputDF.dropDuplicates([keyColumn])

  (target.alias("t")
    .merge(source.alias("s"), f"s.`{keyColumn}` = t.`{keyColumn}`")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

In [0]:
_autoLoaderSinkRoot = "/mnt/sink/" + sinkAdlsFolderName + "/AutoLoader"

# Start the stream: append new source rows to the Delta folder under the sink mount;
# the checkpoint location stores the stream's progress between runs.
streamQuery = (
  df.writeStream
    .format("delta")
    .outputMode("append")
    .queryName("c-changeLoader-merge-customer-address")
    .option("checkpointLocation", _autoLoaderSinkRoot + "/_checkpoint")
    # Currently disabled, enable as needed:
    #   .foreachBatch(upsertToDelta)  -> MERGE each micro-batch instead of appending
    #                                    (original note: keep it off on the first run)
    #   .trigger(once=True)           -> process the available files once, then stop
    # NOTE(review): an earlier note also said to comment out queryName on the first run,
    # yet it is active here — confirm which is intended.
    .start(_autoLoaderSinkRoot + "/data/")
)

In [0]:
# Batch-read the Delta output written by the stream and print a sample of rows.
dfsql = spark.read.load(f"/mnt/sink/{sinkAdlsFolderName}/AutoLoader/data/", format="delta")
dfsql.show()

In [0]:
# Register the stream's Delta output folder as tip.customer_address so it can be queried by name.
_tableLocation = f"/mnt/sink/{sinkAdlsFolderName}/AutoLoader/data/"
spark.sql("CREATE DATABASE IF NOT EXISTS tip")
spark.sql(f"CREATE TABLE IF NOT EXISTS tip.customer_address USING DELTA LOCATION '{_tableLocation}'")

In [0]:
%sql
-- Spot-check rows for one ZIP code. ca_zip is a STRING column, so compare it to a string
-- literal: an integer literal forces an implicit numeric cast of ca_zip on every row
-- (non-numeric values become NULL, or the query errors under ANSI mode).
SELECT * FROM tip.customer_address WHERE ca_zip = '79089'

ca_address_sk,ca_address_id,ca_street_number,ca_street_name,ca_street_type,ca_suite_number,ca_city,ca_county,ca_state,ca_zip,ca_country,ca_gmt_offset,ca_location_type
375004,AAAAAAAAMNILFAAA,648,Chestnut Walnut,Blvd,Suite 100,Prosperity,El Paso County,TX,79089,United States,-6.0,single family
382898,AAAAAAAACLHNFAAA,15,Sixth Church,RD,Suite 80,Prosperity,Jackson County,AR,79089,United States,-6.0,single family
384961,AAAAAAAABMPNFAAA,388,5th,Drive,Suite I,Prosperity,Zapata County,TX,79089,United States,-6.0,single family


In [0]:
%sql
-- Total number of rows currently in the table.
SELECT COUNT(*)
FROM tip.customer_address

count(1)
1000000
