In [0]:
# Import necessary libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, year, month, dayofweek, hour
from pyspark.sql.types import IntegerType, FloatType, StringType, DateType, TimestampType
 
# Initialize Spark session
spark = SparkSession.builder.appName("RetailDataCleaning").getOrCreate()
 
# Load the CSV file into a Spark DataFrame
file_path = "/FileStore/tables/retail_data.csv"  # Update with your DBFS path
df = spark.read.csv(file_path, header=True,inferSchema=True)
 
# Display the data structure and some initial records
df.printSchema()
df.show(5)



root
 |-- Transaction_ID: integer (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: long (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Total_Purchases: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Total_Amount: double (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Feedback: string (nullab

In [0]:
# Check for missing values in each column
missing_counts = df.select([
    count(when((col(c).cast("float").isNotNull() & isnan(col(c).cast("float"))) | col(c).isNull(), c)).alias(c)
    for c in df.columns
])
display(missing_counts)

# Drop rows with missing Transaction_ID or Customer_ID
df = df.dropna(subset=["Transaction_ID", "Customer_ID"])

# Replace missing values in non-critical columns with default values
df = df.fillna({
    "Feedback": "Not Provided",
    "Ratings": 0,
    "Income": "0",
    "Email": "No Email",
    "Phone": 0
})

# Verify that missing values have been handled
missing_counts_after = df.select([
    count(when((col(c).cast("float").isNotNull() & isnan(col(c).cast("float"))) | col(c).isNull(), c)).alias(c)
    for c in df.columns
])
display(missing_counts_after)

Transaction_ID,Customer_ID,Name,Email,Phone,Address,City,State,Zipcode,Country,Age,Gender,Income,Customer_Segment,Date,Year,Month,Time,Total_Purchases,Amount,Total_Amount,Product_Category,Product_Brand,Product_Type,Feedback,Shipping_Method,Payment_Method,Order_Status,Ratings,products
333,308,382,347,362,315,248,281,340,271,173,317,290,215,359,350,273,350,361,357,350,283,281,0,184,337,297,235,184,0


Transaction_ID,Customer_ID,Name,Email,Phone,Address,City,State,Zipcode,Country,Age,Gender,Income,Customer_Segment,Date,Year,Month,Time,Total_Purchases,Amount,Total_Amount,Product_Category,Product_Brand,Product_Type,Feedback,Shipping_Method,Payment_Method,Order_Status,Ratings,products
0,0,380,0,0,313,248,280,340,270,173,316,0,215,359,348,272,350,361,357,348,282,278,0,0,337,297,235,0,0


In [0]:
# Convert `Age`, `Income`, and `Ratings` to numeric types
df = df.withColumn("Age", col("Age").cast(IntegerType())) \
       .withColumn("Income", col("Income").cast(FloatType())) \
       .withColumn("Ratings", col("Ratings").cast(FloatType()))
 
# Convert `Date` and `Time` columns to date and timestamp formats
df = df.withColumn("Date", col("Date").cast(DateType())) \
       .withColumn("Time", col("Time").cast(TimestampType()))
 
# Display updated schema
df.printSchema()

root
 |-- Transaction_ID: integer (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = false)
 |-- Phone: long (nullable = false)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: float (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: string (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Total_Purchases: integer (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Total_Amount: double (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Feedback: string (nullabl

In [0]:
# Ensure consistent capitalization for `Gender`, `Country`, `Order_Status`, and other key columns
df = df.withColumn("Gender", when(col("Gender").isin("male", "Male"), "Male")
                          .when(col("Gender").isin("female", "Female"), "Female")
                          .otherwise("Unknown"))
 
# Standardize `Country` column values (example for 'USA', 'UK' variations)
df = df.withColumn("Country", when(col("Country").isin("US", "USA", "United States"), "USA")
                           .when(col("Country").isin("UK", "United Kingdom"), "UK")
                           .otherwise(col("Country")))
 
# Verify the transformations
df.select("Gender", "Country", "Order_Status").distinct().show()

+-------+---------+------------+
| Gender|  Country|Order_Status|
+-------+---------+------------+
| Female|       UK|   Delivered|
|   Male|Australia|   Delivered|
|   Male|Australia|  Processing|
| Female|       UK|     Pending|
|   Male|  Germany|   Delivered|
| Female|       UK|     Shipped|
| Female|   Canada|  Processing|
|   Male|  Germany|  Processing|
|   Male|   Canada|  Processing|
|   Male|       UK|  Processing|
|   Male|       UK|     Shipped|
|   Male|       UK|        NULL|
| Female|       UK|  Processing|
|Unknown|       UK|     Shipped|
|   Male|       UK|   Delivered|
| Female|       UK|        NULL|
|Unknown|       UK|   Delivered|
| Female|Australia|     Pending|
|Unknown|       UK|     Pending|
|   Male|  Germany|     Shipped|
+-------+---------+------------+
only showing top 20 rows



In [0]:
# Define thresholds for detecting outliers in numerical columns (e.g., Age, Income)
age_upper_limit = 100
income_upper_limit = 200000
 
# Filter out or replace unrealistic values in Age and Income
df = df.withColumn("Age", when((col("Age") < 0) | (col("Age") > age_upper_limit), None).otherwise(col("Age")))
df = df.withColumn("Income", when((col("Income") < 0) | (col("Income") > income_upper_limit), None).otherwise(col("Income")))
 
# Replace extreme values in `Ratings` (keeping it between 1-5)
df = df.withColumn("Ratings", when(col("Ratings") > 5, 5).when(col("Ratings") < 1, 1).otherwise(col("Ratings")))
 
# Show updated data after handling outliers
df.show(5)

+--------------+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----+----+---------+-------------------+---------------+-----------+------------+----------------+-------------+------------+---------+---------------+--------------+------------+-------+-----------------+
|Transaction_ID|Customer_ID|               Name|              Email|     Phone|             Address|      City|          State|Zipcode|  Country|Age|Gender|Income|Customer_Segment|Date|Year|    Month|               Time|Total_Purchases|     Amount|Total_Amount|Product_Category|Product_Brand|Product_Type| Feedback|Shipping_Method|Payment_Method|Order_Status|Ratings|         products|
+--------------+-----------+-------------------+-------------------+----------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----+----+---------+-------------------+--------

In [0]:
# Create Age Bins for segmentation
df = df.withColumn("Age_Group", when(col("Age") < 18, "<18")
                                  .when((col("Age") >= 18) & (col("Age") <= 25), "18-25")
                                  .when((col("Age") >= 26) & (col("Age") <= 35), "26-35")
                                  .when((col("Age") >= 36) & (col("Age") <= 45), "36-45")
                                  .when((col("Age") >= 46) & (col("Age") <= 60), "46-60")
                                  .otherwise(">60"))
 
# Extract additional time-based features from the Date column
df = df.withColumn("Year", year(col("Date"))) \
       .withColumn("Month", month(col("Date"))) \
       .withColumn("Day_of_Week", dayofweek(col("Date")))
 
# Extract hour from the Time column for hourly analysis
df = df.withColumn("Hour", hour(col("Time")))
 
# Show the new columns
df.select("Age_Group", "Year", "Month", "Day_of_Week", "Hour").show(5)

+---------+----+-----+-----------+----+
|Age_Group|Year|Month|Day_of_Week|Hour|
+---------+----+-----+-----------+----+
|    18-25|NULL| NULL|       NULL|  22|
|    18-25|NULL| NULL|       NULL|   8|
|    46-60|NULL| NULL|       NULL|   4|
|    46-60|NULL| NULL|       NULL|  14|
|    18-25|NULL| NULL|       NULL|  16|
+---------+----+-----+-----------+----+
only showing top 5 rows



In [0]:
# Save the cleaned and transformed data back to Azure Blob or any designated storage
output_path = "/FileStore/tables/retail_data.csv"  # Update with your DBFS path
df.write.mode("overwrite").parquet(output_path)
print("Data cleaning and transformation completed. File saved to:", output_path)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2160637382772496>, line 3[0m
[1;32m      1[0m [38;5;66;03m# Save the cleaned and transformed data back to Azure Blob or any designated storage[39;00m
[1;32m      2[0m output_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/FileStore/tables/retail_data.csv[39m[38;5;124m"[39m  [38;5;66;03m# Update with your DBFS path[39;00m
[0;32m----> 3[0m df[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39mparquet(output_path)
[1;32m      5[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mData cleaning and transformation completed. File saved to:[39m[38;5;124m"[39m, output_path)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwa