This notebook contains examples for Mastering Databricks Lakehouse Platform. To run it, you need the table schema and sample data, which are provided separately. Load the sample data into an Azure SQL instance that Databricks can reach, over either a public or a private IP address or domain name. You also need a storage account, preferably ADLS Gen2, to hold the output data. Finally, update the connection parameters in the cells below to match your environment.

In [0]:
%scala
// Fail fast if the Microsoft SQL Server JDBC driver class cannot be loaded.
// Class.forName throws ClassNotFoundException when the driver jar is not on the cluster classpath.
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

In [0]:
%scala
// JDBC connection settings for the source SQL Server / Azure SQL database.
// Replace the placeholder host, database, user and password values before running.
val jdbcHostname = "your SQL Server Database IP or Host Name"
val jdbcPort = 1433
val jdbcDatabase = "Database Name"
// Build the JDBC URL without credentials; user and password are supplied through
// connectionProperties when spark.read.jdbc is called.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"
// Properties object holding the parameters passed to spark.read.jdbc.
import java.util.Properties
val connectionProperties = new Properties()
// setProperty is the String-typed Properties API (the inherited Hashtable.put accepts any
// Object). The literals need no s-interpolator because they contain no ${...} placeholders.
// SECURITY: do not commit real credentials in a notebook; prefer reading them with
// dbutils.secrets.get(scope = "<scope>", key = "<key>").
connectionProperties.setProperty("user", "datbaseuserid")
connectionProperties.setProperty("password", "databsepassword")

In [0]:
%scala
// Record the SQL Server JDBC driver class name in the connection properties
// that are later passed to spark.read.jdbc.
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

In [0]:
%scala
// Load the SQL Server table `oltpstore` over JDBC into the ExamdataFromSQLDB DataFrame,
// using the URL and connection properties defined above.
val ExamdataFromSQLDB = spark.read.jdbc(jdbcUrl, "oltpstore", connectionProperties)

In [0]:
%scala
// ExamdataFromSQLDB is already a DataFrame (spark.read.jdbc returns one); toDF() only
// re-exposes it under the ExamDataInDF name used by the processing cells below.
val ExamDataInDF = ExamdataFromSQLDB.toDF()

In [0]:
%scala
// Keep only rows whose jsonobjecttype is "CertificateRequest" (exam results across all
// courses), project the raw JSON payload column, expose it to SQL as the temp view
// `resultjson`, and render it in the notebook.
val FilterdExamResultsInDF = ExamDataInDF
  .where($"jsonobjecttype" === "CertificateRequest")
  .select($"jsondata")
FilterdExamResultsInDF.createOrReplaceTempView("resultjson")
display(FilterdExamResultsInDF)

In [0]:
%scala
// Parse the JSON payload of each certificate-request row (temp view `resultjson`) into a
// struct, flatten the struct's fields into top-level columns, and expose them as `resultstore`.
// Runs on the notebook's SparkSession `spark`; HiveContext has been deprecated since Spark 2.0.
val resultjsonfilterd = spark.sql(
  "SELECT from_json(jsondata,'EventID string,OrganizationGuid string,TeamID string, UserId string,emailId string,Marks int,TotalMarks int,completiondate Date,passstate string') AS parsed FROM resultjson")
// Expand the struct through an explicit alias rather than its auto-generated column name
// (e.g. `from_json(jsondata)`), which depends on how Spark pretty-prints the expression.
val resultstore = resultjsonfilterd.select("parsed.*")
display(resultstore)
resultstore.createOrReplaceTempView("resultstore")

In [0]:
%scala
// Parse the JSON payload of every row of `oltpstore` (no jsonobjecttype filter is applied
// here) into a struct column, display its fields flattened into top-level columns, and
// register the unflattened result as the temp view `oltpstore1`.
// Runs on the notebook's SparkSession `spark`; HiveContext has been deprecated since Spark 2.0.
// NOTE(review): in this notebook `oltpstore` is only read over JDBC (spark.read.jdbc) and is
// never registered as a temp view, so this query assumes a metastore table/view named
// `oltpstore` exists in the session — confirm.
// NOTE(review): this schema differs from the one used to build `resultstore`
// (completiondate string vs Date, passstate int vs string) — confirm which is intended.
val dfiltered = spark.sql("select from_json(jsondata,'UserId string,emailId string,Marks int,TotalMarks int,completiondate string,passstate int') FROM oltpstore")
// The struct column is unaliased, so it is expanded by its auto-generated name; this keeps
// the `oltpstore1` view's column name unchanged for any downstream queries.
val flattenDF = dfiltered.select("from_json(jsondata).*")
display(flattenDF)
// NOTE(review): the struct-typed dfiltered (not flattenDF) is what gets registered — confirm.
dfiltered.createOrReplaceTempView("oltpstore1")

In [0]:
# Mount the RAW folder of an ADLS Gen2 container at /mnt/raw, authenticating with a
# service principal through the OAuth client-credentials token provider.
adlsAccountName = "dbstodfsdfsdf"
adlsContainerName = "dbdsdssd"
adlsFolderName = "RAW"
mountPoint = "/mnt/raw"

