In [0]:
import os

import pyspark

## Steps:

1. Create Azure Synapse Workspace with ADLS Gen2 Storage Account.

2. Grant an Azure AD app registration (service principal) access to the Azure Synapse Workspace's ADLS Gen2 Storage Account.

3. Create a Dedicated SQL Pool under the created Workspace.

4. Copy the JDBC connection string (SQL authentication) of the Dedicated SQL Pool.

5. Set up the ADLS Gen2 (service principal) connection and the Azure Synapse Analytics connection in the notebook.

6. Mount the storage folder and read the data into a DataFrame.

7. Establish the Blob Storage connection (without a mount point); the connector needs it as the `tempDir` staging location.

8. Read data from and write data to the Dedicated SQL Pool.

#### Establish the connection to ADLS Gen2 and Azure Synapse Analytics

In [0]:
# Service principal (Azure AD app registration) that has been granted access to the
# ADLS Gen2 storage account. The credentials are read from environment variables
# instead of being hard-coded, so they are not committed together with the notebook.
# NOTE(review): the secret that was previously hard-coded here is exposed in the
# notebook history and should be rotated.
client_id = os.environ["AZURE_CLIENT_ID"]
tenant_id = os.environ["AZURE_TENANT_ID"]
client_secret = os.environ["AZURE_CLIENT_SECRET"]

# Session-level OAuth (client-credentials) settings for the Azure storage account.
# Applied in insertion order, same keys/values as setting them one by one.
adls_oauth_conf = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{}/oauth2/token".format(tenant_id),
}
for conf_key, conf_value in adls_oauth_conf.items():
    spark.conf.set(conf_key, conf_value)

# Defining a separate set of service principal credentials for Azure Synapse Analytics
# (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", client_id)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", client_secret)

#### Mount the ADLS Gen2 Storage Account & Read Data into a DataFrame

In [0]:
# Connect and mount the storage folder of the ADLS Gen2 Storage Account.
# The original cell referenced `storage_endpoint` and `configs` without defining
# them anywhere, so it only worked with leftover kernel state; both are built here.
mount_point = "/mnt/Gen2"

# Source URI of the folder to mount, supplied via the environment.
# NOTE(review): expected form is presumably
# "abfss://<container>@<account>.dfs.core.windows.net/" - confirm for this workspace.
storage_endpoint = os.environ["ADLS_STORAGE_ENDPOINT"]

# OAuth (service principal client-credentials) settings passed to the mount;
# uses the client_id / client_secret / tenant_id defined in the credentials cell.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{}/oauth2/token".format(tenant_id),
}

# Skip mounting when the mount point already exists (makes the cell re-runnable);
# any other mount failure now surfaces instead of being printed and ignored.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    print(f"{mount_point} is already mounted")
else:
    dbutils.fs.mount(
        source=storage_endpoint,
        mount_point=mount_point,
        extra_configs=configs,
    )

# display file path
display(dbutils.fs.ls(f"{mount_point}/CustomerFiles/"))

path,name,size,modificationTime
dbfs:/mnt/Gen2/CustomerFiles/part-00000-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-108-1-c000.csv,part-00000-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-108-1-c000.csv,2423786,1672578617000
dbfs:/mnt/Gen2/CustomerFiles/part-00001-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-109-1-c000.csv,part-00001-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-109-1-c000.csv,2423268,1672578616000
dbfs:/mnt/Gen2/CustomerFiles/part-00002-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-110-1-c000.csv,part-00002-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-110-1-c000.csv,2427801,1672578618000
dbfs:/mnt/Gen2/CustomerFiles/part-00003-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-111-1-c000.csv,part-00003-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-111-1-c000.csv,2419248,1672578617000
dbfs:/mnt/Gen2/CustomerFiles/part-00004-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-112-1-c000.csv,part-00004-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-112-1-c000.csv,2425732,1672578617000
dbfs:/mnt/Gen2/CustomerFiles/part-00005-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-113-1-c000.csv,part-00005-tid-3200334632332214470-9b4dec79-7e2e-495d-8657-3b5457ed3753-113-1-c000.csv,2419015,1672578616000


