# Sales Data Cleaning & Preparation (Databricks)

This notebook performs data cleaning, validation, and transformation on raw sales data
ingested into Azure Data Lake Storage Gen2.  

The transformations include:
- Loading data from ADLS mount points  
- Handling missing or inconsistent values  
- Standardizing column formats  
- Creating derived metrics  
- Generating cleaned datasets for Synapse Analytics  

## Import Required Libraries

We begin by importing PySpark functions needed for data cleaning and transformations.


In [None]:
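# Note: this `sum` is Spark's column aggregate and shadows Python's built-in sum() in this notebook.
from pyspark.sql.functions import sum, col, when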

## 📂 Explore Raw Data in ADLS (Mounted Storage)

We first mount the `raw-data` and `transformed-data` storage containers at `/mnt/raw-data` and `/mnt/transformed-data`, then list the files in each.  
Listing the raw files verifies that they are available before any transformation runs.


In [None]:
dbutils.fs.mount(
    source = "wasbs://raw-data@azureeastus2st.blob.core.windows.net/",
    mount_point = "/mnt/raw-data",
    extra_configs = {"fs.azure.account.key.azureeastus2st.blob.core.windows.net" : dbutils.secrets.get(
        "databricksScope", "qarsme"
    )}
)

dbutils.fs.mount(
    source = "wasbs://transformed-data@azureeastus2st.blob.core.windows.net/",
    mount_point = "/mnt/transformed-data",
    extra_configs = {"fs.azure.account.key.azureeastus2st.blob.core.windows.net" : dbutils.secrets.get(
        "databricksScope", "qarsme"
    )}
)

True
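
`dbutils.fs.mount` raises an error when the mount point is already in use, so re-running the cell above on a cluster where the containers are mounted will fail. A minimal sketch of a guard, assuming the standard `dbutils.fs.mounts()` listing is available; the helper name `mount_if_needed` is hypothetical and not part of this notebook:

```python
def mount_if_needed(dbutils, source, mount_point, extra_configs):
    """Mount `source` at `mount_point` unless something is already mounted there.

    Hypothetical helper. Returns True if a new mount was created,
    False if the mount point already existed and the call was skipped.
    """
    # dbutils.fs.mounts() returns entries that expose a `mountPoint` attribute.
    existing = {m.mountPoint for m in dbutils.fs.mounts()}
    if mount_point in existing:
        return False
    dbutils.fs.mount(
        source=source,
        mount_point=mount_point,
        extra_configs=extra_configs,
    )
    return True
```

In the notebook it would be called with the same `source`, `mount_point`, and `extra_configs` values used in the cell above, passing the notebook's own `dbutils` object as the first argument.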

In [None]:
dbutils.fs.ls("/mnt/raw-data")

[FileInfo(path='dbfs:/mnt/raw-data/accounts.csv', name='accounts.csv', size=4670, modificationTime=1764545467000),
 FileInfo(path='dbfs:/mnt/raw-data/data_dictionary.csv', name='data_dictionary.csv', size=996, modificationTime=1764545483000),
 FileInfo(path='dbfs:/mnt/raw-data/products.csv', name='products.csv', size=171, modificationTime=1764545545000),
 FileInfo(path='dbfs:/mnt/raw-data/sales_pipeline.csv', name='sales_pipeline.csv', size=637773, modificationTime=1764545568000),
 FileInfo(path='dbfs:/mnt/raw-data/sales_teams.csv', name='sales_teams.csv', size=1284, modificationTime=1764545585000)]

In [None]:
dbutils.fs.ls("/mnt/transformed-data")

[]
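
The availability check can also be done programmatically rather than by reading the listing. The sketch below is an illustration only: `missing_files` is a hypothetical helper, and the expected file names are copied from the `/mnt/raw-data` listing above.

```python
# File names taken from the /mnt/raw-data listing; the check itself is an assumption.
EXPECTED_FILES = (
    "accounts.csv",
    "data_dictionary.csv",
    "products.csv",
    "sales_pipeline.csv",
    "sales_teams.csv",
)


def missing_files(listing, expected=EXPECTED_FILES):
    """Return the expected file names that do not appear in a dbutils.fs.ls() listing.

    `listing` is any iterable of objects with a `name` attribute,
    such as the FileInfo entries returned by dbutils.fs.ls().
    """
    present = {entry.name for entry in listing}
    return [name for name in expected if name not in present]
```

A call such as `missing_files(dbutils.fs.ls("/mnt/raw-data"))` returns an empty list when every expected file is present, which makes it easy to stop the notebook early with an `assert` or a raised exception.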

## 📥 Load Raw CSV Files

The raw datasets are loaded into Spark DataFrames for processing.

Datasets:
- accounts.csv  
- products.csv  
- sales_pipeline.csv  
- sales_teams.csv  
- data_dictionary.csv  

In [None]:
# Shared reader: every file has a header row, and column types are inferred by Spark.
csv_reader = spark.read.format("csv").option("header", "true").option("inferSchema", "true")

accounts_df = csv_reader.load("/mnt/raw-data/accounts.csv")
data_dictionary_df = csv_reader.load("/mnt/raw-data/data_dictionary.csv")
products_df = csv_reader.load("/mnt/raw-data/products.csv")
sales_pipeline_df = csv_reader.load("/mnt/raw-data/sales_pipeline.csv")
sales_teams_df = csv_reader.load("/mnt/raw-data/sales_teams.csv")
accounts_df.show(2)

+----------------+---------+----------------+-------+---------+---------------+-------------+
|         account|   sector|year_established|revenue|employees|office_location|subsidiary_of|
+----------------+---------+----------------+-------+---------+---------------+-------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|         NULL|
|      Betasoloin|  medical|            1999| 251.41|      495|  United States|         NULL|
+----------------+---------+----------------+-------+---------+---------------+-------------+
only showing top 2 rows


## 🧪 Data Overview & Schema Validation

In [None]:
print(accounts_df.columns)
print(data_dictionary_df.columns)
print(products_df.columns)
print(sales_pipeline_df.columns)
print(sales_teams_df.columns)

['account', 'sector', 'year_established', 'revenue', 'employees', 'office_location', 'subsidiary_of']
['Table', 'Field', 'Description']
['product', 'series', 'sales_price']
['opportunity_id', 'sales_agent', 'product', 'account', 'deal_stage', 'engage_date', 'close_date', 'close_value']
['sales_agent', 'manager', 'regional_office']
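
Printing the column lists supports a visual check. To turn that into an explicit validation step, the lists can be compared against an expected layout. This is a sketch under assumptions: `column_diff` is a hypothetical helper, and the expected names are simply the ones printed above.

```python
# Expected columns copied from the printed output; treat them as an assumed contract.
EXPECTED_COLUMNS = {
    "accounts": ["account", "sector", "year_established", "revenue",
                 "employees", "office_location", "subsidiary_of"],
    "products": ["product", "series", "sales_price"],
    "sales_pipeline": ["opportunity_id", "sales_agent", "product", "account",
                       "deal_stage", "engage_date", "close_date", "close_value"],
    "sales_teams": ["sales_agent", "manager", "regional_office"],
}


def column_diff(actual, expected):
    """Return (missing, unexpected) column names, each in its original order."""
    missing = [c for c in expected if c not in actual]
    unexpected = [c for c in actual if c not in expected]
    return missing, unexpected


# Example with a plain list standing in for a DataFrame's `.columns`.
missing, unexpected = column_diff(
    ["product", "series", "price"], EXPECTED_COLUMNS["products"]
)
print(missing, unexpected)
```

In the notebook, the first argument would be a DataFrame's column list, for example `column_diff(products_df.columns, EXPECTED_COLUMNS["products"])`.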


## 🧹 Data Cleaning & Transformations

Cleaning operations performed:
- Handling null values
- Renaming inconsistent columns
- Removing duplicates
- Type casting numeric fields
- Standardizing string formats
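
The cells in this section show the renaming and null-handling steps. For the other items in the list (duplicates, type casting, string formats), one possible approach is sketched here using Spark SQL expressions passed to `DataFrame.selectExpr`. The helper names and the choice of which columns to cast or normalize are assumptions for illustration, not the notebook's own code.

```python
def build_clean_exprs(columns, casts=None, text_columns=()):
    """Build Spark SQL select expressions that cast and normalize columns.

    columns:      all column names, in the desired output order
    casts:        mapping of column name -> Spark SQL type, e.g. {"revenue": "double"}
    text_columns: columns to trim and lowercase
    Columns named in neither argument pass through unchanged.
    """
    casts = casts or {}
    exprs = []
    for name in columns:
        quoted = f"`{name}`"  # backticks keep unusual column names valid in SQL
        if name in casts:
            exprs.append(f"CAST({quoted} AS {casts[name]}) AS {quoted}")
        elif name in text_columns:
            exprs.append(f"lower(trim({quoted})) AS {quoted}")
        else:
            exprs.append(quoted)
    return exprs


def clean(df, casts=None, text_columns=()):
    """Apply the expressions to a Spark DataFrame, then drop exact duplicate rows."""
    exprs = build_clean_exprs(df.columns, casts, text_columns)
    return df.selectExpr(*exprs).dropDuplicates()
```

A call might look like `clean(accounts_df, casts={"revenue": "double", "employees": "int"}, text_columns=["sector"])`. Normalizing strings before `dropDuplicates()` means rows that differ only in case or surrounding whitespace collapse into one.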

In [None]:
accounts_df = accounts_df.withColumnRenamed("subsidiary_of", "parent_company")
data_dictionary_df = data_dictionary_df.withColumnRenamed("Table", "table").withColumnRenamed("Field", "field").withColumnRenamed("Description", "description")

In [None]:
accounts_df.show(1)
data_dictionary_df.show(1)

+----------------+---------+----------------+-------+---------+---------------+--------------+
|         account|   sector|year_established|revenue|employees|office_location|parent_company|
+----------------+---------+----------------+-------+---------+---------------+--------------+
|Acme Corporation|technolgy|            1996|1100.04|     2822|  United States|          NULL|
+----------------+---------+----------------+-------+---------+---------------+--------------+
only showing top 1 row
+--------+-------+------------+
|   table|  field| description|
+--------+-------+------------+
|accounts|account|Company name|
+--------+-------+------------+
only showing top 1 row


In [None]:
def null_counts(df):
    """Return a one-row DataFrame holding the number of nulls in each column."""
    # `sum`, `when`, and `col` are the PySpark functions imported at the top of the notebook.
    return df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])

null_accounts_df = null_counts(accounts_df)
null_data_dictionary_df = null_counts(data_dictionary_df)
null_products_df = null_counts(products_df)
null_sales_pipeline_df = null_counts(sales_pipeline_df)
null_sales_teams_df = null_counts(sales_teams_df)

display(null_accounts_df)
display(null_data_dictionary_df)
display(null_products_df)
display(null_sales_pipeline_df)
display(null_sales_teams_df)

account,sector,year_established,revenue,employees,office_location,parent_company
0,0,0,0,0,0,70


table,field,description
0,0,0


product,series,sales_price
0,0,0


opportunity_id,sales_agent,product,account,deal_stage,engage_date,close_date,close_value
0,0,0,1425,0,500,2089,2089


sales_agent,manager,regional_office
0,0,0


In [1]:
# sales_pipeline_df.display()
# accounts_df.display()

In [None]:
accounts_df = accounts_df.fillna({
    "parent_company" : "independent"
})

sales_pipeline_df = sales_pipeline_df.fillna({
    "account": "unknown"
})



In [2]:
# accounts_df.display()
# sales_pipeline_df.display()

In [None]:
null_accounts_df = accounts_df.select([sum(when(col(column).isNull(), 1).otherwise(0)).alias(column) for column in accounts_df.columns])
null_accounts_df.show()

+-------+------+----------------+-------+---------+---------------+--------------+
|account|sector|year_established|revenue|employees|office_location|parent_company|
+-------+------+----------------+-------+---------+---------------+--------------+
|      0|     0|               0|      0|        0|              0|             0|
+-------+------+----------------+-------+---------+---------------+--------------+



In [None]:
null_sales_pipeline_df = sales_pipeline_df.select([sum(when(col(column).isNull(), 1).otherwise(0)).alias(column) for column in sales_pipeline_df.columns])
null_sales_pipeline_df.show()

## 💾 Save Cleaned Data to ADLS (Transformed Container)

The cleaned datasets are written back to Azure Data Lake under the `transformed-data` container.  
These files are later used by Azure Synapse for querying via serverless SQL.


In [None]:
accounts_df.write.option("header", "true").csv("/mnt/transformed-data/accounts.csv")
data_dictionary_df.write.option("header", "true").csv("/mnt/transformed-data/data_dictionary.csv")
products_df.write.option("header", "true").csv("/mnt/transformed-data/products.csv")
sales_pipeline_df.write.option("header", "true").csv("/mnt/transformed-data/sales_pipeline.csv")
sales_teams_df.write.option("header", "true").csv("/mnt/transformed-data/sales_teams.csv")
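
Two behaviors of Spark's CSV writer are worth keeping in mind here. Each target path such as `accounts.csv` becomes a directory of part files, not a single file, and the default save mode raises an error if the path already exists, so the cell above fails on a second run. A minimal sketch of a re-runnable variant, assuming overwriting the previous output is acceptable; `write_csv` is a hypothetical helper:

```python
def write_csv(df, name, base_path="/mnt/transformed-data", mode="overwrite"):
    """Write a Spark DataFrame as headered CSV under base_path and return the target path.

    Hypothetical helper. With mode="overwrite", any existing output at the
    target path is replaced instead of causing the write to fail.
    """
    target = f"{base_path}/{name}"
    df.write.mode(mode).option("header", "true").csv(target)
    return target
```

For example, `write_csv(accounts_df, "accounts.csv")` writes to `/mnt/transformed-data/accounts.csv`. Passing `mode="errorifexists"` restores the default behavior of the original cell.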