# Application (Client) ID
# NOTE(review): prefer reading this from a secret scope, e.g.
#   applicationId = dbutils.secrets.get(scope="aks001sdgs1", key="ClientId")
# (These two assignments previously had a leading space, which raised
# "IndentationError: unexpected indent" at module level.)
applicationId = "0b57c728-198d-479857897-997699-07d10"
# Application (Client) Secret Key
# SECURITY: never commit a real client secret. Replace the empty placeholder before the
# mount can authenticate, preferably with:
#   authenticationKey = dbutils.secrets.get(scope="akv-07011", key="ClientSecret")
authenticationKey = ""
# Directory (Tenant) ID, read from the Databricks secret scope
tenandId = dbutils.secrets.get(scope="aks001sdgs1", key="TenantId")

endpoint = f"https://login.microsoftonline.com/{tenandId}/oauth2/token"
source = f"abfss://{adlsContainerName}@{adlsAccountName}.dfs.core.windows.net/{adlsFolderName}"

# Connecting using Service Principal secrets and OAuth
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": applicationId,
    "fs.azure.account.oauth2.client.secret": authenticationKey,
    "fs.azure.account.oauth2.client.endpoint": endpoint,
}

# Mount ADLS storage to DBFS only if the mount point is not already in use,
# so re-running the cell does not fail on an existing mount.
if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=source,
        mount_point=mountPoint,
        extra_configs=configs)

In [0]:
%scala
// Give this Spark session direct ABFS access to the `dbstoragegen2mc4u` account
// by setting its storage account key in the session conf.
// SECURITY: a literal account key is a credential and should not be committed; prefer
// dbutils.secrets.get(scope = "<scope>", key = "<key>") for the value.
spark.conf.set(
  "fs.azure.account.key.dbstoragegen2mc4u.dfs.core.windows.net", "DIt5jItsdgsdgsdgsda92q4sdgsd-jPaDerO-sdgsdgsdg5/A==")

In [0]:
# Temporarily ask the ABFS driver to create the container if it does not exist, touch it
# with a listing, then turn the flag back off. The reset runs in `finally` so a failed
# listing does not leave the flag enabled for the rest of the session.
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
try:
    dbutils.fs.ls("abfss://newconatinerdsg4dgu@dbsgtordfsdu.dfs.core.windows.net/")
finally:
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

In [0]:
%scala
// List the root of the `dbdemo` container to confirm the storage credentials work.
dbutils.fs.ls("abfss://dbdemo@dbstoragesdgsu.dfs.core.windows.net/")

In [0]:
%scala

// Print the flattened exam-result schema, then persist the same rows to the RAW folder as
// CSV (with a header row) and as Parquet, replacing any previous output.
// Each target path names a directory of part files, not a single file.
resultstore.printSchema()

resultstore.write
  .mode("overwrite") // string form of SaveMode.Overwrite; SaveMode is never imported in this notebook
  .option("header", "true")
  .csv("abfss://dbdemo@dbstoragegsdgdg.dfs.core.windows.net/RAW/resultstore.csv")

// Parquet stores its own schema; the CSV-only "header" option does not apply and is omitted.
// NOTE(review): the table registered over this Parquet output must use this same path.
resultstore.write
  .mode("overwrite")
  .parquet("abfss://dbdemo@dbstoragegesddsg.dfs.core.windows.net/RAW/resultstore.parquet")

In [0]:
%sql DROP TABLE IF EXISTS parquetresultstore

In [0]:

%sql 
-- Register an external table `parquetresultstore` over Parquet files at the given path.
-- NOTE(review): this path's storage account (dbstoragegedsgsg) differs from the one the
-- Parquet write cell targets (dbstoragegesddsg) — confirm both point at the same data.
CREATE TABLE parquetresultstore
USING parquet
OPTIONS (path "abfss://dbdemo@dbstoragegedsgsg.dfs.core.windows.net/RAW/resultstore.parquet")



In [0]:
# Copy the Parquet-backed table into Delta format and register a Delta table over it.
# A single path variable feeds both the write and the table LOCATION. Previously the two
# literals named different storage accounts, so the table pointed where nothing was written.
delta_path = "abfss://dbdemo@dbstoragesdgsgs.dfs.core.windows.net/RAW/parquetresultstore.delta"

df = spark.sql('select * from  parquetresultstore')

# overwriteSchema lets a re-run replace the Delta table's schema along with its data.
df.write.format("delta").mode('overwrite').option("overwriteSchema", "true").save(delta_path)

# NOTE(review): IF NOT EXISTS keeps an already-registered table as-is, including its old LOCATION.
spark.sql(f"CREATE TABLE IF NOT EXISTS deltaresultstore USING DELTA LOCATION '{delta_path}'")

In [0]:
%sql
-- Passed results (passstate = '1') whose userid is usable: rows with a SQL NULL userid and
-- rows holding the literal string 'null' are both excluded.
SELECT *
FROM deltaresultstore
WHERE passstate = '1'
  AND userid IS NOT NULL
  AND userid <> 'null'