In [0]:
from pyspark.sql.functions import sum, col, when

### Mount the Data

In [0]:
# Mount the raw-data blob container at /mnt/raw-data.
# dbutils.fs.mount raises if the mount point already exists, so the mount is
# skipped when it is already listed; this keeps the cell safe to re-run.
raw_mount_point = "/mnt/raw-data"

if not any(mount.mountPoint == raw_mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://raw-data@crmsalespipelinesc.blob.core.windows.net",
        mount_point = raw_mount_point,
        extra_configs = {
            # The storage account key is read from the secret scope rather than hardcoded.
            "fs.azure.account.key.crmsalespipelinesc.blob.core.windows.net": dbutils.secrets.get(scope = "databricksScope", key = "secretkv")
        }
    )

True

In [0]:
# Mount the transformed-data blob container at /mnt/transformed-data.
# dbutils.fs.mount raises if the mount point already exists, so the mount is
# skipped when it is already listed; this keeps the cell safe to re-run.
transformed_mount_point = "/mnt/transformed-data"

if not any(mount.mountPoint == transformed_mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://transformed-data@crmsalespipelinesc.blob.core.windows.net",
        mount_point = transformed_mount_point,
        extra_configs = {
            # The storage account key is read from the secret scope rather than hardcoded.
            "fs.azure.account.key.crmsalespipelinesc.blob.core.windows.net": dbutils.secrets.get(scope = "databricksScope", key = "secretkv")
        }
    )

True

In [0]:
# List the files available under the raw-data mount point.
dbutils.fs.ls("/mnt/raw-data")

[FileInfo(path='dbfs:/mnt/raw-data/accounts.csv', name='accounts.csv', size=4670, modificationTime=1763372384000),
 FileInfo(path='dbfs:/mnt/raw-data/data_dictionary.csv', name='data_dictionary.csv', size=996, modificationTime=1763372384000),
 FileInfo(path='dbfs:/mnt/raw-data/products.csv', name='products.csv', size=171, modificationTime=1763372383000),
 FileInfo(path='dbfs:/mnt/raw-data/sales_pipeline.csv', name='sales_pipeline.csv', size=637773, modificationTime=1763372388000),
 FileInfo(path='dbfs:/mnt/raw-data/sales_teams.csv', name='sales_teams.csv', size=1284, modificationTime=1763372383000)]

In [0]:
# List the contents of the transformed-data mount point.
dbutils.fs.ls("/mnt/transformed-data")

[]

