## Part 1: The Databricks Synapse Connector

Pre-reqs:
* Storage account (ADLS Gen 2)
* Synapse Dedicated SQL Pool
* Databricks Service Principal
* Secret scope with service principal secret, SQL password, and storage account key

Limitations:
* Must connect to a dedicated SQL pool; serverless (on-demand) pools are not supported, and attempts to connect to an on-demand endpoint will result in an error.
* Although the Synapse connector can be used for interactive queries against your data warehouse, it is better suited to ETL, because each query execution can extract large amounts of data to blob storage. Parquet is the recommended format in such cases.

Overview:

1. Setup
  * Access secrets
  * Configure variables
  * Test storage account access
2. Write to Synapse
  * Create a dataframe
  * Use Synapse Spark connector to write to synapse
3. Read from Synapse
  * Use Synapse Spark connector to read written data to a new dataframe
4. Stream data to Synapse
  * Use Structured Streaming to continuously write a `rate` stream to Synapse

In [0]:
from types import SimpleNamespace

secret_scope = 'dbw-syn-lab'
secrets = SimpleNamespace(
  sp_secret = dbutils.secrets.get(secret_scope, 'dbw-syn-lab-sp-secret'),
  sa_secret = dbutils.secrets.get(secret_scope, 'dbw-syn-lab-sa-secret'),
  sql_pw = dbutils.secrets.get(secret_scope, 'dbw-syn-lab-sql-pw')
)

The Azure Synapse connector uses three types of network connections:

* Spark driver to Azure Synapse
* Spark driver and executors to Azure storage account
* Azure Synapse to Azure storage account

```
                                 ┌─────────┐
      ┌─────────────────────────>│ STORAGE │<────────────────────────┐
      │   Storage acc key /      │ ACCOUNT │  Storage acc key /      │
      │   Managed Service ID /   └─────────┘  OAuth 2.0 /            │
      │                               │                              │
      │                               │                              │
      │                               │ Storage acc key /            │
      │                               │ OAuth 2.0 /                  │
      │                               │                              │
      v                               v                       ┌──────v────┐
┌──────────┐                      ┌──────────┐                │┌──────────┴┐
│ Synapse  │                      │  Spark   │                ││ Spark     │
│ Analytics│<────────────────────>│  Driver  │<───────────────>│ Executors │
└──────────┘  JDBC with           └──────────┘    Configured   └───────────┘
              username & password /                in Spark
```

Note that Blob storage can only use the storage account key, whereas ADLS Gen2 can optionally use OAuth 2.0 instead.

In [0]:
# Application ID corresponds to the App Registration / Service Principal used by Databricks
app_id = '4b309858-a987-4d5a-9a11-a84116790317'

# Directory ID is the tenant this Databricks workspace belongs to
directory_id = '6871727a-5747-424a-b9d4-39a621930267'

# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", app_id)
spark.conf.set("fs.azure.account.oauth2.client.secret", secrets.sp_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", app_id)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", secrets.sp_secret)

# Set up the storage account key
storage_account_name = 'strdbwsynworkshop'
container_name = 'synstorage'
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", secrets.sa_secret)
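The cell above sets the OAuth keys without an account suffix, so they apply to every ABFS storage account in the session. The ABFS driver also accepts account-scoped variants of these keys (the same key with `.<account>.dfs.core.windows.net` appended), which lets different storage accounts use different credentials. A minimal sketch, assuming you want account-scoped settings: `abfs_oauth_conf` is a hypothetical helper that only builds a dictionary, and the `<app-id>`, `<secret>` and `<tenant-id>` values are placeholders.

```python
def abfs_oauth_conf(account: str, client_id: str, client_secret: str, tenant_id: str) -> dict:
    """Build account-scoped ABFS OAuth (client credentials) settings.

    Hypothetical helper: it only returns a dict and does not touch Spark.
    """
    suffix = f"{account}.dfs.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{suffix}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{suffix}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{suffix}": client_id,
        f"fs.azure.account.oauth2.client.secret.{suffix}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{suffix}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


conf = abfs_oauth_conf("strdbwsynworkshop", "<app-id>", "<secret>", "<tenant-id>")
for key in sorted(conf):
    print(key)

# In a Databricks notebook the settings would then be applied with:
# for key, value in conf.items():
#     spark.conf.set(key, value)
```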

In [0]:
df = spark.read.csv(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/test.csv", header='true')
display(df)

a,b,c
1,2,3
4,5,6
7,8,9


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

schema = StructType([
  StructField("id", IntegerType()),
  StructField("a", IntegerType()),
  StructField("b", StringType()),
  StructField("c", FloatType()),
])

test_df = spark.createDataFrame([[0, 1, "2", 3.14], [1, 4, "5", 6.28], [2, 7, "8", 9.42]], schema=schema)
display(test_df)

id,a,b,c
0,1,2,3.14
1,4,5,6.28
2,7,8,9.42


In [0]:
sql_pool_url = 'syn-dbw-syn-workshop.sql.azuresynapse.net'
serverless_url = 'syn-dbw-syn-workshop-ondemand.sql.azuresynapse.net'  # Not usable with this connector (see Limitations)
user = 'sqladminuser'
db_name = 'dbwsynworkshoppool'
jdbc_connection_string = f'jdbc:sqlserver://{sql_pool_url}:1433;database={db_name};user={user};password={secrets.sql_pw};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;'
db_table = 'BatchData'

(
  test_df.write
  .format("com.databricks.spark.sqldw")
  .option("url", jdbc_connection_string)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", db_table)
  .option("tempDir", f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/")
  .mode("overwrite") # errorifexists, append, overwrite, ignore
  .save()
)
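The `.mode(...)` comment lists Spark's four save modes. A toy model of what each mode means for a target table that may already exist, using a plain dict in place of Synapse. `save` is a hypothetical helper, not the connector's API, and how the connector physically carries out `overwrite` against Synapse may differ from this sketch.

```python
def save(tables: dict, name: str, rows: list, mode: str = "errorifexists") -> None:
    """Toy Spark save-mode semantics against an in-memory 'database'."""
    exists = name in tables
    if mode in ("errorifexists", "error"):
        # Default mode: refuse to touch an existing table.
        if exists:
            raise ValueError(f"Table {name} already exists")
        tables[name] = list(rows)
    elif mode == "append":
        # Add the new rows to whatever is already there.
        tables.setdefault(name, []).extend(rows)
    elif mode == "overwrite":
        # Replace the existing contents entirely.
        tables[name] = list(rows)
    elif mode == "ignore":
        # Silently do nothing if the table exists.
        if not exists:
            tables[name] = list(rows)
    else:
        raise ValueError(f"Unknown save mode: {mode}")


tables = {}
save(tables, "BatchData", [0, 1, 2])
save(tables, "BatchData", [3], mode="append")
save(tables, "BatchData", [99], mode="ignore")
print(tables["BatchData"])
save(tables, "BatchData", [4], mode="overwrite")
print(tables["BatchData"])
```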

# What just happened?

The Azure Synapse connector for Databricks leverages Azure Storage and PolyBase / `COPY` to transfer large volumes of data efficiently between Databricks and Synapse. The Azure storage container acts as an intermediary to store bulk data when reading from or writing to Azure Synapse. Spark connects to the storage container using one of the built-in connectors (ADLS or Blob). Once the data is stored in Azure Storage, Synapse connects to the storage account for loading and unloading of that temporary data.

## Why is there an Azure Storage container here?

The Azure Storage container is a temporary staging area where the data resides before it is loaded into Synapse. This gives better performance for very large datasets, because the data warehouse can bulk-load the staged files with CTAS or `COPY` statements instead of receiving rows one at a time over JDBC.
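A toy, local-filesystem model of the staging pattern: several writers (standing in for Spark executors) each write one partition to its own file in a temporary directory, and a single bulk load then picks up every staged file through a wildcard, much as `COPY` can with a wildcard path. This is an illustration under stated assumptions only: CSV is used just to keep it dependency-free, and `stage_partitions` and `bulk_load` are hypothetical helpers, not part of the connector.

```python
import csv
import glob
import os
import tempfile


def stage_partitions(partitions, temp_dir):
    """Each 'executor' writes its partition to a separate file in the staging dir."""
    paths = []
    for i, rows in enumerate(partitions):
        path = os.path.join(temp_dir, f"part-{i:05d}.csv")
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(rows)
        paths.append(path)
    return paths


def bulk_load(temp_dir, pattern="part-*.csv"):
    """The 'warehouse' loads every staged file in one pass, like a wildcard COPY."""
    loaded = []
    for path in sorted(glob.glob(os.path.join(temp_dir, pattern))):
        with open(path, newline="") as f:
            loaded.extend(csv.reader(f))
    return loaded


partitions = [
    [["0", "1", "2", "3.14"]],
    [["1", "4", "5", "6.28"]],
    [["2", "7", "8", "9.42"]],
]
with tempfile.TemporaryDirectory() as temp_dir:
    staged = stage_partitions(partitions, temp_dir)
    table = bulk_load(temp_dir)
print(len(staged), table)
```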

## `COPY` vs PolyBase

Which of these two methods should I prefer? In general, prefer `COPY`, as it provides the most flexibility for high-throughput data ingestion. With `COPY` you can:

* Use lower privileged users to load without needing strict CONTROL permissions on the data warehouse
* Execute a single T-SQL statement without having to create any additional database objects
* Properly parse and load CSV files where delimiters (string, field, row) are escaped within string delimited columns
* Specify a finer permission model without exposing storage account keys using Shared Access Signatures (SAS)
* Use a different storage account for the ERRORFILE location (REJECTED_ROW_LOCATION)
* Customize default values for each target column and specify source data fields to load into specific target columns
* Specify a custom row terminator for CSV files
* Leverage SQL Server Date formats for CSV files
* Specify wildcards and multiple files in the storage location path

You can select a specific write method by setting the `spark.databricks.sqldw.writeSemantics` Spark configuration:

```
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<copy OR polybase>")
```

Starting with Databricks Runtime 7.0, `COPY` is used by default. Note that `COPY` is only available on Synapse Gen2 instances.

The SQL user permissions required by the two methods also differ, so make sure they are set up correctly in the Synapse data warehouse before switching.
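The rules in this section can be summarized as a small decision function. A minimal sketch: `pick_write_semantics` is a hypothetical helper (the connector exposes no such function), runtime versions are modeled as `(major, minor)` tuples, and falling back to PolyBase on a non-Gen2 instance is our assumption rather than documented behavior.

```python
from typing import Optional


def pick_write_semantics(runtime_version: tuple, synapse_gen2: bool,
                         override: Optional[str] = None) -> str:
    """Toy encoding of the rules above; the non-Gen2 fallback is an assumption."""
    if override is not None:
        if override not in ("copy", "polybase"):
            raise ValueError(f"writeSemantics must be 'copy' or 'polybase', got {override!r}")
        if override == "copy" and not synapse_gen2:
            raise ValueError("COPY is only available on Synapse Gen2 instances")
        return override
    # Databricks Runtime 7.0+ defaults to COPY, but COPY needs a Gen2 instance.
    if runtime_version >= (7, 0) and synapse_gen2:
        return "copy"
    return "polybase"  # assumed default otherwise


print(pick_write_semantics((7, 3), synapse_gen2=True))
print(pick_write_semantics((6, 4), synapse_gen2=True))
print(pick_write_semantics((7, 3), synapse_gen2=True, override="polybase"))
```

In a notebook, the returned value is what you would pass to `spark.conf.set("spark.databricks.sqldw.writeSemantics", ...)`.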

## `COPY` Details



## PolyBase Details

PolyBase loads data through a `CREATE TABLE AS SELECT` (CTAS) statement, which is often used to populate staging tables before loading data into the final data warehouse schema. The Synapse connector uses it behind the scenes to efficiently populate tables in your data warehouse.

In general, PolyBase first registers a location in Azure Storage (Blob or ADLS Gen2) as an external table, and then uses a CTAS statement to populate a relational table with the data from that external table.
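The two-step process above (external table, then CTAS) can be sketched as the sequence of T-SQL statements it involves. This only builds strings and executes nothing. `polybase_ctas_statements` and all object names are hypothetical, the credential setup (`CREATE DATABASE SCOPED CREDENTIAL`) is omitted, and these are not necessarily the exact statements the connector issues.

```python
def polybase_ctas_statements(table, columns, location,
                             data_source="StagingDataSource",
                             file_format="StagingParquetFormat"):
    """Return the T-SQL steps of a PolyBase load as strings (nothing is executed)."""
    external_table = f"ext_{table}"
    column_defs = ", ".join(f"[{name}] {sql_type}" for name, sql_type in columns)
    return [
        # 1. Point Synapse at the staging container.
        f"CREATE EXTERNAL DATA SOURCE {data_source} "
        f"WITH (TYPE = HADOOP, LOCATION = '{location}');",
        # 2. Describe how the staged files are encoded.
        f"CREATE EXTERNAL FILE FORMAT {file_format} WITH (FORMAT_TYPE = PARQUET);",
        # 3. Register the staged files as an external table.
        f"CREATE EXTERNAL TABLE {external_table} ({column_defs}) "
        f"WITH (LOCATION = '/', DATA_SOURCE = {data_source}, FILE_FORMAT = {file_format});",
        # 4. CTAS: materialize the external data into a distributed relational table.
        f"CREATE TABLE {table} WITH (DISTRIBUTION = ROUND_ROBIN) "
        f"AS SELECT * FROM {external_table};",
    ]


statements = polybase_ctas_statements(
    "BatchData",
    [("id", "INT"), ("a", "INT"), ("b", "NVARCHAR(256)"), ("c", "REAL")],
    "abfss://synstorage@strdbwsynworkshop.dfs.core.windows.net",
)
for statement in statements:
    print(statement)
```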

# References

* PolyBase: https://docs.microsoft.com/en-us/sql/relational-databases/polybase/polybase-guide?view=sql-server-ver15
* Spark save modes: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes

In [0]:
df2 = (
  spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", jdbc_connection_string)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", db_table)
  .option("tempDir", f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/")
  .load()
)
display(df2)

id,a,b,c
0,1,2,3.14
1,4,5,6.28
2,7,8,9.42


# Streaming

The Azure Synapse connector offers efficient and scalable Structured Streaming write support for Azure Synapse. It provides a user experience consistent with batch writes and uses PolyBase or `COPY` for large data transfers between an Azure Databricks cluster and an Azure Synapse instance. Like batch writes, streaming is designed largely for ETL, so its higher latency **may not be suitable for real-time data processing in some cases**.

## Fault tolerance semantics

By default, Azure Synapse Streaming offers an **end-to-end exactly-once** guarantee for writing data into an Azure Synapse table. It reliably tracks query progress using a combination of a checkpoint location in DBFS, a checkpoint table in Azure Synapse, and a locking mechanism, so that streaming can handle any type of failure, retry, or query restart. Optionally, you can select the less restrictive at-least-once semantics by setting the `spark.databricks.sqldw.streaming.exactlyOnce.enabled` Spark configuration to `false`, in which case data may be duplicated in the event of intermittent connection failures to Azure Synapse or unexpected query termination.
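A toy model of why tracking progress on the Synapse side matters: if each micro-batch's ID is recorded in the sink together with its rows, a batch that is replayed after a restart can be recognized and skipped, whereas at-least-once mode simply writes it again. This is a simplified illustration, not the connector's actual protocol. `write_batch`, `run` and the in-memory "checkpoint table" are hypothetical, and the toy simply assumes the rows and the batch ID are committed together.

```python
def write_batch(sink, committed, batch_id, rows, exactly_once=True):
    """Toy sink write; `committed` plays the role of the Synapse checkpoint table."""
    if exactly_once and batch_id in committed:
        return False  # batch already applied: skip the replay
    sink.extend(rows)  # assumed to be committed together with the batch ID
    committed.add(batch_id)
    return True


def run(exactly_once):
    sink, committed = [], set()
    write_batch(sink, committed, 0, ["r0", "r1"], exactly_once)
    write_batch(sink, committed, 1, ["r2"], exactly_once)
    # Simulate a crash after batch 1 reached the sink but before Spark's own
    # checkpoint recorded it: on restart, batch 1 is replayed.
    write_batch(sink, committed, 1, ["r2"], exactly_once)
    return sink


print(run(exactly_once=True))   # ['r0', 'r1', 'r2']
print(run(exactly_once=False))  # ['r0', 'r1', 'r2', 'r2']
```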

In [0]:
%sh rm -rf /dbfs/tmp_checkpoint_location # Delete checkpoint data

In [0]:
# Prepare streaming source; this could be Kafka or a simple rate stream.
streaming_df = (
  spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("numPartitions", "16")
  .load()
)

db_table = 'StreamingData'

# Optionally apply transformations to the data here, then use the
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
(
  streaming_df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", jdbc_connection_string)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", db_table)
  .option("tempDir", f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()
)

To view the newest streamed rows in Azure Synapse, you can run the following SQL:

```sql
SELECT TOP (100) [timestamp],[value]
FROM [dbo].[StreamingData]
ORDER BY timestamp DESC
```

## Streaming Differences

There are a few differences between the batch write and the streaming write.

1. Source

  The source of the streaming example is a streaming DataFrame. This could be a variety of different sources, such as an Event Hub or a Kafka topic. In our case, we use a simple `rate` stream that outputs a `timestamp` and an incrementing `value` for testing purposes.

2. `writeStream` instead of `write`

  In order to set up a structured stream, we use `writeStream` instead of `write`.

3. `checkpointLocation`

  To support the exactly-once guarantee, we use a checkpoint location on the Databricks File System (DBFS). The connector additionally uses a checkpoint table in Synapse with a locking mechanism, so that streaming can handle any type of failure, retry, or query restart.
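Item 1 describes the `rate` source. A rough, pure-Python stand-in for the rows it produces (a `timestamp` plus a monotonically increasing `value`, `rowsPerSecond` rows per second) can help when reasoning about what lands in `StreamingData`. `rate_rows` is hypothetical: spreading timestamps evenly within each second is a simplification, and the real source's partitioning (`numPartitions`) is ignored.

```python
from datetime import datetime, timedelta


def rate_rows(rows_per_second, seconds, start):
    """Yield rate-source-like rows: one dict per row with 'timestamp' and 'value'."""
    value = 0
    for second in range(seconds):
        for i in range(rows_per_second):
            # Spread the rows evenly across each one-second interval (a simplification).
            offset = timedelta(seconds=second + i / rows_per_second)
            yield {"timestamp": start + offset, "value": value}
            value += 1


start = datetime(2021, 1, 1)
rows = list(rate_rows(rows_per_second=10, seconds=2, start=start))
print(len(rows), rows[0], rows[-1])
```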