# CLEANING AND PREPROCESSING
The goal of this phase is to remove inconsistencies, missing values and duplications from the data, preparing it for analysis and modelling.

The cleaning process will be performed separately on each dataset as shown below:

In [2]:
#importing relevant libraries
import pandas as pd
import numpy as np
import re
from pathlib import Path


In [19]:

#  Setup (paths + helpers)

# ---- Paths (edit if needed) ----
RAW_DIR     = Path("../Data")
CLEAN_DIR   = Path("../Data/Cleaned Dataset")
CLEAN_DIR.mkdir(parents=True, exist_ok=True)

RAW_EVENTS  = RAW_DIR / "events.csv"                 # rename if different
RAW_CAT     = RAW_DIR / "category_tree.csv"
RAW_IP1     = RAW_DIR / "item_properties_part1.1.csv"  # rename if different
RAW_IP2     = RAW_DIR / "item_properties_part2.csv"

OUT_EVENTS_CSV  = CLEAN_DIR / "cleaned_user_events.csv"
OUT_EVENTS_PQ   = CLEAN_DIR / "cleaned_user_events.parquet"
OUT_CAT_CSV     = CLEAN_DIR / "cleaned_category_tree.csv"
OUT_IP_LATEST_PQ= CLEAN_DIR / "cleaned_item_properties_latest.parquet"  # long format, latest per (item,prop)
OUT_IP_WIDE_PQ  = CLEAN_DIR / "cleaned_item_properties_wide.parquet"    # optional wide format (selected props)

# Value cleaning helper: extract first numeric token; keep booleans; else keep string
num_pat = re.compile(r"[-+]?\d*\.?\d+")
def clean_value(v):
    if pd.isna(v):
        return np.nan
    if isinstance(v, (int, float)):
        return v
    s = str(v).strip().lower()
    if s in {"true", "yes"}:  return 1
    if s in {"false","no"}:   return 0
    m = num_pat.search(s.lstrip("n"))  # drop a leading 'n' then find number
    return float(m.group()) if m else s


1. Cleaning the category_tree dataset




In [None]:
# ---- Clean Category Tree ----
cat = pd.read_csv(RAW_CAT)

# Drop rows with missing ids
cat = cat.dropna(subset=["categoryid", "parentid"]).copy()

# Cast to int 
cat["categoryid"] = cat["categoryid"].astype(int)
cat["parentid"]   = cat["parentid"].astype(int)

# Save
cat.to_csv(OUT_CAT_CSV, index=False)

# Preview
print("== Cleaned Category Tree ==")
print(cat.info())
print(cat.head())


== Cleaned Category Tree ==
<class 'pandas.core.frame.DataFrame'>
Index: 1644 entries, 0 to 1668
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype
---  ------      --------------  -----
 0   categoryid  1644 non-null   int64
 1   parentid    1644 non-null   int64
dtypes: int64(2)
memory usage: 38.5 KB
None
   categoryid  parentid
0        1016       213
1         809       169
2         570         9
3        1691       885
4         536      1691


2. Cleaning the Events Datasets

In [7]:
# ---- Clean User Events ----
events = pd.read_csv(RAW_EVENTS)

# Convert timestamp to datetime (ms → ns)
events["event_time"] = pd.to_datetime(events["timestamp"], unit="ms", errors="coerce")

# Drop impossible rows (missing criticals)
events = events.dropna(subset=["itemid", "event", "event_time"]).copy()

# Optional bot filter: users with > X events (tune or comment out)
BOT_THRESHOLD = None  # e.g., 10_000; set None to skip
if BOT_THRESHOLD is not None:
    vc = events["visitorid"].value_counts()
    bots = vc[vc > BOT_THRESHOLD].index
    events = events[~events["visitorid"].isin(bots)].copy()

# Drop duplicate full rows (safety)
events = events.drop_duplicates().reset_index(drop=True)

# Save
events.to_csv(OUT_EVENTS_CSV, index=False)
#events.to_parquet(OUT_EVENTS_PQ, index=False)

# Preview
print("== Cleaned User Events ==")
print(events.info())
print(events.head())


== Cleaned User Events ==
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2755641 entries, 0 to 2755640
Data columns (total 6 columns):
 #   Column         Dtype         
---  ------         -----         
 0   timestamp      int64         
 1   visitorid      int64         
 2   event          object        
 3   itemid         int64         
 4   transactionid  float64       
 5   event_time     datetime64[ns]
dtypes: datetime64[ns](1), float64(1), int64(3), object(1)
memory usage: 126.1+ MB
None
       timestamp  visitorid event  itemid  transactionid  \
