In [0]:
#import
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
import re

In [0]:
# Step 1: Load the four raw CSV datasets.
# Each file is read with its first row treated as the header; no schema is
# supplied or inferred here, so column types are fixed up in later cells.
sales_df = spark.read.option("header", True).csv("/FileStore/tables/sales.csv")
item_df = spark.read.option("header", True).csv("/FileStore/tables/item.csv")
promotion_df = spark.read.option("header", True).csv("/FileStore/tables/promotion.csv")
supermrkt_df = spark.read.option("header", True).csv("/FileStore/tables/supermarket.csv")

In [0]:
# Cast the sales `amount` column to a fixed-point decimal (10 digits, 2 after
# the decimal point).
# The type string must be exactly "decimal(10,2)": a stray trailing ")" makes
# Spark's data-type parser reject the string when the cast is built.
sales_df = sales_df.withColumn("amount", sales_df["amount"].cast("decimal(10,2)"))

In [0]:
# Fix the misspelled column name "descrption" -> "description" so that later
# cells can refer to the column by its correct name.
item_df = item_df.withColumnRenamed(existing="descrption", new="description")

In [0]:
# Normalise whitespace in the item description: every run of whitespace is
# collapsed to a single space, then leading/trailing spaces are trimmed.
from pyspark.sql import functions as F
collapsed_description = F.regexp_replace('description', r'\s+', ' ')
item_df = item_df.withColumn('description', F.trim(collapsed_description))

In [0]:
#item_df.filter(col('code') == '2409407009').display()

In [0]:
# 1. UDF to extract size
def extract_size(text):
    """Return the first size found in ``text`` as ``"<number> <unit>"``.

    A size is an integer or decimal number followed (with optional whitespace)
    by one of the units ``fl oz``, ``fl``, ``oz`` or ``lb``. Matching is
    case-insensitive and the result is always lowercase with exactly one space
    between the number and the unit, e.g. ``"16 oz"``, ``"1.5 lb"``,
    ``"64 fl oz"``.

    Args:
        text: Free-text description to search; may be None or empty.

    Returns:
        The normalised size string, or None when ``text`` is empty/None or
        contains no recognisable size.
    """
    if not text:
        return None
    # "fl" optionally followed by "oz" is tried first so that "fl oz" is
    # captured as a whole unit instead of being truncated to "fl".
    match = re.search(r'(\d+(?:\.\d+)?)\s*(fl(?:\s*oz)?|oz|lb)', text.lower())
    if match is None:
        return None
    number, unit = match.groups()
    if unit.startswith('fl') and unit.endswith('oz'):
        # Canonical spelling regardless of how much whitespace (if any)
        # separated the two words in the input.
        unit = 'fl oz'
    # Building the result from the captured groups normalises any kind of
    # whitespace (spaces, tabs, ...) between number and unit to one space.
    return f'{number} {unit}'

# Register extract_size as a Spark UDF returning a string column.
# The return type is given as a DDL type string so this cell does not depend
# on `StringType` being in scope (it is not imported explicitly in this
# notebook, and the star import from pyspark.sql.functions is not guaranteed
# to expose it).
extract_size_udf = udf(extract_size, "string")


In [0]:
# 2. UDF to check if existing size is valid
def is_valid_size(size):
    """Tell whether ``size`` already contains a number followed by a unit.

    A value counts as valid when, ignoring case, it contains one or more
    digits followed (with optional whitespace) by ``fl``, ``oz`` or ``lb``.

    Args:
        size: The existing size value; may be None or empty.

    Returns:
        True if such a number+unit pattern is present, otherwise False
        (including for None or an empty string).
    """
    if size:
        found = re.search(r'\d+\s*(fl|oz|lb)', size.lower())
        return found is not None
    return False

# Register is_valid_size as a Spark UDF with an explicit boolean return type.
# Without a return type `udf` defaults to a string column, which cannot be
# used as a boolean condition (e.g. negated with `~` inside `when(...)`).
is_valid_size_udf = udf(is_valid_size, "boolean")


In [0]:
# 3. Update size column only if invalid
# Rows whose current size fails the validity check get a size extracted from
# the description; all other rows keep their existing size unchanged.
size_is_invalid = ~is_valid_size_udf(col("size"))
size_from_description = extract_size_udf(col("description"))
item_df = item_df.withColumn(
    "size",
    when(size_is_invalid, size_from_description).otherwise(col("size")),
)

In [0]:
# 4. Remove size from description
def remove_size(text):
    """Return ``text`` with every size expression removed.

    A size expression is an integer or decimal number followed (with optional
    whitespace) by ``fl oz``, ``fl``, ``oz`` or ``lb``, matched
    case-insensitively. After removal, runs of whitespace are collapsed to a
    single space and the ends are stripped, so taking a size out of the middle
    of a description does not leave a double space behind.

    Args:
        text: Description to clean; may be None or empty.

    Returns:
        The cleaned description (possibly an empty string when the text was
        only a size), or None when ``text`` is empty/None.
    """
    if not text:
        return None
    # "fl" optionally followed by "oz" so that "FL OZ" is removed as a whole
    # instead of leaving a dangling "OZ" in the description.
    without_size = re.sub(
        r'\d+(\.\d+)?\s*(fl(\s*oz)?|oz|lb)', '', text, flags=re.IGNORECASE
    )
    # split()/join collapses whitespace left by the removal and strips ends.
    return ' '.join(without_size.split())

# Register remove_size as a string-returning Spark UDF and apply it to the
# description column. The return type is given as a DDL type string so this
# cell does not depend on `StringType` being in scope (it is not imported
# explicitly in this notebook).
remove_size_udf = udf(remove_size, "string")
item_df = item_df.withColumn("description", remove_size_udf(col("description")))

In [0]:
# Normalise the size column: lowercase it, drop every character that is not a
# digit, whitespace, "." or one of the unit letters (o, z, f, l, b), collapse
# whitespace runs to a single space and trim the ends.
# Lowercasing must happen BEFORE the character filter: the filter only keeps
# lowercase unit letters, so filtering first would strip an uppercase unit
# such as "OZ" and leave just the number.
item_df = item_df.withColumn(
    "size",
    trim(
        regexp_replace(
            regexp_replace(lower(col("size")), r"[^0-9\s\.ozfllb]", ""),
            r"\s+",
            " ",
        )
    ),
)

In [0]:
#item_df.display()

In [0]:
# Rename the promotion table's 'supermarkets' column to 'supermarket' so that
# it matches the column name used in the sales table.
promotion_df = promotion_df.withColumnRenamed(existing="supermarkets", new="supermarket")