In [0]:
%python
# Unmount the existing mount point if it exists
if any(mount.mountPoint == '/mnt/tokyoolymic' for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount('/mnt/tokyoolymic')

# Define the configurations
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "655257ed-7dfd-4bd2-a4f8-a4bff35ef054",
    "fs.azure.account.oauth2.client.secret": 'Oyj8Q~t9u_MzDNAYLU~7j0QRx4uz7BWpF-LZYahl',
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/e0d9ab7b-9acb-4ee6-9105-7538a44aa880/oauth2/token"
}

# Mount the storage
dbutils.fs.mount(
    source="abfss://slaesdatacontainer@salesdatapipelinestorage.dfs.core.windows.net",
    mount_point="/mnt/tokyoolymic",
    extra_configs=configs
)

/mnt/tokyoolymic has been unmounted.


True

In [0]:
# List the top-level entries of the mounted container as a table.
display(dbutils.fs.ls("/mnt/tokyoolymic"))

path,name,size,modificationTime
dbfs:/mnt/tokyoolymic/cleaned-data/,cleaned-data/,0,1731936729000
dbfs:/mnt/tokyoolymic/metrics/,metrics/,0,1731947578000
dbfs:/mnt/tokyoolymic/raw-data/,raw-data/,0,1731850610000
dbfs:/mnt/tokyoolymic/transformed-data/,transformed-data/,0,1731850626000


In [0]:
# Evaluate the `spark` session object as the cell's last expression so the
# notebook renders it.
spark

In [0]:
# Read the raw sales CSV from the mounted container: the first row supplies
# the column names and the column types are inferred from the data.
sales = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/mnt/tokyoolymic/raw-data/sales.csv")
)

In [0]:
# Print a preview of the first rows of the `sales` DataFrame.
# NOTE(review): the rendered output includes customer phone and card-number
# columns — clear or redact the output before sharing the notebook.
sales.show()

+-------+--------------+---------------+-----------+---------+---------+----------+--------+------+----------+-------------------+----------+
|OrderID|  CustomerName|    PhoneNumber|   Location|  Country|StoreCode|   Product|Quantity| Price|      Date|   CreditCardNumber|ExpiryDate|
+-------+--------------+---------------+-----------+---------+---------+----------+--------+------+----------+-------------------+----------+
| HEXHEV|      John Doe|+1 990-186-7268|   New York|      USA|    ST026|     Phone|       5|979.97|2021-07-18|5676 8888 7887 7263|    Jul-29|
| JCKRFW|      John Doe|+1 659-832-6831|   New York|      USA|    ST154|Headphones|       1|218.94|2020-02-01|5676 8888 7887 8526|    Jan-23|
| PZXZUL|      John Doe|+1 564-127-5258|    Houston|      USA|    ST013|Headphones|       4|823.08|2020-10-11|5676 8888 7887 1823|    Mar-29|
| QELSGN|      John Doe|+1 571-789-2219|Los Angeles|      USA|    ST029|    Tablet|      10| 572.4|2021-08-17|5676 8888 7887 1242|    Nov-23|
| KCFB

In [0]:
# Print each column of `sales` with its inferred type and nullability.
sales.printSchema()

root
 |-- OrderID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- StoreCode: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- CreditCardNumber: string (nullable = true)
 |-- ExpiryDate: string (nullable = true)



Data Cleaning

In [0]:
from pyspark.sql.functions import col, lit, when, to_date

# Data-cleaning stage: load the raw sales CSV, derive a Sales column, drop
# incomplete/duplicate/out-of-range rows and write the result to cleaned-data.

# Step 1: Load the sales dataset
sales = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolymic/raw-data/sales.csv")

# Print schema to verify column data types
sales.printSchema()

# Display initial data
sales.show(5)

# Step 2: Add or Create the Sales Column
# Fail fast when an input column is absent: every later step reads 'Sales',
# so continuing would only surface a less helpful error further down.
missing_columns = [name for name in ("Quantity", "Price") if name not in sales.columns]
if missing_columns:
    raise ValueError(
        f"Cannot calculate 'Sales': missing required column(s) {missing_columns}."
    )

# Calculate Sales as Quantity * Price
sales = sales.withColumn("Sales", col("Quantity").cast("double") * col("Price").cast("double"))

# Step 3: Handle Missing Data
# Drop rows with missing critical fields
critical_fields = ["OrderID", "Sales", "Date"]
sales = sales.na.drop(subset=critical_fields)

# Fill missing non-critical fields with default values
sales = sales.fillna({
    "CustomerName": "Unknown",
    "PhoneNumber": "000-000-0000",
    "Location": "Unknown",
    "Country": "Unknown"
})

# Step 4: Remove Duplicates
# Keep a single row per OrderID
sales = sales.dropDuplicates(["OrderID"])

# Step 5: Validate and Correct Data Types
# Convert Date column to date type and drop rows with invalid dates
sales = sales.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd")).na.drop(subset=["Date"])

# Ensure Sales column is numeric
sales = sales.withColumn("Sales", col("Sales").cast("double"))

# Step 6: Handle Outliers
# Replace negative Sales values with 0
sales = sales.withColumn("Sales", when(col("Sales") < 0, 0).otherwise(col("Sales")))

# Remove rows with extreme sales values (greater than 1,000,000)
sales = sales.filter(col("Sales") <= 1000000)

# Step 7: Save the Cleaned Dataset in the Cleaned Data Container
# Coalesce to 1 partition to ensure a single output file
sales.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save("dbfs:/mnt/tokyoolymic/cleaned-data/")