0  1433221332117     257597  view  355908            NaN   
1  1433224214164     992329  view  248676            NaN   
2  1433221999827     111016  view  318965            NaN   
3  1433221955914     483717  view  253185            NaN   
4  1433221337106     951259  view  367447            NaN   

               event_time  
0 2015-06-02 05:02:12.117  
1 2015-06-02 05:50:14.164  
2 2015-06-02 05:13:19.827  
3 2015-06-02 05:12:35

3. Cleaning the Item_properties datasets

In [20]:


OUT_IP_LATEST_PARQUET = CLEAN_DIR / "cleaned_item_properties_latest.parquet"
OUT_IP_LATEST_CSV     = CLEAN_DIR / "cleaned_item_properties_latest.csv"

# Optional reduced wide (pivot) outputs
OUT_IP_WIDE_PARQUET = CLEAN_DIR / "cleaned_item_properties_wide.parquet"
OUT_IP_WIDE_CSV     = CLEAN_DIR / "cleaned_item_properties_wide.csv"

# ---- Config ----
CHUNKSIZE = 1_000_000  # tune down if memory is tight (e.g., 500_000 or 250_000)
PRINT_EVERY = 1        # progress print frequency (chunks)

# Reduced-wide pivot selection (optional step)
MIN_ITEMS_FOR_PROPERTY = 5_000   # keep properties appearing on >= this many items
TOP_K_PROPERTIES = None          # or set e.g., 200 (takes precedence if not None)

# =========================
# Helpers
# =========================

# Safe saver: prefer fastparquet; fallback to CSV (never crash)
def save_dataframe(df: pd.DataFrame, parquet_path: Path, csv_path: Path):
    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        import fastparquet  # noqa: F401
        df.to_parquet(parquet_path, index=False, engine="fastparquet")
        print(f" Saved Parquet: {parquet_path}")
    except ImportError:
        df.to_csv(csv_path, index=False)
        print(f" fastparquet not installed. Saved CSV instead: {csv_path}")
    except Exception as e:
        print(f" Parquet save failed: {e}")
        df.to_csv(csv_path, index=False)
        print(f" Fallback CSV saved: {csv_path}")

# Value normalizer:
# - keeps booleans (true/false -> 1/0)
# - strips leading 'n' then extracts first number (e.g., "n720.000 424566" -> 720.0)
# - returns original string if non-numeric
_num_pat = re.compile(r"[-+]?\d*\.?\d+")
def clean_value(v):
    if pd.isna(v):
        return np.nan
    if isinstance(v, (int, float, np.integer, np.floating)):
        return float(v)
    s = str(v).strip().lower()
    if s in {"true", "yes"}:
        return 1.0
    if s in {"false", "no"}:
        return 0.0
    # strip a single leading 'n' (common pattern) before extracting number
    if s.startswith("n"):
        s = s[1:]
    m = _num_pat.search(s)
    return float(m.group()) if m else v  # keep original string if not numeric

def latest_ts_map(csv_path: Path, chunksize=CHUNKSIZE):
    """
    First pass: build a dict mapping (itemid, property) -> latest timestamp.
    Uses groupby per chunk for efficiency.
    """
    latest = {}  # key: (itemid, property) -> int timestamp
    usecols = ["timestamp", "itemid", "property", "value"]
    for i, chunk in enumerate(pd.read_csv(csv_path, usecols=usecols, chunksize=chunksize, low_memory=False)):
        # keep only necessary rows
        chunk = chunk.dropna(subset=["itemid", "property", "timestamp"])
        # type enforcement
        # (avoid astype on entire columns if it triggers memory; rely on numpy casting while grouping)
        grp = chunk.groupby(["itemid", "property"], as_index=False)["timestamp"].max()
        # update dict
        for r in grp.itertuples(index=False):
            key = (int(r.itemid), str(r.property))
            ts  = int(r.timestamp)
            if key not in latest or ts > latest[key]:
                latest[key] = ts
        if (i + 1) % PRINT_EVERY == 0:
            print(f"[{csv_path.name}] latest_ts_map: processed chunk {i+1}")
    return latest

