In [0]:
# Mount the `raw-data` container of the azureuaenorthst storage account at
# /mnt/raw-data. dbutils.fs.mount raises if the mount point is already in use,
# so the call is skipped when the mount already exists; this keeps the cell
# safe to re-run (e.g. on "Run All").
RAW_DATA_MOUNT = "/mnt/raw-data"

if any(mount.mountPoint == RAW_DATA_MOUNT for mount in dbutils.fs.mounts()):
    print(f"{RAW_DATA_MOUNT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
        source="wasbs://raw-data@azureuaenorthst.blob.core.windows.net",
        mount_point=RAW_DATA_MOUNT,
        extra_configs={
            # Account key comes from the Databricks secret scope rather than
            # being hard-coded in the notebook.
            "fs.azure.account.key.azureuaenorthst.blob.core.windows.net": dbutils.secrets.get(
                "databricksScope", "secretkv"
            )
        },
    )

True

In [0]:
# Mount the `transformed-data` container of the azureuaenorthst storage account
# at /mnt/transformed-data. dbutils.fs.mount raises if the mount point is
# already in use, so the call is skipped when the mount already exists; this
# keeps the cell safe to re-run.
TRANSFORMED_DATA_MOUNT = "/mnt/transformed-data"

if any(mount.mountPoint == TRANSFORMED_DATA_MOUNT for mount in dbutils.fs.mounts()):
    print(f"{TRANSFORMED_DATA_MOUNT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
        source="wasbs://transformed-data@azureuaenorthst.blob.core.windows.net",
        mount_point=TRANSFORMED_DATA_MOUNT,
        extra_configs={
            # Same storage account (and secret) as the raw-data mount.
            "fs.azure.account.key.azureuaenorthst.blob.core.windows.net": dbutils.secrets.get(
                "databricksScope", "secretkv"
            )
        },
    )

True

In [0]:
# Inspect what is currently stored on the transformed-data mount
# (the captured output shows it empty at this point in the notebook).
transformed_listing = dbutils.fs.ls("/mnt/transformed-data")
transformed_listing

[]

In [0]:
# Load every raw CSV from the raw-data mount. All files share the same reader
# options: the first row is a header, and column types are inferred
# (inferSchema costs Spark an extra pass over each file).
RAW_DATA_DIR = "/mnt/raw-data"


def read_raw_csv(file_name: str):
    """Read ``RAW_DATA_DIR/file_name`` as a Spark DataFrame (header row, inferred schema)."""
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{RAW_DATA_DIR}/{file_name}")
    )


accounts_df = read_raw_csv("accounts.csv")
data_dictionary_df = read_raw_csv("data_dictionary.csv")
products_df = read_raw_csv("products.csv")
sales_pipeline_df = read_raw_csv("sales_pipeline.csv")
sales_teams_df = read_raw_csv("sales_teams.csv")

accounts_df.show(2)

+----------------+---------+----------------+-------+---------+---------------+-------------+
|         account|   sector|year_established|revenue|employees|office_location|subsidiary_of|
+----------------+---------+----------------+-------+---------+---------------+-------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|         NULL|
|      Betasoloin|  medical|            1999| 251.41|      495|  United States|         NULL|
+----------------+---------+----------------+-------+---------+---------------+-------------+
only showing top 2 rows


In [0]:
# Show the column names of each raw table, in the order the tables were loaded.
for raw_frame in (
    accounts_df,
    data_dictionary_df,
    products_df,
    sales_pipeline_df,
    sales_teams_df,
):
    print(raw_frame.columns)

['account', 'sector', 'year_established', 'revenue', 'employees', 'office_location', 'subsidiary_of']
['Table', 'Field', 'Description']
['product', 'series', 'sales_price']
['opportunity_id', 'sales_agent', 'product', 'account', 'deal_stage', 'engage_date', 'close_date', 'close_value']
['sales_agent', 'manager', 'regional_office']


In [0]:
# Give the accounts' parent column a clearer name.
accounts_df = accounts_df.withColumnRenamed("subsidiary_of", "parent_company")

# Lower-case the data dictionary's capitalised headers, one rename at a time.
for old_header, new_header in (
    ("Table", "table"),
    ("Field", "field"),
    ("Description", "description"),
):
    data_dictionary_df = data_dictionary_df.withColumnRenamed(old_header, new_header)

In [0]:
from pyspark.sql.functions import col, count as spark_count, when, sum as spark_sum


def count_nulls(df):
    """Return a one-row DataFrame holding, for each column of ``df``, its number of nulls.

    ``count(when(cond, 1))`` counts only rows where ``cond`` is true and yields 0
    (not NULL, as ``sum`` would) when ``df`` has no rows. Column names are
    backtick-quoted so names containing dots are not parsed as struct fields.
    """
    return df.select(
        [
            spark_count(when(col(f"`{name.replace('`', '``')}`").isNull(), 1)).alias(name)
            for name in df.columns
        ]
    )


null_counts_accounts_df = count_nulls(accounts_df)
null_counts_data_dictionary_df = count_nulls(data_dictionary_df)
null_counts_products_df = count_nulls(products_df)
null_counts_sales_pipeline_df = count_nulls(sales_pipeline_df)
null_counts_sales_teams_df = count_nulls(sales_teams_df)

display(null_counts_accounts_df)
display(null_counts_data_dictionary_df)
display(null_counts_products_df)
display(null_counts_sales_pipeline_df)
display(null_counts_sales_teams_df)

account,sector,year_established,revenue,employees,office_location,parent_company
0,0,0,0,0,0,70


table,field,description
0,0,0


product,series,sales_price
0,0,0


opportunity_id,sales_agent,product,account,deal_stage,engage_date,close_date,close_value
0,0,0,1425,0,500,2089,2089


sales_agent,manager,regional_office
0,0,0


In [0]:
# Replace nulls in two text columns with placeholder labels:
# accounts without a parent company become "Independent", and pipeline rows
# without an account become "unknown". The other sales_pipeline columns that
# contain nulls (engage_date, close_date, close_value) are not filled here.
# `na.fill` is the DataFrameNaFunctions alias of `DataFrame.fillna`.
accounts_df = accounts_df.na.fill({"parent_company": "Independent"})
sales_pipeline_df = sales_pipeline_df.na.fill({"account": "unknown"})


In [0]:
# Persist each table to the transformed-data mount as CSV with a header row,
# overwriting any previous output. Spark writes every target path as a
# directory (e.g. accounts.csv/) containing part-* files, not as a single file.
output_tables = {
    "accounts": accounts_df,
    "data_dictionary": data_dictionary_df,
    "products": products_df,
    "sales_pipeline": sales_pipeline_df,
    "sales_teams": sales_teams_df,
}

for output_name, output_df in output_tables.items():
    (
        output_df.write
        .option("header", "true")
        .mode("overwrite")
        .csv(f"/mnt/transformed-data/{output_name}.csv")
    )