# 1) Connect to Synapse using the official route

In [None]:
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

val dfToReadFromTable:DataFrame = spark.read.
    option(Constants.SERVER, "<server>.sql.azuresynapse.net,1433").
    synapsesql("<db>.dbo.trip").
    select("DateID", "PaymentType", "FareAmount"). 
    filter(col("PaymentType") === "CSH").
    limit(10)

dfToReadFromTable.show()

# 2) Connect to Synapse using JDBC and SQL authentication

In [None]:
import com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
import java.util.Properties

// Change values towards your environment
// Store the values in key vault.
val jdbcHostname = "<server>.sql.azuresynapse.net"
val jdbcPort = 1433
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
val connectionProperties = new Properties()

// Driver that can also be observed in the log when using the offical Synapse SQL connection route
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
connectionProperties.setProperty("authentication", "SqlPassword")

// Store secrets in keyvault
// https://docs.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-secure-credentials-with-tokenlibrary?pivots=programming-language-scala#getsecret
connectionProperties.setProperty("username", TokenLibrary.getSecret("<key-vault-name>", "<property-containing-username>") )
connectionProperties.setProperty("password", TokenLibrary.getSecret("<key-vault-name>", "<property-containing-password>"))

// Set isolation level
// https://docs.microsoft.com/en-us/sql/connect/jdbc/understanding-isolation-levels?view=sql-server-ver15
connectionProperties.setProperty("setTransactionIsolation", "TRANSACTION_READ_UNCOMMITTED")

// Define your query
val pushdown_query = "(select top 10 DateID, PaymentType, FareAmount from dbo.trip where paymenttype = 'CSH') data_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

# 3) Connect to Synapse using JDBC and Managed Identity

In [None]:
import com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
import java.util.Properties

val jdbcHostname = "<server>.sql.azuresynapse.net"
val jdbcPort = 1433
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
val connectionProperties = new Properties()

// Driver that can also be observed in the log when using the 'native' SynapseSQL way.
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

// Create a linked server to your dedicated pool with a Manged Identity
// Add linked server name to the getConnectionStringOrCreds function
connectionProperties.setProperty("accessToken", mssparkutils.credentials.getConnectionStringOrCreds("<linked-server-to-synapse"))

// Set isolation level
// https://docs.microsoft.com/en-us/sql/connect/jdbc/understanding-isolation-levels?view=sql-server-ver15
connectionProperties.setProperty("setTransactionIsolation", "TRANSACTION_READ_UNCOMMITTED")

// Define your query
val pushdown_query = "(select top 10 DateID, PaymentType, FareAmount from dbo.trip where paymenttype = 'CSH') data_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)