def collect_latest_rows(csv_path: Path, latest_map: dict, chunksize=CHUNKSIZE):
    """
    Second pass: for rows that match the latest timestamp per (item, property),
    collect them, clean 'value', and return a concatenated dataframe (long format).
    """
    usecols = ["timestamp", "itemid", "property", "value"]
    out_parts = []
    for i, chunk in enumerate(pd.read_csv(csv_path, usecols=usecols, chunksize=chunksize, low_memory=False)):
        chunk = chunk.dropna(subset=["itemid", "property", "timestamp"]).copy()
        # normalize types for comparison
        chunk["itemid"]    = chunk["itemid"].astype(np.int64, copy=False)
        chunk["property"]  = chunk["property"].astype(str)
        chunk["timestamp"] = chunk["timestamp"].astype(np.int64, copy=False)

        # Build boolean mask for rows that are the latest per (item, property)
        # Using vectorized approach via DataFrame apply would be slow; do a python-level check
        idx = []
        it_item = chunk["itemid"].values
        it_prop = chunk["property"].values
        it_ts   = chunk["timestamp"].values
        for j in range(len(chunk)):
            key = (int(it_item[j]), it_prop[j])
            if key in latest_map and latest_map[key] == int(it_ts[j]):
                idx.append(j)

        if idx:
            matched = chunk.iloc[idx, :].copy()
            # clean value
            matched.loc[:, "value"] = matched["value"].apply(clean_value)
            out_parts.append(matched)

        if (i + 1) % PRINT_EVERY == 0:
            print(f"[{csv_path.name}] collect_latest_rows: matched rows in chunk {i+1}: {len(idx)}")

    if out_parts:
        res = pd.concat(out_parts, ignore_index=True)
    else:
        res = pd.DataFrame(columns=["itemid", "property", "timestamp", "value"])
    return res

# =========================
# Run Cleaning (two passes)
# =========================

print("== Pass 1: compute latest timestamp per (itemid, property) across both files ==")
latest1 = latest_ts_map(RAW_IP1)
latest2 = latest_ts_map(RAW_IP2)

# merge the two dicts, keeping max ts per key
latest = latest1
for k, ts in latest2.items():
    if k not in latest or ts > latest[k]:
        latest[k] = ts

print(f"Total unique (itemid, property) pairs: {len(latest):,}")

print("\n== Pass 2: collect rows with those latest timestamps and clean 'value' ==")
latest_rows_1 = collect_latest_rows(RAW_IP1, latest)
latest_rows_2 = collect_latest_rows(RAW_IP2, latest)

latest_rows = pd.concat([latest_rows_1, latest_rows_2], ignore_index=True)

# Deduplicate in case both files had the same (item, property, timestamp)
latest_rows.sort_values(["itemid", "property", "timestamp"], ascending=[True, True, False], inplace=True)
latest_rows = latest_rows.drop_duplicates(subset=["itemid", "property"], keep="first").reset_index(drop=True)

# (Optional) add a datetime column for readability (kept numeric ts for joins/perf)
latest_rows["timestamp_dt"] = pd.to_datetime(latest_rows["timestamp"], unit="ms", errors="coerce")

print("\n== Preview: Cleaned Item Properties (long / latest only) ==")
print(latest_rows.info())
print(latest_rows.head())
print("Top properties by item coverage:")
print(latest_rows.groupby("property")["itemid"].nunique().sort_values(ascending=False).head(10))

# Save (safe)
save_dataframe(latest_rows, OUT_IP_LATEST_PARQUET, OUT_IP_LATEST_CSV)

# =========================
# (Optional) Reduced Wide Pivot
# =========================
DO_PIVOT = True  # set False if you want to skip

if DO_PIVOT:
    print("\n== Optional pivot to reduced wide table ==")
    prop_cov = latest_rows.groupby("property")["itemid"].nunique().sort_values(ascending=False)

    if TOP_K_PROPERTIES:
        props_keep = prop_cov.head(TOP_K_PROPERTIES).index.tolist()
        print(f"Selected top {len(props_keep)} properties by coverage.")
    else:
        props_keep = prop_cov[prop_cov >= MIN_ITEMS_FOR_PROPERTY].index.tolist()
        print(f"Selected {len(props_keep)} properties with coverage >= {MIN_ITEMS_FOR_PROPERTY} items.")

    reduced = latest_rows[latest_rows["property"].isin(props_keep)].copy()

    # Pivot to wide: columns = property, index = itemid
    wide = reduced.pivot(index="itemid", columns="property", values="value")

    # Best-effort numeric conversion: keep numeric if it makes sense
    for c in wide.columns:
        if wide[c].dtype == object:
            converted = pd.to_numeric(wide[c], errors="coerce")
            # keep numeric if at least half of non-nulls converted
            if converted.notna().sum() >= 0.5 * wide[c].notna().sum():
                wide[c] = converted

    print("== Preview: Reduced Wide Item Features ==")
    print(wide.info())
    print(wide.head())

    # Save (safe) — reset_index to keep itemid as a column in CSV
    save_dataframe(wide.reset_index(), OUT_IP_WIDE_PARQUET, OUT_IP_WIDE_CSV)

print("\n     Item properties cleaning complete.")


