In [None]:
import re

import polars as pl

#### Cleanse OPW Data

In [None]:
# Load the raw item catalogue and the raw price records in one step.
df_item, df_price = map(
    pl.read_parquet,
    ("../data/raw/items.parquet", "../data/raw/prices.parquet"),
)

In [None]:
# Raw column name -> cleansed column name for the item catalogue.
cols = {
    "code": "sku",
    "cat1Name.en": "department_en",
    "cat1Name.zh-Hant": "department_zh",
    "cat2Name.en": "category_en",
    "cat2Name.zh-Hant": "category_zh",
    "cat3Name.en": "subcategory_en",
    "cat3Name.zh-Hant": "subcategory_zh",
    "brand.en": "brand_en",
    "brand.zh-Hant": "brand_zh",
    "name.en": "name_en",
    "name.zh-Hant": "name_zh",
}

df_item = (
    df_item
    # One row per item code. `keep="first"` with `maintain_order=True` makes
    # the surviving row deterministic; the default `keep="any"` gives no
    # guarantee about which duplicate is kept, so results could change
    # between runs.
    .unique(subset="code", keep="first", maintain_order=True)
    # Keep only the mapped columns, then apply the cleansed names.
    .select(list(cols))
    .rename(cols)
)

In [None]:
# Raw column name -> cleansed column name for the price records.
cols = {
    "code": "sku",
    "date": "effective_date",
    "supermarketCode": "supermarket",
    "en": "promotion_en",
    "zh-Hant": "promotion_zh",
    "price": "original_price",
}

df_price = (
    df_price
    .with_columns(
        # Rows without promotion text get an explicit placeholder label.
        pl.col(["en", "zh-Hant"]).fill_null("No Promotion"),
        # Keep the first run of digits/dots from the price text as a float;
        # rows where nothing is extracted end up as 0.
        pl.col("price").str.extract(r"([\d\.]+)").cast(pl.Float32).fill_null(0),
    )
    # Select the mapped columns and give them their cleansed names in one pass.
    .select([pl.col(source).alias(target) for source, target in cols.items()])
)

#### Analyse Promotion

In [None]:
def split_promotion(mkt_txt: str) -> list[str]:
    """Normalise a raw promotion text and split it into individual offers.

    The text is lower-cased and stripped, week markers such as ``wk12`` are
    removed, and the result is split on ``"; "``, ``"/"`` or a period that sits
    directly between two letters. ``"half price"`` is rewritten to ``"50%"``
    and ``"second"`` to ``"2nd"`` so that later pattern tagging sees numbers.

    Args:
        mkt_txt: Raw promotion text.

    Returns:
        The individual offers, each stripped of surrounding whitespace.
    """
    txt = re.sub(r"\s?wk\d+\s?", "", mkt_txt.lower().strip())
    # Lookarounds keep the letters on both sides of the period: a plain
    # `[a-z]\.[a-z]` would swallow them ("free.buy" -> "fre" / "uy") and break
    # keyword checks on the resulting offers.
    txt = re.sub(r"; |/|(?<=[a-z])\.(?=[a-z])", "<sep>", txt)
    txt = txt.replace("half price", "50%")
    txt = txt.replace("second", "2nd")

    return [offer.strip() for offer in txt.split("<sep>")]

In [None]:
def get_pattern(promotion: str) -> str:
    """Turn a promotion text into a template with numbers replaced by tags.

    Dollar amounts become ``<AMT>``, percentages become ``<PCT>`` and any
    remaining integers become ``<CNT>``. The input is lower-cased and stripped
    before tagging.

    Args:
        promotion: A single promotion offer.

    Returns:
        The tagged template string.
    """
    # Order matters: amounts and percentages must be tagged before the bare
    # digit rule, otherwise their digits would be turned into <CNT>.
    substitutions = (
        (r"\$\d+(\.\d+)?", "<AMT>"),
        (r"\d+(\.\d+)?\%", "<PCT>"),
        (r"\d+", "<CNT>"),
    )

    template = promotion.lower().strip()
    for regex, placeholder in substitutions:
        template = re.sub(regex, placeholder, template)

    return template

In [None]:
def extract_values(data: dict[str, str]) -> list[float]:
    """Collect the numbers of a promotion in the order they appear.

    The promotion text and its tagged pattern are split on whitespace and
    walked in parallel; numbers are read only from tokens whose pattern
    counterpart carries an ``<AMT>``, ``<CNT>`` or ``<PCT>`` tag.

    Args:
        data: Mapping with the keys ``"promotion"``, ``"pattern"`` and
            ``"category"``.

    Returns:
        The extracted numbers as floats, or an empty list when ``category``
        is falsy.
    """
    if not data["category"]:
        return []

    numbers = []
    token_pairs = zip(data["promotion"].split(), data["pattern"].split())
    for raw_token, tagged_token in token_pairs:
        # Skip tokens whose pattern counterpart holds no numeric tag.
        if re.search(r"<AMT>|<CNT>|<PCT>", tagged_token) is None:
            continue
        numbers += [float(hit) for hit in re.findall(r"(\d+\.?\d{,2})", raw_token)]

    return numbers

