### Configuration

In [0]:
# Session-scoped storage access: the account key is placed in the Spark
# configuration for this session instead of mounting the container.
storage_account_name = "crmstorageaccountvarad"
container_name = "raw-data"
# NOTE: despite its name, this value is passed as the secret *scope* below.
client_id = "databricksScopeCRM"
secret_name = "crmsecretkey"

# The key is fetched from the secret scope at call time, so it never appears
# in the notebook source.
account_key_setting = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(
    account_key_setting,
    dbutils.secrets.get(scope=client_id, key=secret_name),
)

# Read accounts.csv through its full wasbs:// URL and show the result.
path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/accounts.csv"
df = spark.read.option("header", True).csv(path)
display(df)

In [0]:
# List the root of the raw-data container over the wasbs:// blob endpoint.
raw_container_root = "wasbs://raw-data@crmstorageaccountvarad.blob.core.windows.net/"
dbutils.fs.ls(raw_container_root)

In [0]:
# Same session-scoped configuration as for the raw container; only
# container_name differs, and it is not used by the conf call itself.
storage_account_name = "crmstorageaccountvarad"
container_name = "transformed-data"
# NOTE: despite its name, this value is passed as the secret *scope* below.
client_id = "databricksScopeCRM"
secret_name = "crmsecretkey"

# Register the storage account key (read from the secret scope) with Spark.
account_key_setting = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(
    account_key_setting,
    dbutils.secrets.get(scope=client_id, key=secret_name),
)


In [0]:
# List the root of the transformed-data container over the wasbs:// blob endpoint.
transformed_container_root = "wasbs://transformed-data@crmstorageaccountvarad.blob.core.windows.net/"
dbutils.fs.ls(transformed_container_root)

### Ingestion and Transformation

In [0]:
# Root of the raw-data container; every source CSV is read from directly below it.
RAW_BASE_PATH = "wasbs://raw-data@crmstorageaccountvarad.blob.core.windows.net"


def read_raw_csv(file_name: str):
    """Load one CSV file from the raw-data container into a Spark DataFrame.

    The first line of the file is treated as the header and column types are
    inferred by Spark.

    Args:
        file_name: File name relative to the container root, e.g. "accounts.csv".

    Returns:
        The DataFrame produced by the Spark CSV reader for that file.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{RAW_BASE_PATH}/{file_name}")
    )


# One DataFrame per source file; the variable names are used by later cells.
accounts_df = read_raw_csv("accounts.csv")
data_dictionary_df = read_raw_csv("data_dictionary.csv")
products_df = read_raw_csv("products.csv")
sales_pipeline_df = read_raw_csv("sales_pipeline.csv")
sales_teams_df = read_raw_csv("sales_teams.csv")


In [0]:
# Print the column-name list of each loaded DataFrame, one list per line.
for loaded_frame in (
    accounts_df,
    data_dictionary_df,
    products_df,
    sales_pipeline_df,
    sales_teams_df,
):
    print(loaded_frame.columns)

In [0]:
# Give columns clearer names: "subsidiary_of" becomes "parent_company", and the
# data dictionary headers are lower-cased.
accounts_df = accounts_df.withColumnRenamed("subsidiary_of", "parent_company")
data_dictionary_df = (
    data_dictionary_df
    .withColumnRenamed("Table", "table")
    .withColumnRenamed("Field", "field")
    .withColumnRenamed("Description", "description")
)

# Preview the first two rows to check the rename.
accounts_df.show(2)

In [0]:
# NOTE(review): this wildcard import rebinds Python builtins such as `sum` to
# their Spark column functions. It is kept only in case other cells rely on
# the names it brings in; the code below uses the explicit `F` alias instead.
from pyspark.sql.functions import *
from pyspark.sql import functions as F


def count_nulls_per_column(frame):
    """Count the null values in every column of a DataFrame.

    Args:
        frame: The Spark DataFrame to inspect.

    Returns:
        A DataFrame with the same column names as `frame`, where each column
        holds the number of rows in which that column is null.
    """
    return frame.select(
        [
            # A null contributes 1 and anything else 0, so the sum is the null count.
            F.sum(F.when(F.col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
            for column_name in frame.columns
        ]
    )


null_counts_accounts_df = count_nulls_per_column(accounts_df)
null_counts_data_dictionary_df = count_nulls_per_column(data_dictionary_df)
null_counts_products_df = count_nulls_per_column(products_df)
null_counts_sales_pipeline_df = count_nulls_per_column(sales_pipeline_df)
null_counts_sales_teams_df = count_nulls_per_column(sales_teams_df)

display(null_counts_accounts_df)
display(null_counts_data_dictionary_df)
display(null_counts_products_df)
display(null_counts_sales_pipeline_df)
display(null_counts_sales_teams_df)

In [0]:
# Render the sales pipeline DataFrame with the notebook's display helper.
display(sales_pipeline_df)

In [0]:
# Replace nulls with explicit placeholder labels: a missing parent company is
# labelled "Independent" and a missing account is labelled "Unknown".
accounts_df = accounts_df.na.fill({"parent_company": "Independent"})
sales_pipeline_df = sales_pipeline_df.na.fill({"account": "Unknown"})

### Writing transformed data

In [0]:
# Write every DataFrame to the transformed-data container as CSV with a header row.
#
# mode("overwrite") replaces the output of a previous run. Spark's default save
# mode raises an error when the target path already exists, which made this
# cell fail on any re-run. With overwrite in place the accounts write, which had
# been commented out, is enabled again so the renamed and null-filled
# accounts_df is written as well.
TRANSFORMED_BASE_PATH = "wasbs://transformed-data@crmstorageaccountvarad.blob.core.windows.net"

frames_to_write = {
    "accounts": accounts_df,
    "data_dictionary": data_dictionary_df,
    "products": products_df,
    "sales_pipeline": sales_pipeline_df,
    "sales_teams": sales_teams_df,
}

for table_name, frame in frames_to_write.items():
    # Each target is "<container root>/<table_name>.csv", the same paths as before.
    (
        frame.write
        .mode("overwrite")
        .option("header", "true")
        .csv(f"{TRANSFORMED_BASE_PATH}/{table_name}.csv")
    )