# Azure Databricks Tutorial

In [0]:
# databricks magic commands:
# %sh
# %fs
# %python
# %sql
# %pip install
# %conda
# %matplotlib

## Azure Service Principal Auth Setup

In [0]:
# note: all sensitive data should be stored in a key vault / databricks secret scope
# azure portal: https://<Databricks_url>#secrets/createScope
# use dbutils.secrets.get(scope = "", key = "")
# databricks cli secret option:
# databricks secrets list --scope <scope-name>
# databricks secrets put --scope <scope-name> --key <key-name>
# function for dbutils.secrets.get:
# def get_keys(keyName:str) -> str:
#     return dbutils.secrets.get(scope = 'formula1-scope', key = keyName)

# azure subscription - resource providers - register eventgrid
subscription_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"

# service principal values (azure active directory - app registration portal)
application_client_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
application_client_secret = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
directory_tenant_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"

# storage account values (azure storage account portal)
# the app registration needs the "blob contrib" role on this storage account
storage_account = "..."
storage_container = "..."
storage_path = ""

# azure storage endpoint options:
# HTTPS: https://storageaccount.blob.core.windows.net/container/path/to/blob
# WASBS - Windows Azure Storage Blob Secure: wasbs://containername@accountname.blob.core.windows.net
# ABFSS - Azure blob file system secure: abfss://filesystemname@accountname.dfs.core.windows.net
# note: you can mount this location in /mnt/ via dbutils.fs.mount()
_abfss_authority = f"{storage_container}@{storage_account}.dfs.core.windows.net"
storage_endpoint_url = f"abfss://{_abfss_authority}/{storage_path}"
# always finish with "/" so file and folder names can be appended directly
storage_endpoint_url = (
    storage_endpoint_url
    if storage_endpoint_url.endswith("/")
    else storage_endpoint_url + "/"
)

# azure databricks auth setup
# every setting is scoped to this storage account's dfs endpoint; they select
# OAuth with the client-credentials token provider and pass the app
# registration's id, secret and tenant token endpoint
_dfs_host = f"{storage_account}.dfs.core.windows.net"
_oauth_settings = {
    f"fs.azure.account.auth.type.{_dfs_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{_dfs_host}": (
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
    ),
    f"fs.azure.account.oauth2.client.id.{_dfs_host}": application_client_id,
    f"fs.azure.account.oauth2.client.secret.{_dfs_host}": application_client_secret,
    f"fs.azure.account.oauth2.client.endpoint.{_dfs_host}": (
        f"https://login.microsoftonline.com/{directory_tenant_id}/oauth2/token"
    ),
}
for _conf_key, _conf_value in _oauth_settings.items():
    spark.conf.set(_conf_key, _conf_value)

## Interacting with File System

In [0]:
# file system utils: show the built-in help for the dbutils.fs commands
dbutils.fs.help()

In [0]:
# databricks dbfs root
# only the last expression of a cell is rendered, so each listing is passed
# to display() explicitly; a bare call in the middle of the cell is discarded
display(dbutils.fs.ls("."))

# external/mounted storage
# dbutils.fs.ls("/mnt")
# dbutils.fs.mount() / dbutils.fs.unmount() / dbutils.fs.refreshMounts()
display(dbutils.fs.mounts())

In [0]:
# list dir: wrap the listing of the storage endpoint in a pandas DataFrame;
# as the cell's last expression it is what the cell outputs
import pandas as pd
pd.DataFrame(dbutils.fs.ls(storage_endpoint_url))

## Read File

In [0]:
# custom schema and data types
from pyspark.sql.types import (
    StructType,
    StructField,
    TimestampType,
    StringType,
    DoubleType,
)
# one nullable timestamp column followed by nullable double columns
# column_2 .. column_5
customschema = StructType(
    [StructField("timestamp", TimestampType(), True)]
    + [StructField(f"column_{idx}", DoubleType(), True) for idx in range(2, 6)]
)

# read csv example
# read json example: df = spark.read.json(json_file_path)
# read parquet example: df = spark.read.parquet(parquet_file_path)
filename = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.csv"
# storage_endpoint_url is normalized to end with "/", so the file name is
# appended directly; adding another "/" here would produce a "//" in the path
df = spark.read.csv(
    f"{storage_endpoint_url}{filename}", header=True, schema=customschema
)
display(df)

## Basic PySpark Operations

In [0]:
# schema: print the column names and types of the dataframe read above
df.printSchema()

In [0]:
# preview data: print up to 10 rows as plain text
df.show(10)

In [0]:
# basic stats: summary statistics for column_2 only
df.select("column_2").describe().show()

In [0]:
# other common spark commands

# write csv example
# df.write.format("csv").option("header","true").save("hdfs:///.../")

# write file to disk in parquet format - overwrite
# df.write.partitionBy('').format('parquet').mode('overwrite').save()
# write file to disk in parquet format - append
# df.write.partitionBy('').format('parquet').mode('append').save()

# from pyspark.sql.functions import lit <- constant columns

# EDA/ETL functions:
# spark.createDataFrame()
# .columns / .dtypes
# .select()
# .withColumn() <- add columns
# .cast() <- change type
# .alias()
# .withColumnRenamed() <- rename column
# .drop() <- remove column
# .where() / .filter()
# .join() / .union() / .merge()
# .pivot() / .melt()
# .groupBy().avg()/.sum()/.agg() <- group by, aggregate, and apply functions
# .sort()
# .count()
# .dropna() / .fillna()
# .limit()
# .take()
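# .rdd.map() / .foreach() <- map lives on the underlying RDD, not on the dataframe
# explode() <- function from pyspark.sql.functions, used inside .select() / .withColumn()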
# .show()
# .cache()

## Pandas Spark API

In [0]:
# alternative to standard PySpark SQL api
import pandas as pd
import pyspark.pandas as ps

# spark dataframe to pandas-on-spark dataframe
# pandas_api() replaces the deprecated to_pandas_on_spark()
df_ps = df.pandas_api()

# pandas to pandas-on-spark
df_pandas = pd.DataFrame({"A": [1, 2, 3], "B": [5, 6, 7]})
df_pyspark_pandas = ps.from_pandas(df_pandas)

# pandas-on-spark to pandas
df_pandas = df_pyspark_pandas.to_pandas()

# pandas-on-spark to spark dataframe
df_spark = df_pyspark_pandas.to_spark()

In [0]:
# preview: a bare last expression is rendered as the cell output
df_ps

In [0]:
# dataframe info: summary of the pandas-on-spark dataframe
df_ps.info()

In [0]:
# basic operations: select a column, then reduce columns to scalars
my_column = df_ps["column_2"]
my_sum = my_column.sum()
my_min = df_ps["column_3"].min()

# new column: element-wise sum of column_2 and column_3 (added to df_ps in place)
df_ps["new_column"] = my_column + df_ps["column_3"]

# filter: keep the rows whose new_column is greater than 1
_new_column_above_one = df_ps["new_column"] > 1
df_ps_filtered = df_ps[_new_column_above_one]

# apply
def add_one(val):
    """Return ``val + 1``; works for anything that supports adding an int."""
    incremented = val + 1
    return incremented


# run add_one over the three selected columns (axis=0) and show the result
_columns_to_increment = ["column_2", "column_3", "column_4"]
df_ps_filtered_and_add_one = df_ps_filtered[_columns_to_increment].apply(
    add_one, axis=0
)
df_ps_filtered_and_add_one

## Autoloader (Streaming Data)

### Azure Service Principal Auth Setup

In [0]:
# azure storage account portal
# resource group
storage_resource_group = "..."
# access key
storage_sas_key = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# generate queue connection string
storage_queue_connection_string = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

# databricks autoloader config
# uses schema infer and evolution (rescues new columns)
# the conf key is spelled with a capital F ("cloudFiles") as in the Auto Loader
# docs; the previous all-lowercase spelling names a different key
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", 10)
# note: despite its name, storage_sas_key is registered here as the storage
# account access key (fs.azure.account.key)
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", storage_sas_key
)
# schema and checkpoint data are kept under the storage endpoint
checkpoint_location = storage_endpoint_url + "_checkpoint/"
# option names follow the spelling used in the Auto Loader options reference
cloudfile = {
    "cloudFiles.subscriptionId": subscription_id,
    "cloudFiles.connectionString": storage_queue_connection_string,
    "cloudFiles.format": "csv",
    "cloudFiles.tenantId": directory_tenant_id,
    "cloudFiles.clientId": application_client_id,
    "cloudFiles.clientSecret": application_client_secret,
    "cloudFiles.resourceGroup": storage_resource_group,
    "cloudFiles.inferColumnTypes": "true",
    # location for schema and checkpoint data
    "cloudFiles.schemaLocation": checkpoint_location,
    "cloudFiles.schemaEvolutionMode": "rescue",
    "cloudFiles.useNotifications": "true",
}
# the documented option is "rescuedDataColumn" (with a "d"); the previous
# "rescueDataColumn" spelling is not a documented option name
additional_options = {"header": True, "rescuedDataColumn": "_rescued_data"}

### Read and Write Streaming Data

In [0]:
# read streaming files from azure data lake
# input options: autoloader settings first, csv reader settings second, so a
# duplicated key would still resolve to the later value
_autoloader_options = {**cloudfile, **additional_options}
df_autoloader = (
    spark.readStream.format("cloudFiles")
    .options(**_autoloader_options)
    # define custom schema -> .schema()
    # input location
    .load(storage_endpoint_url)
)

In [0]:
# write delta table to azure data lake
df_autoloader_stream = (
    df_autoloader.writeStream.format("delta")
    .outputMode("append")
    .queryName("example_query_name")
    # run function on each batch / merge with existing data use .foreachBatch()
    .option("checkpointLocation", checkpoint_location)
    # trigger options: processingTime, availableNow, once
    .trigger(once=True)
    # output location
    .start(storage_endpoint_url + "data_table_output")
    # use .toTable() to save to databricks table
)
# .start() returns as soon as the query is launched; wait for the one-shot
# trigger to finish so that later cells reading "data_table_output" do not
# race the stream when the notebook is executed with "Run All"
df_autoloader_stream.awaitTermination()

In [0]:
# status: print the query's current status, then its recent progress reports
print(df_autoloader_stream.status)
print(df_autoloader_stream.recentProgress)

### EDA via PySpark API

In [0]:
# read delta table from azure data lake
# batch read of the "data_table_output" folder under the storage endpoint,
# i.e. the same location the streaming write cell uses as its output
df_load = spark.read.format("delta").load(storage_endpoint_url + "data_table_output")
display(df_load)

In [0]:
# run pyspark query
# keep rows where column_2..column_5 are all above 0.9, ordered by timestamp
_all_columns_above_threshold = (
    "column_2 > 0.9 AND column_3 > 0.9 AND column_4 > 0.9 AND column_5 > 0.9"
)
df_load.where(_all_columns_above_threshold).sort("timestamp").display()

### EDA via SQL API

In [0]:
# save to databricks local table (replaces the table if it already exists)
table_name = "my_test_table"
df_load.write.mode("overwrite").saveAsTable(table_name)

# publish the table name as a spark conf value for use in %SQL statements
spark.conf.set("personal.table_name", table_name)

# delete table
# spark.sql(f"DROP TABLE {table_name}")

# other SQL commands
# create table -> CREATE TABLE IF NOT EXISTS or CREATE OR REPLACE TABLE

In [0]:
# table information via pyspark sql api
# table_name is interpolated into the statement with an f-string
display(spark.sql(f"DESCRIBE DETAIL {table_name}"))

In [0]:
%sql 
-- table information via the SQL api; the table name is substituted from the personal.table_name spark conf value
DESCRIBE DETAIL ${personal.table_name}

In [0]:
%sql
/* run SQL query: rows where column_2..column_5 all exceed 0.9, oldest first */
/* ${personal.table_name} is substituted from the spark conf value of that name */
SELECT
  *
FROM
  ${personal.table_name}
WHERE
  column_2 > 0.9
  AND column_3 > 0.9
  AND column_4 > 0.9
  AND column_5 > 0.9
ORDER BY
  timestamp ASC
  /*
  other commands:
  INSERT INTO table_name SELECT * FROM table_name_2
  INSERT OVERWRITE TABLE table_name SELECT * FROM table_name_2
  UPDATE
  DELETE FROM
  DESCRIBE
  OPTIMIZE ... ZORDER BY
  */

In [0]:
# save sql query results as spark dataframe
# NOTE(review): _sqldf is not assigned anywhere in this notebook; presumably it
# is the implicit variable Databricks sets to the result of the most recent
# %sql cell, so this cell has to run right after the query above - confirm
new_dataframe_name = _sqldf
display(new_dataframe_name)

## Delta Live Tables Pipeline

In [0]:
# NOTE: this does not work in a notebook, you must create a DLT pipeline
# GOTO: "workflows" -> "delta live tables" -> "create pipeline" -> select notebook

# example syntax:
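# (no %pip install needed: the pipeline runtime provides the dlt module, and the PyPI package named "dlt" is an unrelated project)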
# import dlt
# @dlt.table(
#   name="<name>",
#   comment="<comment>",
#   spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
#   table_properties={"<key>" : "<value>", "<key>" : "<value>"},
#   path="<storage-location-path>",
#   partition_cols=["<partition-column>", "<partition-column>"],
#   schema="schema-definition",
#   temporary=False)
# @dlt.expect
# @dlt.expect_or_fail
# @dlt.expect_or_drop
# @dlt.expect_all
# @dlt.expect_all_or_drop
# @dlt.expect_all_or_fail
# def <function-name>():
#     return (<query>)

In [0]:
# the decorators below need the dlt module in scope; as noted above, this only
# works when the notebook is executed as a DLT pipeline
import dlt


@dlt.table(comment="raw data, bronze")
def dlt_table_bronze():
    """Bronze table: stream the raw files from the storage endpoint via cloudFiles."""
    return (
        spark.readStream.format("cloudFiles")
        .options(**cloudfile)
        .options(**additional_options)
        .load(storage_endpoint_url)
    )


@dlt.table(comment="data subset, silver")
def dlt_table_silver():
    """Silver table: bronze rows where column_2 is greater than 0.9."""
    return dlt.read("dlt_table_bronze").where("column_2 > 0.9")


@dlt.table(comment="final data, gold")
def dlt_table_gold():
    """Gold table: silver rows with column_3 > 0.9, sorted by timestamp, first 10."""
    return dlt.read("dlt_table_silver").where("column_3 > 0.9").sort("timestamp").limit(10)

## References

- https://docs.databricks.com/ingestion/auto-loader/patterns.html
- https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-python-ref.html#python-spec
- https://docs.databricks.com/workflows/delta-live-tables/delta-live-tables-data-sources.html
- https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html
- https://api-docs.databricks.com/python/pyspark/latest/pyspark.pandas/frame.html