In [None]:
# databricks_cleaning_local_fixed.ipynb
from pathlib import Path

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, round, lit, when, trim

# Start Spark session
spark = SparkSession.builder.appName("IKODataCleaning").getOrCreate()

# Load data
# All raw inputs come from one directory with the same reader options, so both
# live in a single place instead of being repeated per file.
RAW_DATA_DIR = "../raw-data"
CSV_READ_OPTIONS = {"header": True, "inferSchema": True}

sales = spark.read.csv(f"{RAW_DATA_DIR}/sales.csv", **CSV_READ_OPTIONS)
products = spark.read.csv(f"{RAW_DATA_DIR}/products.csv", **CSV_READ_OPTIONS)
# NOTE(review): `regions` is loaded but not referenced in the visible cells —
# either join it in (e.g. to populate Region) or drop this read.
regions = spark.read.csv(f"{RAW_DATA_DIR}/regions.csv", **CSV_READ_OPTIONS)

# -----------------------------------------
# Clean / format SaleDate safely
# -----------------------------------------
# Accepted input shapes (after trimming whitespace):
#   * M/d/yyyy  e.g. "1/5/2023" or "01/05/2023"
#   * yyyy-M-d  e.g. "2023-1-5" or "2023-01-05"
# Any other string becomes null.
#
# Each value is parsed from the trimmed ORIGINAL string, using only the pattern
# whose regex it matched. Parsing with one format first and then "falling back"
# on the already-overwritten column cannot work: the fallback would only ever
# see nulls, losing every yyyy-MM-dd date.
# Single-letter M / d accept both 1- and 2-digit fields (matching the \d{1,2}
# regexes); Spark 3's parser rejects "1/5/2023" for "MM/dd/yyyy".
US_DATE_REGEX = r"^\d{1,2}/\d{1,2}/\d{4}$"
ISO_DATE_REGEX = r"^\d{4}-\d{1,2}-\d{1,2}$"
sale_date_raw = trim(col("SaleDate"))

sales = sales.withColumn(
    "SaleDate",
    when(sale_date_raw.rlike(US_DATE_REGEX), to_date(sale_date_raw, "M/d/yyyy"))
    .when(sale_date_raw.rlike(ISO_DATE_REGEX), to_date(sale_date_raw, "yyyy-M-d"))
    .otherwise(lit(None))
)

# -----------------------------------------
# Alias for joins
# -----------------------------------------
sales_df = sales.alias("s")
products_df = products.alias("p")

# -----------------------------------------
# Join product data for profitability metrics
# -----------------------------------------
# Join on the column NAME so the result has a single ProductID column; an
# expression join (p.ProductID == s.ProductID) keeps both sides' ProductID,
# which shows up as a duplicated column in the pandas frame / CSV.
# Left join: sales without a matching product are kept, and their
# ProfitPerUnit / TotalProfit are null.
# TotalProfit is computed from the already-rounded ProfitPerUnit.
sales_enriched = (
    sales_df.join(products_df, on="ProductID", how="left")
    .withColumn("ProfitPerUnit", round(col("p.UnitPrice") - col("p.ProductionCost"), 2))
    .withColumn("TotalProfit", round(col("s.QuantitySold") * col("ProfitPerUnit"), 2))
)

# -----------------------------------------
# Fill missing categorical values
# -----------------------------------------
# Only nulls are replaced; non-null values are left untouched.
sales_enriched = sales_enriched.na.fill(
    {"SalesChannel": "Unknown", "Region": "Unspecified"}
)

# -----------------------------------------
# Collect to a pandas DataFrame
# -----------------------------------------
# SaleDate is turned into a plain string before collecting. The original
# author did this to avoid a Spark DateTimeException during conversion; it
# also fixes the textual date format that ends up in the CSV.
sales_enriched = sales_enriched.withColumn(
    "SaleDate", col("SaleDate").astype("string")
)

# Brings every row onto the driver — fine for local-sized data only.
sales_enriched_pd = sales_enriched.toPandas()

# -----------------------------------------
# Save to CSV
# -----------------------------------------
# Written exactly once. pandas.to_csv does not create missing parent
# directories, so create the output directory first.
CLEANED_SALES_PATH = Path("../data-warehouse/cleaned_sales.csv")
CLEANED_SALES_PATH.parent.mkdir(parents=True, exist_ok=True)

sales_enriched_pd.to_csv(CLEANED_SALES_PATH, index=False)

print("Sales data cleaned and saved successfully!")


Sales data cleaned and saved successfully!


In [None]:
# Environment check: print the interpreter backing this kernel, to confirm
# the notebook runs in the environment where pyspark/pandas are installed.
import sys

print(sys.executable, file=sys.stdout)