In [0]:
# Read the CSV part files of the mounted ADLS Gen2 folder into a DataFrame.
# The path is derived from `mount_point` so it stays in sync with the mount cell;
# header=True uses the first line as column names, inferSchema=True types the columns.
customer_csv_glob = f"dbfs:{mount_point}/CustomerFiles/part-0*.csv"
df1 = spark.read.csv(customer_csv_glob, header=True, inferSchema=True)

df1.count()

Out[54]: 90000

In [0]:
# Print the first five rows of the CSV-backed DataFrame as a text table
df1.show(n=5)

+---------+------------------+--------------------+-----------+---------------+---------+------------+--------------------+
|C_CUSTKEY|            C_NAME|           C_ADDRESS|C_NATIONKEY|        C_PHONE|C_ACCTBAL|C_MKTSEGMENT|           C_COMMENT|
+---------+------------------+--------------------+-----------+---------------+---------+------------+--------------------+
|    35165|Customer#000035165|    eNQSvDTld1 f7JmY|          0|10-173-541-5438|  4767.46|  AUTOMOBILE|special excuses. ...|
|    30597|Customer#000030597|          S9s1dDut8Q|          0|10-607-243-5581|  -639.62|   FURNITURE|lithely ruthless ...|
|    42279|Customer#000042279|    ABcVdNnA3JFB7bK5|          0|10-934-981-2863|  2236.39|   MACHINERY|the even deposits...|
|    42578|Customer#000042578|l6VNaE7iSZFtkSC5f...|          0|10-281-998-8028|   6429.8|    BUILDING|y alongside of th...|
|    37854|Customer#000037854|  dL6LCTLpY9hjLTrZ7g|          0|10-909-820-4270|  9549.78|    BUILDING|inder blithely de...|
+-------

#### Connect to Blob Storage

In [0]:
# BLOB STORAGE IS NEEDED FOR tempDir (staging area used by the Synapse connector)
# blob storage details
blob_storage_account = "mycookbookstorage1" + ".blob.core.windows.net"
blob_container = "synapse"
# The account key is read from the environment instead of being committed in the notebook.
# NOTE(review): the key previously hard-coded here is exposed and should be rotated.
blob_access_key = os.environ["AZURE_BLOB_ACCESS_KEY"]

# configure blob storage connection (session-scoped account-key auth for wasbs:// paths)
acntInfo = "fs.azure.account.key.{}".format(blob_storage_account)
spark.conf.set(acntInfo, blob_access_key)

#### Read and Write Operations

In [0]:
# READ DATA FROM DEDICATED SQL POOL
# Credentials are passed through the connector's user/password options (password from
# the environment) instead of being embedded in the JDBC URL. trustServerCertificate=false
# lets the driver validate the server's TLS certificate against hostNameInCertificate.
sqldw_url = (
    "jdbc:sqlserver://mycookbookworkspace.sql.azuresynapse.net:1433;"
    "database=cookbookDW;"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
)
df = (
    spark.read
    .format("com.databricks.spark.sqldw")
    .option("url", sqldw_url)
    .option("user", "azureuser@mycookbookworkspace")
    .option("password", os.environ["SYNAPSE_SQL_PASSWORD"])
    .option("tempDir", "wasbs://{}@{}/tempDirs".format(blob_container, blob_storage_account))
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "CUSTOMER")
    .load()
)

In [0]:
# Number of rows in the CUSTOMER table as read through the Synapse connector
customer_row_count = df.count()
customer_row_count

Out[16]: 90000

In [0]:
# Print the first 20 rows (PySpark's defaults, spelled out) of the CUSTOMER table
df.show(n=20, truncate=True)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-3008728943279613>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdf[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     49[0m                 logger.log_success(
[1;32m     50[0m                   

In [0]:
# Write Data To a new Table
# No save mode is set, so Spark's default ("errorifexists") applies: this cell fails if
# new_customers already exists, which makes it non-re-runnable as written.
# Credentials go through user/password options (password from the environment) and
# trustServerCertificate=false enables TLS certificate validation.
sqldw_url = (
    "jdbc:sqlserver://mycookbookworkspace.sql.azuresynapse.net:1433;"
    "database=cookbookDW;"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
)
(
    df1.write
    .format("com.databricks.spark.sqldw")
    .option("url", sqldw_url)
    .option("user", "azureuser@mycookbookworkspace")
    .option("password", os.environ["SYNAPSE_SQL_PASSWORD"])
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "new_customers")
    .option("tempDir", "wasbs://{}@{}/tempDirs".format(blob_container, blob_storage_account))
    .save()
)

In [0]:
# Write Data To an Existing Table
# mode("overwrite") replaces the current contents of CUSTOMER with df1.
# Credentials go through user/password options (password from the environment) and
# trustServerCertificate=false enables TLS certificate validation.
sqldw_url = (
    "jdbc:sqlserver://mycookbookworkspace.sql.azuresynapse.net:1433;"
    "database=cookbookDW;"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
)
(
    df1.write
    .format("com.databricks.spark.sqldw")
    .option("url", sqldw_url)
    .option("user", "azureuser@mycookbookworkspace")
    .option("password", os.environ["SYNAPSE_SQL_PASSWORD"])
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("dbTable", "CUSTOMER")
    .option("tempDir", "wasbs://{}@{}/tempDirs".format(blob_container, blob_storage_account))
    .mode("overwrite")
    .save()
)

In [0]:
# Unmount ADLS Gen2 Storage Account Folder
# Uses the same `mount_point` as the mount cell, and only unmounts when the mount
# exists so re-running the notebook does not fail on an already-removed mount.
# NOTE(review): df1 is read lazily from this mount; further actions on df1 after
# this cell would likely fail - confirm none are needed.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
else:
    print(f"{mount_point} is not mounted")

/mnt/Gen2 has been unmounted.
Out[63]: True

In [0]:
# Query data from dedicated SQL Pool: row count per market segment.
# Credentials go through user/password options (password from the environment) and
# trustServerCertificate=false enables TLS certificate validation.
sqldw_url = (
    "jdbc:sqlserver://mycookbookworkspace.sql.azuresynapse.net:1433;"
    "database=cookbookDW;"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
)
df_query = (
    spark.read
    .format("com.databricks.spark.sqldw")
    .option("url", sqldw_url)
    .option("user", "azureuser@mycookbookworkspace")
    .option("password", os.environ["SYNAPSE_SQL_PASSWORD"])
    .option("tempDir", "wasbs://{}@{}/tempDirs".format(blob_container, blob_storage_account))
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("query", "select C_MKTSEGMENT, count(*) as Cnt from [dbo].[customer] group by C_MKTSEGMENT")
    .load()
)

In [0]:
# Number of rows returned by the group-by query (one per C_MKTSEGMENT value)
segment_count = df_query.count()
segment_count

Out[21]: 5

In [0]:
# Print the per-segment counts (PySpark's default 20 rows, truncated cells)
df_query.show(n=20, truncate=True)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-3008728943279608>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdf_query[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     49[0m                 logger.log_success(
[1;32m     50[0m             

In [0]:
# Render the first five per-segment rows with Databricks' rich table display
df_query_preview = df_query.limit(5)
display(df_query_preview)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-3008728943279612>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdisplay[0m[0;34m([0m[0mdf_query[0m[0;34m.[0m[0mlimit[0m[0;34m([0m[0;36m5[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/display.py[0m in [0;36mdisplay[0;34m(self, input, *args, **kwargs)[0m
[1;32m     81[0m                     [0;32mraise[0m [0mException[0m[0;34m([0m[0;34m'Triggers can only be set for streaming queries.'[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     82[0m [0;34m[0m[0m
[0;32m---> 83[0;31m                 [0mself[0m[0;34m.[0m[0madd_custom_display_data[0m[0;34m([0m[0;34m"table"[0m[0;34m,[0m [0minput[0m[0;34m.[0m[0m_jdf[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     84[0m [0;34m[0