**ConnectToDeltaOnADLS**

Connect to the Delta Table created in ADF using Azure Databricks to connect to ADLS with a Service Principal

Resouces Links:

https://learn.microsoft.com/en-us/azure/databricks/getting-started/connect-to-azure-storage

https://docs.databricks.com/storage/azure-storage.html

In [0]:
adlsAccountName = "<enterADLSaccountNameHere>"
sourceAdlsContainerName = "bronze"
sinkAdlsContainerName = "silver"
sourceAdlsFolderName = "diabetes"
sinkAdlsFolderName = "diabetes"

In [0]:
spark.conf.set(
    "fs.azure.account.key." + adlsAccountName + ".dfs.core.windows.net",
    dbutils.secrets.get(scope="<enterScopeHere>",key="Adls2-KeySecret"))

In [0]:
dbutils.fs.ls("abfss://" + sourceAdlsContainerName + "@" + adlsAccountName + ".dfs.core.windows.net/")

Out[7]: [FileInfo(path='abfss://bronze@cdcacceleredfkdd3zynq6k.dfs.core.windows.net/SalesLT.Customer.txt', name='SalesLT.Customer.txt', size=217829, modificationTime=1679577529000),
 FileInfo(path='abfss://bronze@cdcacceleredfkdd3zynq6k.dfs.core.windows.net/SalesLTAddress.csv', name='SalesLTAddress.csv', size=62625, modificationTime=1679577890000),
 FileInfo(path='abfss://bronze@cdcacceleredfkdd3zynq6k.dfs.core.windows.net/SalesLTProduct.csv', name='SalesLTProduct.csv', size=1358154, modificationTime=1679577891000),
 FileInfo(path='abfss://bronze@cdcacceleredfkdd3zynq6k.dfs.core.windows.net/diabetes/', name='diabetes/', size=0, modificationTime=1679700073000)]

In [0]:
SubscriptionID = dbutils.secrets.get("<enterScopeHere>","SubscriptionID")
DirectoryID = dbutils.secrets.get("<enterScopeHere>","DirectoryID")
ServicePrincipalAppID = dbutils.secrets.get("<enterScopeHere>","ServicePrincipalAppID")
ServicePrincipalSecret = dbutils.secrets.get("<enterScopeHere>","AppSecret")
ResourceGroup = dbutils.secrets.get("<enterScopeHere>","ResourceGroup")
BlobConnectionKey = dbutils.secrets.get("<enterScopeHere>","Adls2-KeySecret")

In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": ServicePrincipalAppID,
           "fs.azure.account.oauth2.client.secret": ServicePrincipalSecret,
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/"+DirectoryID+"/oauth2/token"}

In [0]:
dbutils.fs.unmount("/mnt/source")
dbutils.fs.unmount("/mnt/sink") 

/mnt/source has been unmounted.
/mnt/sink has been unmounted.
Out[10]: True

In [0]:
dbutils.fs.mount(
  source = "abfss://"+sourceAdlsContainerName+"@"+adlsAccountName+".dfs.core.windows.net/",
  mount_point = "/mnt/source",
  extra_configs = configs)


dbutils.fs.mount(
  source = "abfss://"+sinkAdlsContainerName+"@"+adlsAccountName+".dfs.core.windows.net/",
  mount_point = "/mnt/sink",
  extra_configs = configs)

Out[11]: True

In [0]:
dbutils.fs.ls ("/mnt/")

Out[12]: [FileInfo(path='dbfs:/mnt/sink/', name='sink/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/source/', name='source/', size=0, modificationTime=0)]

In [0]:
dfd = spark.read.csv('/mnt/source/diabetes/diabetes.csv', header=True)
dfd.show()

+---+---+----+---+---+-----+---+----+------+---+---+
|AGE|SEX| BMI| BP| S1|   S2| S3|  S4|    S5| S6|  Y|
+---+---+----+---+---+-----+---+----+------+---+---+
| 59|  2|32.1|101|157| 93.2| 38|   4|4.8598| 87|151|
| 48|  1|21.6| 87|183|103.2| 70|   3|3.8918| 69| 75|
| 72|  2|30.5| 93|156| 93.6| 41|   4|4.6728| 85|141|
| 24|  1|25.3| 84|198|131.4| 40|   5|4.8903| 89|206|
| 50|  1|  23|101|192|125.4| 52|   4|4.2905| 80|135|
| 23|  1|22.6| 89|139| 64.8| 61|   2|4.1897| 68| 97|
| 36|  2|  22| 90|160| 99.6| 50|   3|3.9512| 82|138|
| 66|  2|26.2|114|255|  185| 56|4.55|4.2485| 92| 63|
| 60|  2|32.1| 83|179|119.4| 42|   4|4.4773| 94|110|
| 29|  1|  30| 85|180| 93.4| 43|   4|5.3845| 88|310|
| 22|  1|18.6| 97|114| 57.6| 46|   2|3.9512| 83|101|
| 56|  2|  28| 85|184|144.8| 32|   6|3.5835| 77| 69|
| 53|  1|23.7| 92|186|109.2| 62|   3|4.3041| 81|179|
| 50|  2|26.2| 97|186|105.4| 49|   4|5.0626| 88|185|
| 61|  1|  24| 91|202|115.4| 72|   3|4.2905| 73|118|
| 34|  2|24.7|118|254|184.2| 39|   7| 5.037| 8

