In [0]:
# Project: Extract, transform, and load data by using Azure Databricks and Azure Synapse Analytics

In [0]:
# Credentials
# Identifiers of the ADLS Gen2 account, its file system and the service principal (app registration).
storage_account_name = "adlsberlin"
file_system_name = "file-system-berlin"
application_id = "0b322e2e-f0d1-4673-8b2c-f86e793f9b2b"
tenant_id = "3bedec9c-e963-4f06-b18a-27d9cd270e69"
# The client secret must not be pasted into the notebook (it would end up in exports and revisions);
# read it from a Databricks secret scope instead.
# TODO: replace the placeholder scope/key names with the ones configured in your workspace.
secret_key = dbutils.secrets.get(scope="<secret-scope>", key="<service-principal-secret>")

# Account configuration
# Every OAuth setting below is scoped to this single ADLS Gen2 (dfs) endpoint.
adls_host = f"{storage_account_name}.dfs.core.windows.net"
spark.conf.set(f"fs.azure.account.auth.type.{adls_host}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{adls_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{adls_host}", application_id)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{adls_host}", secret_key)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{adls_host}", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

# Allow the first access to create the file system if it is missing, then always switch the flag
# back off -- even if the listing fails -- so later cells never create file systems by accident.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
try:
    dbutils.fs.ls(f"abfss://{file_system_name}@{adls_host}/")
finally:
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

In [0]:
# Ingesting sample data into the Azure Data Lake Storage Gen2 account
%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

--2022-11-23 11:09:40--  https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8476 (8.3K) [text/plain]
Saving to: ‘/tmp/small_radio_json.json’

     0K ........                                              100% 40.9M=0s

2022-11-23 11:09:40 (40.9 MB/s) - ‘/tmp/small_radio_json.json’ saved [8476/8476]



In [0]:
# Copying the downloaded file from the local file system (file:///tmp) into the root of the ADLS Gen2 file system.
# The destination is built from the same variables as the other cells, so the account/file system is defined in one place.
dbutils.fs.cp("file:///tmp/small_radio_json.json", f"abfss://{file_system_name}@{storage_account_name}.dfs.core.windows.net/")

Out[6]: True

In [0]:
# Read the copied sample file back from ADLS Gen2 into a DataFrame (no explicit schema is passed).
radio_df = spark.read.json(
    f"abfss://{file_system_name}@{storage_account_name}.dfs.core.windows.net/small_radio_json.json"
)
radio_df.show()

+--------------------+---------+---------+------+-------------+----------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
|              artist|     auth|firstName|gender|itemInSession|  lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
+--------------------+---------+---------+------+-------------+----------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
|         El Arrebato|Logged In| Annalyse|     F|            2|Montgomery|234.57914| free|  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
|Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid|       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|   

In [0]:
# Retrieving selected columns.
# NOTE: the raw JSON schema uses camelCase (firstName, lastName); the lowercase names below resolve
# because Spark column resolution is case-insensitive by default, and the result keeps the lowercase spelling.
selected_columns = radio_df.select(
    'firstname',
    'lastname',
    'gender',
    'location',
    'level',
    'song',
)
selected_columns.show()

+---------+----------+------+--------------------+-----+--------------------+
|firstname|  lastname|gender|            location|level|                song|
+---------+----------+------+--------------------+-----+--------------------+
| Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|Quiero Quererte Q...|
|   Dylann|    Thomas|     M|       Anchorage, AK| paid|        Born To Move|
|     Liam|     Watts|     M|New York-Newark-J...| paid|                DARE|
|     Tess|  Townsend|     F|Nashville-Davidso...| free|                null|
|  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free| Send Me Some Lovin'|
|     Alan|     Morse|     M|Chicago-Napervill...| paid|         Mellow Mood|
|Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|            Linoleum|
|   Elijah|  Williams|     M|Detroit-Warren-De...| paid|The Man Who Sold ...|
|  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|             La Nina|
|     Tess|  Townsend|     F|Nashville-Davidso...| free|       S

In [0]:
# Renaming some columns: location -> staging_location, level -> subscription_type.
renamed_cols = (
    selected_columns
    .withColumnRenamed('location', 'staging_location')
    .withColumnRenamed('level', 'subscription_type')
)
renamed_cols.show()

+---------+----------+------+--------------------+-----------------+--------------------+
|firstname|  lastname|gender|    staging_location|subscription_type|                song|
+---------+----------+------+--------------------+-----------------+--------------------+
| Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|Quiero Quererte Q...|
|   Dylann|    Thomas|     M|       Anchorage, AK|             paid|        Born To Move|
|     Liam|     Watts|     M|New York-Newark-J...|             paid|                DARE|
|     Tess|  Townsend|     F|Nashville-Davidso...|             free|                null|
|  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free| Send Me Some Lovin'|
|     Alan|     Morse|     M|Chicago-Napervill...|             paid|         Mellow Mood|
|Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|            Linoleum|
|   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|The Man Who Sold ...|
|  Margaux

In [0]:
# Configuring the storage account access key in the Databricks notebook session for the Blob (wasbs) endpoint.
# NOTE(review): the Synapse write uses a wasbs:// tempDir with forwardSparkAzureStorageCredentials=true,
# which presumably relies on this session key -- confirm against the connector docs.
# The key is read from a Databricks secret scope instead of being pasted into the notebook.
# TODO: replace the placeholder scope/key names with the ones configured in your workspace.
storage_access_key = dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-access-key>")
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_access_key)

In [0]:
# Loading transformed data into Azure Synapse

# Staging folder used while moving data between Azure Databricks and Azure Synapse.
# It is addressed through the Blob (wasbs) endpoint of the same storage account and file system.
tempDir = f"wasbs://{file_system_name}@{storage_account_name}.blob.core.windows.net/tempDirs"

# Azure Synapse connection configuration
dwDatabase = 'dedicatedSqlPool'
dwServer = 'synapse-ws-berlin.sql.azuresynapse.net'
dwUser = 'myadmin'
# The SQL password is read from a Databricks secret scope instead of being pasted into the notebook.
# TODO: replace the placeholder scope/key names with the ones configured in your workspace.
dwPass = dbutils.secrets.get(scope="<secret-scope>", key="<synapse-sql-password>")
dwJdbcPort = "1433"
dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
# Python f-string interpolation is "{name}" -- a Scala-style "${name}" would leave a literal "$" in the URL,
# turning the first extra option into an unknown "$encrypt" property.
# NOTE(review): a password containing ';' would break this URL; the SQL Server JDBC driver accepts
# such values wrapped in {braces} -- confirm if your password can contain special characters.
sqlDwUrl = f"jdbc:sqlserver://{dwServer}:{dwJdbcPort};database={dwDatabase};user={dwUser};password={dwPass};{dwJdbcExtraOptions}"

In [0]:
# Loading transformed data into Azure Synapse as the table song_table.
# writeLegacyFormat switches Spark to its legacy (Spark 1.4-style) Parquet layout; presumably needed so
# the Parquet files staged in tempDir are readable on the Synapse side -- confirm.
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
(
    renamed_cols.write
    .format("com.databricks.spark.sqldw")
    .options(
        url=sqlDwUrl,
        dbTable="song_table",
        forwardSparkAzureStorageCredentials="true",
        tempDir=tempDir,
    )
    # NOTE(review): append is not idempotent -- re-running this cell inserts the same rows again.
    .mode("append")
    .save()
)