== Pass 1: compute latest timestamp per (itemid, property) across both files ==
[item_properties_part1.1.csv] latest_ts_map: processed chunk 1
[item_properties_part1.1.csv] latest_ts_map: processed chunk 2
[item_properties_part1.1.csv] latest_ts_map: processed chunk 3
[item_properties_part1.1.csv] latest_ts_map: processed chunk 4
[item_properties_part1.1.csv] latest_ts_map: processed chunk 5
[item_properties_part1.1.csv] latest_ts_map: processed chunk 6
[item_properties_part1.1.csv] latest_ts_map: processed chunk 7
[item_properties_part1.1.csv] latest_ts_map: processed chunk 8
[item_properties_part1.1.csv] latest_ts_map: processed chunk 9
[item_properties_part1.1.csv] latest_ts_map: processed chunk 10
[item_properties_part1.1.csv] latest_ts_map: processed chunk 11
[item_properties_part2.csv] latest_ts_map: processed chunk 1
[item_properties_part2.csv] latest_ts_map: processed chunk 2
[item_properties_part2.csv] latest_ts_map: processed chunk 3
[item_properties_part2.csv] latest_ts_map:

4. Merge all 3 dataset into 1

In [23]:
# ============================
# Final Merge Pipeline (Slim)
# ============================
from pathlib import Path
import pandas as pd
import numpy as np

# -----------------
# Paths & settings
# -----------------
DATA_DIR   = Path("../Data/Cleaned Dataset")
OUT_DIR    = DATA_DIR  # keep outputs alongside cleaned data
OUT_DIR.mkdir(parents=True, exist_ok=True)

EVENTS_CSV      = DATA_DIR / "cleaned_user_events.csv"
CAT_TREE_CSV    = DATA_DIR / "cleaned_category_tree.csv"
IP_LATEST_CSV   = DATA_DIR / "cleaned_item_properties_latest.csv"   # long format (12M rows)

# Keep the item properties we really need for the first pass

PROPS_TO_KEEP = {"categoryid", "available"}

# Output files
ITEM_FEATURES_PARQ  = OUT_DIR / "item_features.parquet"
ITEM_FEATURES_CSV   = OUT_DIR / "item_features.csv"
FINAL_MERGED_PARQ   = OUT_DIR / "final_merged_events.parquet"
FINAL_MERGED_CSV    = OUT_DIR / "final_merged_events.csv"

# Chunk sizes
IP_CHUNKSIZE     = 1_000_000   # reading item_properties_latest.csv
EVENTS_CHUNKSIZE = 500_000     # chunk-merge events with item features

# -----------------
# Helpers
# -----------------
def safe_save(df, parquet_path: Path, csv_path: Path):
    """Try Parquet with fastparquet, else fallback to CSV (never crash)."""
    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        import fastparquet  # noqa: F401
        df.to_parquet(parquet_path, index=False, engine="fastparquet")
        print(f" Saved Parquet: {parquet_path}")
    except ImportError:
        df.to_csv(csv_path, index=False)
        print(f" fastparquet not installed. Saved CSV instead: {csv_path}")
    except Exception as e:
        print(f" Parquet save failed: {e}")
        df.to_csv(csv_path, index=False)
        print(f" Fallback CSV saved: {csv_path}")

def coerce_int(series):
    """Coerce to pandas nullable Int64 safely."""
    return pd.to_numeric(series, errors="coerce").astype("Int64")

def coerce_binary(series):
    """Coerce 'available' (mixed) to 0/1 Int8 where possible."""
    s = pd.to_numeric(series, errors="coerce")
    # clamp to {0,1} if values are near binary
    s = s.fillna(0)
    s = (s > 0).astype("Int8")
    return s

# ---------------------------------------------------
# 1) Build a slim item-features table from long props
# ---------------------------------------------------
print("Step 1: Building item features (categoryid, available) from long properties...")
item_feat_parts = []

usecols = ["itemid", "property", "value"]  # timestamp is already 'latest' in this file
reader = pd.read_csv(
    IP_LATEST_CSV,
    usecols=usecols,
    dtype={"value": "string"},   # avoid DtypeWarning for mixed types
    chunksize=IP_CHUNKSIZE,
    low_memory=False
)