print("Cleaned data with 'Sales' column saved successfully as a single CSV file in dbfs:/mnt/tokyoolymic/cleaned-data/")


root
 |-- OrderID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- StoreCode: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- CreditCardNumber: string (nullable = true)
 |-- ExpiryDate: string (nullable = true)

+-------+------------+---------------+-----------+-------+---------+----------+--------+------+----------+-------------------+----------+
|OrderID|CustomerName|    PhoneNumber|   Location|Country|StoreCode|   Product|Quantity| Price|      Date|   CreditCardNumber|ExpiryDate|
+-------+------------+---------------+-----------+-------+---------+----------+--------+------+----------+-------------------+----------+
| HEXHEV|    John Doe|+1 990-186-7268|   New York|    USA|    ST026|     Phone|       5|979.9

Full Code for Transformation and Enrichment

In [0]:
from pyspark.sql.functions import col, lit, when, avg, sum as _sum, count, regexp_replace, sha2

# Transformation stage: load the cleaned sales data, mask/hash the card
# number, compute summary metrics, add a sales category and write the results.

# Step 1: Load the cleaned sales dataset
cleaned_data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/tokyoolymic/cleaned-data/")

# Display schema and initial rows
cleaned_data.printSchema()
cleaned_data.show(5)

# Step 2: Add the Sales Column (if not already present)
if 'Sales' not in cleaned_data.columns:
    if 'Quantity' in cleaned_data.columns and 'Price' in cleaned_data.columns:
        # Calculate Sales as Quantity * Price
        cleaned_data = cleaned_data.withColumn("Sales", col("Quantity") * col("Price"))
    else:
        raise ValueError("Missing 'Quantity' or 'Price' columns. Cannot calculate 'Sales'.")

# Step 3: Mask or Hash PII Data (e.g., Credit Card Numbers)
if 'CreditCardNumber' in cleaned_data.columns:
    # Mask everything except the last 4 characters of the credit card number.
    # The lookahead must NOT be anchored with `$`: `.(?=.{4}$)` only matches the
    # single character that has exactly four characters after it, which leaves
    # nearly the whole number in clear text. `.(?=.{4})` matches every
    # character that still has at least four characters after it.
    cleaned_data = cleaned_data.withColumn(
        "MaskedCreditCardNumber",
        regexp_replace(col("CreditCardNumber"), r".(?=.{4})", "*")
    )
    # Hash the credit card number (SHA-256).
    # NOTE(review): an unsalted hash of a card number can be brute-forced
    # because the input space is small; consider a keyed hash (HMAC) or
    # tokenisation before treating this column as safe to share.
    cleaned_data = cleaned_data.withColumn(
        "HashedCreditCardNumber",
        sha2(col("CreditCardNumber"), 256)
    )
    # Drop the original unmasked CreditCardNumber column for security
    cleaned_data = cleaned_data.drop("CreditCardNumber")

# Step 4: Transform Data for Insights
# Total Sales (the aggregate is None when there are no rows, so default to 0.0)
total_sales = cleaned_data.agg(_sum("Sales").alias("TotalSales")).collect()[0]["TotalSales"]
if total_sales is None:
    total_sales = 0.0

# Average Order Value (AOV); undefined (None) when there are no orders,
# which avoids a division by zero on an empty dataset.
total_orders = cleaned_data.select("OrderID").distinct().count()
average_order_value = total_sales / total_orders if total_orders else None

# Grouped Metrics: Sales by Product
sales_by_product = cleaned_data.groupBy("Product").agg(
    _sum("Sales").alias("TotalSales"),
    avg("Sales").alias("AverageSales"),
    count("OrderID").alias("TotalOrders")
)

# Grouped Metrics: Sales by Location
sales_by_location = cleaned_data.groupBy("Location").agg(
    _sum("Sales").alias("TotalSales"),
    avg("Sales").alias("AverageSales")
)

# Step 5: Add Sales Category Based on Sales Value
# < 500 -> "Low", 500 to < 2000 -> "Medium", everything else -> "High"
cleaned_data = cleaned_data.withColumn(
    "SalesCategory",
    when(col("Sales") < 500, "Low")
    .when((col("Sales") >= 500) & (col("Sales") < 2000), "Medium")
    .otherwise("High")
)

# Step 6: Save the Transformed Dataset in the Transformed Data Container
# Coalesce to 1 partition to ensure a single output file
cleaned_data.coalesce(1).write.format("csv").mode("overwrite").option("header", "true").save("dbfs:/mnt/tokyoolymic/transformed-data/")

# Save grouped metrics for further analysis
sales_by_product.write.format("csv").mode("overwrite").option("header", "true").save("dbfs:/mnt/tokyoolymic/metrics/sales_by_product/")
sales_by_location.write.format("csv").mode("overwrite").option("header", "true").save("dbfs:/mnt/tokyoolymic/metrics/sales_by_location/")

# Display final transformed dataset
cleaned_data.show(5)

# Print metrics
print(f"Total Sales: {total_sales}")
print(f"Average Order Value (AOV): {average_order_value}")


root
 |-- OrderID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- StoreCode: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Date: date (nullable = true)
 |-- CreditCardNumber: string (nullable = true)
 |-- ExpiryDate: string (nullable = true)
 |-- Sales: double (nullable = true)

+-------+--------------+---------------+-----------+---------+---------+----------+--------+------+----------+-------------------+----------+------------------+
|OrderID|  CustomerName|    PhoneNumber|   Location|  Country|StoreCode|   Product|Quantity| Price|      Date|   CreditCardNumber|ExpiryDate|             Sales|
+-------+--------------+---------------+-----------+---------+---------+----------+--------+------+----------+-------------------+----------+-------