In [None]:
# create a databricks secret scope linked to Azure key vault
databricks secrets create-scope --scope azure-keyvault --scope-backend-type AZURE_KEYVAULT --resource-id </subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.KeyVault/vaults/<key-vault-name>> --dns-name <https://sales-data-key-vault.vault.azure.net/>

# mount ADLS Gen2 filesystem
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="azure-keyvault", key="application-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="azure-keyvault", key="secret-key"),
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{dbutils.secrets.get(scope='azure-keyvault', key='directory-tenant-id')}/oauth2/token"
}

dbutils.fs.mount(
    source="abfss://sales-data@salesdatabenji.dfs.core.windows.net",
    mount_point="/mnt/salesdata",
    extra_configs=configs
)

In [None]:
#  list the contents of a specified directory
%fs ls "/mnt/salesdata"


In [None]:
# load our data
merged_data = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/salesdata/raw-data/merged_sales_data.csv")
merged_data.show()

In [None]:
from pyspark.sql.functions import col

# count null values in each column
null_counts = {col: merged_data.filter(merged_data[col].isNull()).count() for col in merged_data.columns}
print(null_counts)

# get the total number of null values across all columns
total_nulls = sum(null_counts.values())
print(total_nulls)

In [None]:
# drop null values in each column
merged_data_cleaned = merged_data.na.drop()
merged_data_cleaned.show()

In [None]:
# rename columns
merged_data_cleaned = merged_data_cleaned.withColumn("Quantity Ordered", col("Quantity Ordered").cast("int"))
merged_data_cleaned = merged_data_cleaned.withColumn("Price Each", col("Price Each").cast("float"))

In [None]:
# cleaned data save to a different path
merged_data_cleaned.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/salesdata/cleaned-data/merged_sales_data")