for i, chunk in enumerate(reader, start=1):
    # Keep only the properties we care about
    chunk = chunk[chunk["property"].isin(PROPS_TO_KEEP)].copy()
    if chunk.empty:
        print(f"  - Chunk {i}: no target properties, skipping.")
        continue

    # Long -> Wide per chunk
    # Because this is "latest" file, each (itemid, property) should be unique.
    wide = chunk.pivot_table(
        index="itemid",
        columns="property",
        values="value",
        aggfunc="last"  # safeguard
    )

    # Type cleanup
    if "categoryid" in wide.columns:
        wide["categoryid"] = coerce_int(wide["categoryid"])
    if "available" in wide.columns:
        wide["available"] = coerce_binary(wide["available"])

    # Reset index so itemid becomes a column
    wide = wide.reset_index()
    item_feat_parts.append(wide)

    print(f"  - Processed chunk {i}: {len(wide):,} item rows")

# Concatenate all parts and collapse duplicates (last wins)
if item_feat_parts:
    item_features = pd.concat(item_feat_parts, ignore_index=True)
    item_features.sort_values("itemid", inplace=True)
    item_features = item_features.drop_duplicates(subset="itemid", keep="last").reset_index(drop=True)
else:
    # Edge case: nothing found
    item_features = pd.DataFrame({"itemid": pd.Series(dtype="Int64")})
    for p in PROPS_TO_KEEP:
        item_features[p] = pd.Series(dtype="float")

print("\nItem features built:")
print(item_features.info())
print(item_features.head())

# Save slim item features now (so we can reuse later quickly)
safe_save(item_features, ITEM_FEATURES_PARQ, ITEM_FEATURES_CSV)

# ------------------------------------------
# 2) Join category hierarchy onto features
# ------------------------------------------
print("\nStep 2: Joining category tree onto item features...")
cat_tree = pd.read_csv(CAT_TREE_CSV)
# ensure ints
cat_tree["categoryid"] = coerce_int(cat_tree["categoryid"])
cat_tree["parentid"]   = coerce_int(cat_tree["parentid"])

item_features = item_features.merge(cat_tree, on="categoryid", how="left")

print("Item features + category tree:")
print(item_features.info())
print(item_features.head())

# (Optionally) re-save enriched item features
safe_save(item_features, ITEM_FEATURES_PARQ, ITEM_FEATURES_CSV)

# -------------------------------------------------------
# 3) Chunk-merge Events with item features (low memory)
# -------------------------------------------------------
print("\nStep 3: Chunk-merging events with item features...")

# prepare output file (CSV fall-back path)
final_out_csv  = FINAL_MERGED_CSV
final_out_parq = FINAL_MERGED_PARQ

# If a previous CSV exists, remove to avoid appending to old file
if final_out_csv.exists():
    final_out_csv.unlink()

# We’ll write CSV incrementally to be safe across environments.
reader_events = pd.read_csv(
    EVENTS_CSV,
    parse_dates=["event_time"],
    chunksize=EVENTS_CHUNKSIZE,
    low_memory=False
)

first = True
total_rows = 0
for j, echunk in enumerate(reader_events, start=1):
    merged = echunk.merge(item_features, on="itemid", how="left")
    total_rows += len(merged)

    # incremental CSV write
    merged.to_csv(final_out_csv, mode="a", index=False, header=first)
    first = False

    print(f"  - Merged event chunk {j}: {len(merged):,} rows (cumulative {total_rows:,})")

print(f"\n Final merged (CSV) written to: {final_out_csv}")

# Try to also write a Parquet (best-effort)
try:
    import fastparquet  # noqa: F401
    # If file is large, re-read the CSV in chunks and write a single parquet is non-trivial.
    # As a simple path, leave parquet out or convert later with an external tool.
    print("Tip: Convert large CSV to Parquet later if needed (e.g., using pandas with fastparquet).")
except Exception:
    pass

print("\n Merge pipeline complete.")


Step 1: Building item features (categoryid, available) from long properties...
  - Processed chunk 1: 34,706 item rows
  - Processed chunk 2: 34,760 item rows
  - Processed chunk 3: 34,741 item rows
  - Processed chunk 4: 34,748 item rows
  - Processed chunk 5: 34,767 item rows
  - Processed chunk 6: 34,779 item rows
  - Processed chunk 7: 34,758 item rows
  - Processed chunk 8: 34,770 item rows
  - Processed chunk 9: 34,738 item rows
  - Processed chunk 10: 34,724 item rows
  - Processed chunk 11: 34,724 item rows
  - Processed chunk 12: 34,710 item rows
  - Processed chunk 13: 128 item rows

Item features built:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 417053 entries, 0 to 417052
Data columns (total 3 columns):
 #   Column      Non-Null Count   Dtype
---  ------      --------------   -----
 0   itemid      417053 non-null  int64
 1   available   417053 non-null  Int8 
 2   categoryid  417053 non-null  Int64
dtypes: Int64(1), Int8(1), int64(1)
memory usage: 7.6 MB
None
proper