In [0]:
from pyspark.sql import functions as F
dfd2 = dfd.withColumn("Gender", F.when(F.col("SEX") == 1, "Male").otherwise("Female"))
dfd2.show()

+---+---+----+---+---+-----+---+----+------+---+---+------+
|AGE|SEX| BMI| BP| S1|   S2| S3|  S4|    S5| S6|  Y|Gender|
+---+---+----+---+---+-----+---+----+------+---+---+------+
| 59|  2|32.1|101|157| 93.2| 38|   4|4.8598| 87|151|Female|
| 48|  1|21.6| 87|183|103.2| 70|   3|3.8918| 69| 75|  Male|
| 72|  2|30.5| 93|156| 93.6| 41|   4|4.6728| 85|141|Female|
| 24|  1|25.3| 84|198|131.4| 40|   5|4.8903| 89|206|  Male|
| 50|  1|  23|101|192|125.4| 52|   4|4.2905| 80|135|  Male|
| 23|  1|22.6| 89|139| 64.8| 61|   2|4.1897| 68| 97|  Male|
| 36|  2|  22| 90|160| 99.6| 50|   3|3.9512| 82|138|Female|
| 66|  2|26.2|114|255|  185| 56|4.55|4.2485| 92| 63|Female|
| 60|  2|32.1| 83|179|119.4| 42|   4|4.4773| 94|110|Female|
| 29|  1|  30| 85|180| 93.4| 43|   4|5.3845| 88|310|  Male|
| 22|  1|18.6| 97|114| 57.6| 46|   2|3.9512| 83|101|  Male|
| 56|  2|  28| 85|184|144.8| 32|   6|3.5835| 77| 69|Female|
| 53|  1|23.7| 92|186|109.2| 62|   3|4.3041| 81|179|  Male|
| 50|  2|26.2| 97|186|105.4| 49|   4|5.0

In [0]:
dfa = spark.read.csv('/mnt/source/diabetes/ageband.csv', header=True)
dfa.show()

+---+------------+
|age|     ageband|
+---+------------+
|  1|        Baby|
|  2|        Baby|
|  3|     Toddler|
|  4|     Toddler|
|  5|  Elementary|
|  6|  Elementary|
|  7|  Elementary|
|  8|  Elementary|
|  9|  Elementary|
| 10|  Elementary|
| 11|MiddleSchool|
| 12|MiddleSchool|
| 13|MiddleSchool|
| 14|  HighSchool|
| 15|  HighSchool|
| 16|  HighSchool|
| 17|  HighSchool|
| 18|  YoungAdult|
| 19|  YoungAdult|
| 20|  YoungAdult|
+---+------------+
only showing top 20 rows



In [0]:
dfa= dfa.withColumnRenamed('age', 'age2')
dfa.show()

+----+------------+
|age2|     ageband|
+----+------------+
|   1|        Baby|
|   2|        Baby|
|   3|     Toddler|
|   4|     Toddler|
|   5|  Elementary|
|   6|  Elementary|
|   7|  Elementary|
|   8|  Elementary|
|   9|  Elementary|
|  10|  Elementary|
|  11|MiddleSchool|
|  12|MiddleSchool|
|  13|MiddleSchool|
|  14|  HighSchool|
|  15|  HighSchool|
|  16|  HighSchool|
|  17|  HighSchool|
|  18|  YoungAdult|
|  19|  YoungAdult|
|  20|  YoungAdult|
+----+------------+
only showing top 20 rows



In [0]:
dffin = dfd2.join(dfa,dfd2.AGE ==  dfa.age2,"inner")
dffin.show()