In [None]:
def calculate_discount(data: dict[str, float | list[float]]) -> float:
    price, pattern, category, values = data.values()
    discount = price

    try:
        match category:
            case 2:
                if pattern == "+<AMT> for <CNT>nd item":
                    discount = price + values[0]
                    discount /= values[1]
                elif pattern == "<AMT> for <CNT>":
                    discount = values[0] / values[1]
            case 4:
                if re.search("<CNT>\s.*save" , pattern):
                    discount = price * values[0] - values[1]
                    discount /= values[0]
                elif re.search("<CNT>\s" , pattern):
                    discount = values[1] / values[0]
            case 5:
                if re.search("free the most expensive one" , pattern):
                    discount = price * (values[0] - 1)
                    discount /= values[0]
                elif re.search("get <CNT> free" , pattern):
                    discount = price * values[0]
                    discount /= values[0] + values[1]
            case 6:
                if re.search("<CNT>\w" , pattern):
                    discount = price * (values[0] - 1) + price * (1 - values[1] / 100)
                    discount /= values[0]
                else:
                    discount = price * values[0] * (1 - values[1] / 100)
                    discount /= values[0]
            case 8:
                if re.search("<CNT>\w" , pattern):
                    discount = price * (1 - values[0] / 100) + price * (values[1] - 1)
                    discount /= values[1]
                else:
                    discount = price * (1 - values[0] / 100)
            case _:
                discount = price
    except Exception:
        pass

    return discount if price * .3 < discount else price

In [None]:
# Derive one effective unit price per (sku, effective_date, supermarket):
# split each promotion text into offers, tag and classify every offer, price
# it, then keep the cheapest offer per key.
df_price = (
    df_price
    # One row per individual offer.
    .with_columns(
        pl.col("promotion_en")
            .map_elements(split_promotion, return_dtype=list[str])
            .alias("promotion"),
    )
    .explode("promotion")
    # Replace the numbers in each offer with <AMT>/<PCT>/<CNT> tags.
    .with_columns(
        pl.col("promotion")
            .map_elements(get_pattern, return_dtype=str)
            .alias("pattern"),
    )
    # Count how often each tag occurs in the pattern.
    .with_columns(
        pl.col("pattern").str.count_matches("<AMT>").alias("amt"),
        pl.col("pattern").str.count_matches("<CNT>").alias("cnt"),
        pl.col("pattern").str.count_matches("<PCT>").alias("pct"),
    )
    # Classify by which two tags occur and in which order; patterns that match
    # no branch get category 0.
    .with_columns(
        pl.when(  # NA
            pl.col("pattern").str.contains(r"<AMT>.*<AMT>")
                & (pl.col("amt") == 2)
                & (pl.col("cnt") == 0)
                & (pl.col("pct") == 0)
        ).then(1)
        .when(  # <$> for <n>
            pl.col("pattern").str.contains(r"<AMT>.*<CNT>")
                & (pl.col("amt") == 1)
                & (pl.col("cnt") == 1)
                & (pl.col("pct") == 0)
        ).then(2)
        .when(  # NA
            pl.col("pattern").str.contains(r"<AMT>.*<PCT>")
                & (pl.col("amt") == 1)
                & (pl.col("cnt") == 0)
                & (pl.col("pct") == 1)
        ).then(3)
        .when(  # buy <n> at/save <$>
            pl.col("pattern").str.contains(r"<CNT>.*<AMT>")
                & (pl.col("amt") == 1)
                & (pl.col("cnt") == 1)
                & (pl.col("pct") == 0)
        ).then(4)
        .when(  # buy <n> get <n> free
            pl.col("pattern").str.contains(r"<CNT>.*<CNT>")
                & (pl.col("amt") == 0)
                & (pl.col("cnt") == 2)
                & (pl.col("pct") == 0)
        ).then(5)
        .when(  # buy <n> at <%>
            pl.col("pattern").str.contains(r"<CNT>.*<PCT>")
                & (pl.col("amt") == 0)
                & (pl.col("cnt") == 1)
                & (pl.col("pct") == 1)
        ).then(6)
        .when(  # NA
            pl.col("pattern").str.contains(r"<PCT>.*<AMT>")
                & (pl.col("amt") == 1)
                & (pl.col("cnt") == 0)
                & (pl.col("pct") == 1)
        ).then(7)
        .when(  # <%> for <n>
            pl.col("pattern").str.contains(r"<PCT>.*<CNT>")
                & (pl.col("amt") == 0)
                & (pl.col("cnt") == 1)
                & (pl.col("pct") == 1)
        ).then(8)
        .when(  # NA
            pl.col("pattern").str.contains(r"<PCT>.*<PCT>")
                & (pl.col("amt") == 0)
                & (pl.col("cnt") == 0)
                & (pl.col("pct") == 2)
        ).then(9)
        .otherwise(0)
        .alias("category"),
    )
    # Pull the numbers out of each classified offer.
    .with_columns(
        pl.struct("promotion", "pattern", "category")
            .map_elements(extract_values, return_dtype=list[float])
            .alias("value"),
    )
    # Field order matters: calculate_discount reads the struct positionally.
    .with_columns(
        pl.struct("original_price", "pattern", "category", "value")
            .map_elements(calculate_discount, return_dtype=float)
            .alias("unit_price"),
    )
    # Keep the cheapest offer per key. After the ascending sort the first row
    # of every key has the lowest unit_price; `keep="first"` together with
    # `maintain_order=True` guarantees that row survives. The default
    # `keep="any"` makes no promise about which duplicate is kept.
    .sort(["sku", "effective_date", "supermarket", "unit_price"])
    .unique(
        subset=["sku", "effective_date", "supermarket"],
        keep="first",
        maintain_order=True,
    )
    .drop(["amt", "cnt", "pct", "promotion", "pattern", "category", "value"])
)

In [None]:
# Persist both cleansed frames as parquet files.
for frame, target in (
    (df_item, "../data/cleansed/items.parquet"),
    (df_price, "../data/cleansed/prices.parquet"),
):
    frame.write_parquet(target)