In [0]:
def read_raw_csv(file_name):
    """Load one CSV file from the raw-data mount into a Spark DataFrame.

    The first line of the file is treated as the header and column types are
    inferred from the data.

    Args:
        file_name: Name of the file under /mnt/raw-data, e.g. "accounts.csv".

    Returns:
        The DataFrame read from /mnt/raw-data/<file_name>.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"/mnt/raw-data/{file_name}")
    )


accounts_df = read_raw_csv("accounts.csv")
data_dictionary_df = read_raw_csv("data_dictionary.csv")
products_df = read_raw_csv("products.csv")
sales_pipeline_df = read_raw_csv("sales_pipeline.csv")
sales_teams_df = read_raw_csv("sales_teams.csv")

In [0]:
# Preview the first two rows of the accounts table.
accounts_df.show(2)

+----------------+---------+----------------+-------+---------+---------------+-------------+
|         account|   sector|year_established|revenue|employees|office_location|subsidiary_of|
+----------------+---------+----------------+-------+---------+---------------+-------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|         NULL|
|      Betasoloin|  medical|            1999| 251.41|      495|  United States|         NULL|
+----------------+---------+----------------+-------+---------+---------------+-------------+
only showing top 2 rows


In [0]:
# Print the column names of every loaded table, one list per line,
# in the order the tables were loaded.
loaded_frames = (
    accounts_df,
    data_dictionary_df,
    products_df,
    sales_pipeline_df,
    sales_teams_df,
)
for loaded_frame in loaded_frames:
    print(loaded_frame.columns)

['account', 'sector', 'year_established', 'revenue', 'employees', 'office_location', 'subsidiary_of']
['Table', 'Field', 'Description']
['product', 'series', 'sales_price']
['opportunity_id', 'sales_agent', 'product', 'account', 'deal_stage', 'engage_date', 'close_date', 'close_value']
['sales_agent', 'manager', 'regional_office']


### Dataset Cleaning

In [0]:
# Rename accounts.subsidiary_of to parent_company.
accounts_df = accounts_df.withColumnRenamed(existing="subsidiary_of", new="parent_company")

# Lower-case the capitalised data-dictionary headers.
data_dictionary_renames = (
    ("Table", "table"),
    ("Field", "field"),
    ("Description", "description"),
)
for old_name, new_name in data_dictionary_renames:
    data_dictionary_df = data_dictionary_df.withColumnRenamed(old_name, new_name)

In [0]:
# Preview two rows to confirm the subsidiary_of -> parent_company rename.
accounts_df.show(2)

+----------------+---------+----------------+-------+---------+---------------+--------------+
|         account|   sector|year_established|revenue|employees|office_location|parent_company|
+----------------+---------+----------------+-------+---------+---------------+--------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|          NULL|
|      Betasoloin|  medical|            1999| 251.41|      495|  United States|          NULL|
+----------------+---------+----------------+-------+---------+---------------+--------------+
only showing top 2 rows


In [0]:
# Preview two rows to confirm the lower-cased data-dictionary column names.
data_dictionary_df.show(2)

+--------+-------+------------+
|   table|  field| description|
+--------+-------+------------+
|accounts|account|Company name|
|accounts| sector|    Industry|
+--------+-------+------------+
only showing top 2 rows


### Null Values 

In [0]:
def count_nulls_per_column(df):
    """Count the null values in every column of a DataFrame.

    Args:
        df: The Spark DataFrame to inspect.

    Returns:
        A one-row DataFrame with the same column names as ``df``, where each
        value is the number of rows in which that column is null.
    """
    # NOTE: `sum` here is pyspark.sql.functions.sum (imported at the top of the
    # notebook), not the Python builtin. Each row contributes 1 when the column
    # is null and 0 otherwise.
    return df.select([
        sum(when(col(column).isNull(), 1).otherwise(0)).alias(column)
        for column in df.columns
    ])


null_counts_accounts_df = count_nulls_per_column(accounts_df)
null_counts_data_dictionary_df = count_nulls_per_column(data_dictionary_df)
null_counts_products_df = count_nulls_per_column(products_df)
null_counts_sales_pipeline_df = count_nulls_per_column(sales_pipeline_df)
null_counts_sales_teams_df = count_nulls_per_column(sales_teams_df)

display(null_counts_accounts_df)
display(null_counts_data_dictionary_df)
display(null_counts_products_df)
display(null_counts_sales_pipeline_df)
display(null_counts_sales_teams_df)

account,sector,year_established,revenue,employees,office_location,parent_company
0,0,0,0,0,0,70


table,field,description
0,0,0


product,series,sales_price
0,0,0


opportunity_id,sales_agent,product,account,deal_stage,engage_date,close_date,close_value
0,0,0,1425,0,500,2089,2089


sales_agent,manager,regional_office
0,0,0


In [0]:
# Render the sales pipeline table to inspect the rows that contain missing values.
sales_pipeline_df.display()

opportunity_id,sales_agent,product,account,deal_stage,engage_date,close_date,close_value
1C1I7A6R,Moses Frase,GTX Plus Basic,Cancity,Won,2016-10-20,2017-03-01,1054.0
Z063OYW0,Darcel Schlecht,GTXPro,Isdom,Won,2016-10-25,2017-03-11,4514.0
EC4QE1BX,Darcel Schlecht,MG Special,Cancity,Won,2016-10-25,2017-03-07,50.0
MV1LWRNH,Moses Frase,GTX Basic,Codehow,Won,2016-10-25,2017-03-09,588.0
PE84CX4O,Zane Levy,GTX Basic,Hatfan,Won,2016-10-25,2017-03-02,517.0
ZNBS69V1,Anna Snelling,MG Special,Ron-tech,Won,2016-10-29,2017-03-01,49.0
9ME3374G,Vicki Laflamme,MG Special,J-Texon,Won,2016-10-30,2017-03-02,57.0
7GN8Q4LL,Markita Hansen,GTX Basic,Cheers,Won,2016-11-01,2017-03-07,601.0
OLK9LKZB,Niesha Huffines,GTX Plus Basic,Zumgoity,Won,2016-11-01,2017-03-03,1026.0
HAXMC4IX,James Ascencio,MG Advanced,,Engaging,2016-11-03,,


In [0]:
# Replace nulls in selected text columns with explicit placeholder labels.
# Only the columns named here are filled; all other columns keep their nulls.
accounts_fill_values = {"parent_company": "Independent"}
sales_pipeline_fill_values = {"account": "Unknown"}

accounts_df = accounts_df.na.fill(accounts_fill_values)
sales_pipeline_df = sales_pipeline_df.na.fill(sales_pipeline_fill_values)


In [0]:
# Re-count nulls per column in accounts after filling parent_company.
# `sum` is pyspark.sql.functions.sum, not the Python builtin.
accounts_null_exprs = []
for column_name in accounts_df.columns:
    null_flag = when(col(column_name).isNull(), 1).otherwise(0)
    accounts_null_exprs.append(sum(null_flag).alias(column_name))

null_counts_accounts_df = accounts_df.select(accounts_null_exprs)

null_counts_accounts_df.display()

account,sector,year_established,revenue,employees,office_location,parent_company
0,0,0,0,0,0,0


### Transformed Data

In [0]:
# Write every cleaned table as a headered CSV directory under the
# transformed-data mount, one directory per table.
transformed_outputs = {
    "accounts": accounts_df,
    "data_dictionary": data_dictionary_df,
    "products": products_df,
    "sales_pipeline": sales_pipeline_df,
    "sales_teams": sales_teams_df,
}

for table_name, table_df in transformed_outputs.items():
    (
        table_df.write
        # The default save mode errors out when the target path already exists;
        # overwrite makes the cell safe to re-run.
        .mode("overwrite")
        .option("header", "true")
        .csv(f"/mnt/transformed-data/{table_name}")
    )