+---+---+----+---+---+-----+---+----+------+---+---+------+----+------------+
|AGE|SEX| BMI| BP| S1|   S2| S3|  S4|    S5| S6|  Y|Gender|age2|     ageband|
+---+---+----+---+---+-----+---+----+------+---+---+------+----+------------+
| 59|  2|32.1|101|157| 93.2| 38|   4|4.8598| 87|151|Female|  59|RetiredAdult|
| 48|  1|21.6| 87|183|103.2| 70|   3|3.8918| 69| 75|  Male|  48|       Adult|
| 72|  2|30.5| 93|156| 93.6| 41|   4|4.6728| 85|141|Female|  72|RetiredAdult|
| 24|  1|25.3| 84|198|131.4| 40|   5|4.8903| 89|206|  Male|  24|       Adult|
| 50|  1|  23|101|192|125.4| 52|   4|4.2905| 80|135|  Male|  50|       Adult|
| 23|  1|22.6| 89|139| 64.8| 61|   2|4.1897| 68| 97|  Male|  23|       Adult|
| 36|  2|  22| 90|160| 99.6| 50|   3|3.9512| 82|138|Female|  36|       Adult|
| 66|  2|26.2|114|255|  185| 56|4.55|4.2485| 92| 63|Female|  66|RetiredAdult|
| 60|  2|32.1| 83|179|119.4| 42|   4|4.4773| 94|110|Female|  60|RetiredAdult|
| 29|  1|  30| 85|180| 93.4| 43|   4|5.3845| 88|310|  Male|  29|

In [0]:
dffin = dffin.drop("age2")
dffin.show()

+---+---+----+---+---+-----+---+----+------+---+---+------+------------+
|AGE|SEX| BMI| BP| S1|   S2| S3|  S4|    S5| S6|  Y|Gender|     ageband|
+---+---+----+---+---+-----+---+----+------+---+---+------+------------+
| 59|  2|32.1|101|157| 93.2| 38|   4|4.8598| 87|151|Female|RetiredAdult|
| 48|  1|21.6| 87|183|103.2| 70|   3|3.8918| 69| 75|  Male|       Adult|
| 72|  2|30.5| 93|156| 93.6| 41|   4|4.6728| 85|141|Female|RetiredAdult|
| 24|  1|25.3| 84|198|131.4| 40|   5|4.8903| 89|206|  Male|       Adult|
| 50|  1|  23|101|192|125.4| 52|   4|4.2905| 80|135|  Male|       Adult|
| 23|  1|22.6| 89|139| 64.8| 61|   2|4.1897| 68| 97|  Male|       Adult|
| 36|  2|  22| 90|160| 99.6| 50|   3|3.9512| 82|138|Female|       Adult|
| 66|  2|26.2|114|255|  185| 56|4.55|4.2485| 92| 63|Female|RetiredAdult|
| 60|  2|32.1| 83|179|119.4| 42|   4|4.4773| 94|110|Female|RetiredAdult|
| 29|  1|  30| 85|180| 93.4| 43|   4|5.3845| 88|310|  Male|       Adult|
| 22|  1|18.6| 97|114| 57.6| 46|   2|3.9512| 83|101

In [0]:
#  Delta
dffin.write.format('delta')\
  .mode('overwrite')\
  .option("overwriteSchema", "true")\
  .save('/mnt/sink/diabetes/adbdelta/')

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS diabetesadb")
spark.sql("CREATE TABLE IF NOT EXISTS diabetesadb.diabetesageadb USING DELTA LOCATION \
'/mnt/sink/diabetes/adbdelta/'")


Out[38]: DataFrame[]

In [0]:

%sql

SELECT * FROM diabetesadb.diabetesageadb
     

AGE,SEX,BMI,BP,S1,S2,S3,S4,S5,S6,Y,Gender,ageband
59,2,32.1,101.0,157,93.2,38.0,4.0,4.8598,87,151,Female,RetiredAdult
48,1,21.6,87.0,183,103.2,70.0,3.0,3.8918,69,75,Male,Adult
72,2,30.5,93.0,156,93.6,41.0,4.0,4.6728,85,141,Female,RetiredAdult
24,1,25.3,84.0,198,131.4,40.0,5.0,4.8903,89,206,Male,Adult
50,1,23.0,101.0,192,125.4,52.0,4.0,4.2905,80,135,Male,Adult
23,1,22.6,89.0,139,64.8,61.0,2.0,4.1897,68,97,Male,Adult
36,2,22.0,90.0,160,99.6,50.0,3.0,3.9512,82,138,Female,Adult
66,2,26.2,114.0,255,185.0,56.0,4.55,4.2485,92,63,Female,RetiredAdult
60,2,32.1,83.0,179,119.4,42.0,4.0,4.4773,94,110,Female,RetiredAdult
29,1,30.0,85.0,180,93.4,43.0,4.0,5.3845,88,310,Male,Adult
