# Data Cleaning Pipeline: T-ECD Dataset

## Objective
This notebook implements a reproducible data cleaning pipeline based on the findings from the Exploratory Data Analysis (`analysis.ipynb`).

## Identified Issues & Cleaning Actions
1.  **Schema Errors**: The Brands, Marketplace Items, and Reviews files fail to load because of schema inconsistencies in the `embedding` column. **Action**: Attempt a full load; if it fails, fall back to excluding `embedding`.
2.  **Missing Values**:
    *   `Users`: `socdem_cluster`, `region`. **Action**: Impute with `-1`.
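    *   `Retail Items`: `category`, `subcategory`. **Action**: Impute with "Unknown". `price`. **Action**: Drop rows.
    *   `Marketplace Events`: `subdomain`. **Action**: Impute with "Unknown".
    *   `Payments`: `brand_id`. **Action**: Impute with `-1`. `price`. **Action**: Drop rows.
    *   `Offers Items`: `brand_id`. **Action**: Impute with `-1`.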
3.  **Duplicates**: `Brands`. **Action**: Deduplicate.
4.  **Data Types**: `price` (negative values). **Action**: Retain.

### Data Semantics: Embeddings
The `embedding` columns contain **300-dimensional vector representations** of items and brands. 
*   **Purpose**: These are critical for downstream AI tasks such as **recommendation systems** (calculating item-item similarity) and **customer segmentation** (clustering).
*   **Status**: 
    *   **Retail Items**: Embeddings are valid and **must be preserved**.
    *   **Brands / Marketplace / Reviews**: The source Parquet files contain schema inconsistencies (e.g., empty lists mixed with 300-float lists), causing standard `pyarrow` loaders to fail.
*   **Strategy**: We attempt to load each dataset in full. Only if loading fails because of the corrupted column do we fall back to excluding `embedding` for that specific file, which salvages the remaining data (IDs, metadata). This avoids discarding an entire dataset because of one corrupted feature.
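
To make the item-item similarity use case concrete, here is a minimal sketch of pairwise cosine similarity over embedding vectors. The function name `cosine_similarity_matrix` and the random 300-dimensional vectors are illustrative assumptions; no real T-ECD embeddings are used.

```python
import numpy as np

def cosine_similarity_matrix(embeddings: np.ndarray) -> np.ndarray:
    """Return the pairwise cosine similarity of the row vectors."""
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    # Guard against zero vectors so the division never produces NaN.
    unit = embeddings / np.clip(norms, 1e-12, None)
    return unit @ unit.T

# Toy stand-in for three 300-dimensional item embeddings (random, not real data).
rng = np.random.default_rng(0)
item_embeddings = rng.normal(size=(3, 300))
similarity = cosine_similarity_matrix(item_embeddings)
```

Each row of `similarity` ranks the other items by closeness to one item, which is why losing the `embedding` column removes this capability for the affected tables.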

#### Why not fix the embeddings?
Replacing corrupted embeddings with dummy values is not feasible using standard `pyarrow` loaders because schema resolution fails before the column can be read into memory. While low-level or non-standard recovery approaches exist, they are operationally complex and disproportionate to the value gained. Therefore, excluding the embedding column for affected files is the most robust and cost-effective strategy.


In [1]:
import pandas as pd
import numpy as np
import os
from huggingface_hub import hf_hub_download, list_repo_files
import warnings
from collections import defaultdict

warnings.filterwarnings('ignore')

# --- CONFIGURATION ---
REPO_ID = "t-tech/T-ECD"
REPO_TYPE = "dataset"
CACHE_DIR = "dataset_cache"
DATASET_PATH_SMALL = "dataset/small"
DATASET_PATH_FULL = "dataset/full"

# GLOBAL CONSTANT: How many partitions to load for split datasets (e.g., events)
# Set this to a higher number (e.g., 50 or 100) to analyze more data.
# Set to None to load ALL available partitions (Warning: May run out of RAM in Colab)
DATASET_SMALL_NUM_PARTITIONS_TO_LOAD = 5 
DATASET_FULL_NUM_PARTITIONS_TO_LOAD = 1

# Ensure cache directory exists
os.makedirs(CACHE_DIR, exist_ok=True)

In [2]:
all_files = list_repo_files(repo_id=REPO_ID, repo_type=REPO_TYPE)

# Categorize files by domain and type for easy access
dataset_files = defaultdict(list)

for f in all_files:
    if f.endswith(".pq"):
        # Example f: dataset/small/retail/events/01082.pq
        # Key: dataset/small/retail/events
        dirname = os.path.dirname(f).replace("\\", "/") # Normalize path separators
        dataset_files[dirname].append(f)

print("File Index Created. Available Directories:")
for d in sorted(dataset_files.keys()):
    count = len(dataset_files[d])
    print(f" - {d} ({count} files)")

File Index Created. Available Directories:
 - dataset/full (2 files)
 - dataset/full/marketplace (1 files)
 - dataset/full/marketplace/events (793 files)
 - dataset/full/offers (1 files)
 - dataset/full/offers/events (944 files)
 - dataset/full/payments/events (1309 files)
 - dataset/full/payments/receipts (1017 files)
 - dataset/full/retail (1 files)
 - dataset/full/retail/events (579 files)
 - dataset/full/reviews (1309 files)
 - dataset/small (2 files)
 - dataset/small/marketplace (1 files)
 - dataset/small/marketplace/events (227 files)
 - dataset/small/offers (1 files)
 - dataset/small/offers/events (227 files)
 - dataset/small/retail (1 files)
 - dataset/small/retail/events (227 files)
 - dataset/small/reviews (227 files)


## Helper Functions

In [3]:
def load_remote_parquet_safe(filename, columns_to_exclude=None):
    """
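    Downloads a parquet file from Hugging Face and loads it into a DataFrame.

    If columns_to_exclude is provided, those columns are left out of the read,
    so they are never parsed. Otherwise the full file is read, with one retry
    that omits 'embedding' if the first attempt fails.
    Returns None if the download or load fails.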
    """
    print(f"Loading {filename}...")
    try:
        local_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=filename,
            repo_type=REPO_TYPE,
            local_dir=CACHE_DIR,
            local_dir_use_symlinks=False
        )
        
        if columns_to_exclude:
            import pyarrow.parquet as pq
            # Read only the schema; no column data is loaded at this point
            schema = pq.read_schema(local_path)
            # Skip internal columns whose names start with "__" (e.g. a serialized pandas index)
            all_cols = [name for name in schema.names if not name.startswith('__')]
            use_cols = [c for c in all_cols if c not in columns_to_exclude]
            print(f"  Excluding columns: {columns_to_exclude}")
            print(f"  Reading columns: {use_cols}")
            df = pd.read_parquet(local_path, columns=use_cols)
            return df
        
        # Otherwise, try loading all columns
        try:
            df = pd.read_parquet(local_path)
            return df
        except Exception as e:
            print(f"  Standard load failed: {e}")
            import pyarrow.parquet as pq
            schema = pq.read_schema(local_path)
            all_cols = [name for name in schema.names if not name.startswith('__')]
            if 'embedding' not in all_cols:
                # Nothing to exclude, so re-raise with the original traceback
                raise
            print("  Retrying without 'embedding' column...")
            use_cols = [c for c in all_cols if c != 'embedding']
            df = pd.read_parquet(local_path, columns=use_cols)
            print("  Success (with exclusions).")
            return df
    except Exception as e:
        print(f"Error downloading/loading {filename}: {e}")
        return None

In [4]:
def validate_cleaning(df_before, df_after, name):
    """Print row counts, missing-value totals, and duplicate counts before and after cleaning."""
    print(f"\n--- Validation: {name} ---")
    print(f"Rows: {len(df_before)} -> {len(df_after)} (Dropped: {len(df_before) - len(df_after)})")
    print(f"Missing Values Before: {df_before.isnull().sum().sum()}")
    print(f"Missing Values After:  {df_after.isnull().sum().sum()}")
    
    # Exclude unhashable columns (like embedding) when checking duplicates
    hashable_cols_before = [c for c in df_before.columns if df_before[c].dtype != 'object' or not df_before[c].apply(lambda x: isinstance(x, (list, np.ndarray))).any()]
    hashable_cols_after = [c for c in df_after.columns if df_after[c].dtype != 'object' or not df_after[c].apply(lambda x: isinstance(x, (list, np.ndarray))).any()]
    
    try:
        print(f"Duplicate Rows Before: {df_before[hashable_cols_before].duplicated().sum()}")
        print(f"Duplicate Rows After:  {df_after[hashable_cols_after].duplicated().sum()}")
    except Exception:
        print("Duplicate Rows: (skipped - unhashable columns present)")

In [5]:
def load_dataframe_from_partitions_safe(file_list, limit=DATASET_SMALL_NUM_PARTITIONS_TO_LOAD, columns_to_exclude=None):
    """
    Loads multiple parquet partition files and concatenates them.
    Supports columns_to_exclude to skip corrupted columns like 'embedding'.
    """
    if not file_list:
        print("No files to load.")
        return None
    
    files_to_load = file_list[:limit] if limit else file_list
    print(f"Loading {len(files_to_load)} partitions (out of {len(file_list)} available)...")
    
    dfs = []
    for f in files_to_load:
        df = load_remote_parquet_safe(f, columns_to_exclude=columns_to_exclude)
        if df is not None:
            dfs.append(df)
    
    if not dfs:
        print("No dataframes loaded successfully.")
        return None
    
    print("Concatenating partitions...")
    full_df = pd.concat(dfs, ignore_index=True)
    return full_df

## 1. Users Data Cleaning
**Issue**: `socdem_cluster` and `region` contain missing values.
**Strategy**: Impute with `-1` to represent "Unknown". Dropping these users would leave their linked events without a matching user record.
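
One side effect of sentinel imputation is worth noting: `fillna` does not change the column's dtype. The sketch below assumes `socdem_cluster` is numeric and was loaded as float because of the NaNs (not verified here); the toy frame is illustrative only.

```python
import numpy as np
import pandas as pd

toy_users = pd.DataFrame({
    "user_id": [10, 11, 12],
    "socdem_cluster": [3.0, np.nan, 7.0],  # NaN forces a float dtype
})

filled = toy_users["socdem_cluster"].fillna(-1)
# fillna keeps the float dtype; cast explicitly once no NaN remains.
as_int = filled.astype("int64")
```

Casting back to an integer type is optional, but it keeps cluster and region codes from appearing as `3.0` or `-1.0` downstream.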

In [6]:
users_path = f"{DATASET_PATH_SMALL}/users.pq"
df_users = load_remote_parquet_safe(users_path)

if df_users is not None:
    print("Before Cleaning:")
    print(df_users.isnull().sum())
    
    # Evidence
    print(f"Evidence - Missing socdem_cluster: {df_users['socdem_cluster'].isnull().sum()}")
    print(f"Evidence - Missing region: {df_users['region'].isnull().sum()}")
    
    # Cleaning
    df_users_clean = df_users.copy()
    df_users_clean['socdem_cluster'] = df_users_clean['socdem_cluster'].fillna(-1)
    df_users_clean['region'] = df_users_clean['region'].fillna(-1)
    
    validate_cleaning(df_users, df_users_clean, "Users")
    
    # Verify
    assert df_users_clean.isnull().sum().sum() == 0, "Users table still has missing values!"

Loading dataset/small/users.pq...
Before Cleaning:
user_id               0
socdem_cluster     5153
region            58917
dtype: int64
Evidence - Missing socdem_cluster: 5153
Evidence - Missing region: 58917

--- Validation: Users ---
Rows: 3500000 -> 3500000 (Dropped: 0)
Missing Values Before: 64070
Missing Values After:  0
Duplicate Rows Before: 0
Duplicate Rows After:  0


## 2. Brands Data Cleaning
**Issue 1**: Schema error when loading `embedding` column.
**Strategy**: Load without `embedding`.
**Issue 2**: Duplicate `brand_id` entries.
**Strategy**: Drop duplicates to ensure `brand_id` is a unique primary key.
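
Before dropping duplicates it can help to look at every copy of a repeated key, not only the extra ones. A minimal sketch on a toy frame (the IDs are made up):

```python
import pandas as pd

toy_brands = pd.DataFrame({"brand_id": [101, 102, 102, 103, 101]})

# keep=False marks every copy of a duplicated key, which is useful for inspection.
all_copies = toy_brands[toy_brands.duplicated(subset=["brand_id"], keep=False)]

# drop_duplicates defaults to keep="first": the earliest row per key survives.
deduped = toy_brands.drop_duplicates(subset=["brand_id"]).reset_index(drop=True)
```

With only `brand_id` loaded, the choice of which copy to keep is immaterial; it would matter if other columns differed between copies.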

In [7]:
brands_path = f"{DATASET_PATH_SMALL}/brands.pq"
# Exclude 'embedding' up front: this file is known to trigger the schema error
df_brands = load_remote_parquet_safe(brands_path, columns_to_exclude=['embedding'])

if df_brands is not None:
    print(f"Initial Rows: {len(df_brands)}")
    
    # Check Duplicates
    dupes = df_brands.duplicated(subset=['brand_id']).sum()
    print(f"Duplicates found: {dupes}")
    
    # Evidence
    print(f"Evidence - Duplicate brand_ids: {dupes}")
    
    # Cleaning
    df_brands_clean = df_brands.drop_duplicates(subset=['brand_id']).copy()
    
    validate_cleaning(df_brands, df_brands_clean, "Brands")

Loading dataset/small/brands.pq...
  Excluding columns: ['embedding']
  Reading columns: ['brand_id']
Initial Rows: 24513
Duplicates found: 46
Evidence - Duplicate brand_ids: 46

--- Validation: Brands ---
Rows: 24513 -> 24467 (Dropped: 46)
Missing Values Before: 0
Missing Values After:  0
Duplicate Rows Before: 46
Duplicate Rows After:  0


## 3.1 Retail Items Cleaning
**Issue**: Missing `category`, `subcategory`, and `price`.
**Strategy**:
*   `category`/`subcategory`: Impute with "Unknown".
*   `price`: Drop rows. Price is a critical feature for most analyses; imputing it (e.g., with mean) could introduce significant bias given the wide range of products.
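
The bias concern can be illustrated with a toy price series containing one expensive item. The numbers below are invented for illustration and are not taken from the dataset.

```python
import numpy as np
import pandas as pd

toy_prices = pd.Series([1.0, 2.0, 3.0, 1000.0, np.nan, np.nan])

dropped = toy_prices.dropna()
mean_imputed = toy_prices.fillna(toy_prices.mean())  # mean of observed values is 251.5

median_after_drop = dropped.median()                # 2.5
median_after_mean_impute = mean_imputed.median()    # 127.25
```

Filling with the mean pulls the median from 2.5 to 127.25 in this toy case, while dropping the rows leaves the observed distribution unchanged.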

In [8]:
retail_items_path = f"{DATASET_PATH_SMALL}/retail/items.pq"
df_retail_items = load_remote_parquet_safe(retail_items_path)

if df_retail_items is not None:
    print("Before Cleaning:")
    print(df_retail_items.isnull().sum())
    
    # Evidence
    print(f"Evidence - Missing category: {df_retail_items['category'].isnull().sum()}")
    print(f"Evidence - Missing subcategory: {df_retail_items['subcategory'].isnull().sum()}")
    print(f"Evidence - Missing price: {df_retail_items['price'].isnull().sum()}")
    
    df_retail_clean = df_retail_items.copy()
    
    # Impute Categorical
    df_retail_clean['category'] = df_retail_clean['category'].fillna("Unknown")
    df_retail_clean['subcategory'] = df_retail_clean['subcategory'].fillna("Unknown")
    
    # Drop Missing Price
    df_retail_clean = df_retail_clean.dropna(subset=['price'])
    
    validate_cleaning(df_retail_items, df_retail_clean, "Retail Items")

Loading dataset/small/retail/items.pq...
Before Cleaning:
item_id            0
brand_id           0
category        9585
subcategory     9585
price          26489
embedding          0
dtype: int64
Evidence - Missing category: 9585
Evidence - Missing subcategory: 9585
Evidence - Missing price: 26489

--- Validation: Retail Items ---
Rows: 250171 -> 223682 (Dropped: 26489)
Missing Values Before: 45659
Missing Values After:  0
Duplicate Rows Before: 0
Duplicate Rows After:  0


## 3.2 Retail Events (Validation Only)
**Issue**: None. The EDA found 0 missing values.
**Strategy**: No cleaning needed. Validate data quality.
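
A validation-only check can also fail loudly instead of relying on someone reading the printed counts. The sketch below uses a hypothetical helper, `assert_no_missing`, which is not part of this notebook, on toy frames.

```python
import pandas as pd

def assert_no_missing(df: pd.DataFrame, name: str) -> None:
    """Raise ValueError listing the columns of `df` that contain nulls."""
    null_counts = df.isnull().sum()
    offending = null_counts[null_counts > 0]
    if not offending.empty:
        raise ValueError(f"{name} has missing values: {offending.to_dict()}")

toy_clean = pd.DataFrame({"user_id": [1, 2], "action_type": ["view", "click"]})
assert_no_missing(toy_clean, "toy events")  # passes silently

toy_dirty = pd.DataFrame({"user_id": [1, 2], "os": ["ios", None]})
try:
    assert_no_missing(toy_dirty, "toy events")
    raised = False
except ValueError:
    raised = True
```

The same helper could be applied to any of the validation-only tables.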

In [15]:
# --- RETAIL EVENTS ---
retail_events_dir = f"{DATASET_PATH_SMALL}/retail/events"
retail_events_files = dataset_files.get(retail_events_dir, [])

df_retail_events = load_dataframe_from_partitions_safe(retail_events_files, limit=DATASET_SMALL_NUM_PARTITIONS_TO_LOAD)

if df_retail_events is not None:
    print("Validation Check:")
    print(df_retail_events.isnull().sum())
    print(f"\nTotal Missing Values: {df_retail_events.isnull().sum().sum()}")
    print(f"Shape: {df_retail_events.shape}")

Loading 5 partitions (out of 227 available)...
Loading dataset/small/retail/events/01082.pq...
Loading dataset/small/retail/events/01083.pq...
Loading dataset/small/retail/events/01084.pq...
Loading dataset/small/retail/events/01085.pq...
Loading dataset/small/retail/events/01086.pq...
Concatenating partitions...
Validation Check:
timestamp      0
user_id        0
item_id        0
subdomain      0
action_type    0
os             0
dtype: int64

Total Missing Values: 0
Shape: (1892095, 6)


## 4.1 Marketplace Items Cleaning
**Issue**: Schema error (embedding).
**Strategy**: Load without `embedding`.

In [9]:
mp_items_path = f"{DATASET_PATH_SMALL}/marketplace/items.pq"
df_mp_items = load_remote_parquet_safe(mp_items_path, columns_to_exclude=['embedding'])

if df_mp_items is not None:
    print(f"Successfully loaded Marketplace Items. Shape: {df_mp_items.shape}")

Loading dataset/small/marketplace/items.pq...
  Excluding columns: ['embedding']
  Reading columns: ['item_id', 'brand_id', 'category', 'subcategory', 'price']
Successfully loaded Marketplace Items. Shape: (2325409, 5)


## 4.2 Marketplace Events Cleaning
**Issue**: The `subdomain` column has 1,115 missing values in the 5 sampled partitions.
**Strategy**: Impute with `"Unknown"` to preserve all events for behavioral analysis.
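
If a string column such as `subdomain` were ever loaded with a categorical dtype (an assumption; the cell below treats it as plain strings), the fill label would have to be registered as a category first. A minimal sketch on a toy frame:

```python
import pandas as pd

toy_events = pd.DataFrame({
    "subdomain": pd.Categorical(["search", None, "catalog"]),
})

col = toy_events["subdomain"]
if isinstance(col.dtype, pd.CategoricalDtype):
    # A new label must be registered before it can be used as a fill value.
    col = col.cat.add_categories(["Unknown"])
toy_events["subdomain"] = col.fillna("Unknown")
```

For ordinary object or string columns the `isinstance` check is false and the plain `fillna("Unknown")` applies unchanged.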

In [13]:
mp_events_dir = f"{DATASET_PATH_SMALL}/marketplace/events"
mp_events_files = dataset_files.get(mp_events_dir, [])

df_mp_events = load_dataframe_from_partitions_safe(mp_events_files, limit=DATASET_SMALL_NUM_PARTITIONS_TO_LOAD)

if df_mp_events is not None:
    print("Before Cleaning:")
    print(df_mp_events.isnull().sum())
    
    # Evidence
    print(f"\nEvidence - Missing subdomain: {df_mp_events['subdomain'].isnull().sum()}")
    
    df_mp_events_clean = df_mp_events.copy()
    df_mp_events_clean['subdomain'] = df_mp_events_clean['subdomain'].fillna("Unknown")
    
    validate_cleaning(df_mp_events, df_mp_events_clean, "Marketplace Events")

Loading 5 partitions (out of 227 available)...
Loading dataset/small/marketplace/events/01082.pq...
Loading dataset/small/marketplace/events/01083.pq...
Loading dataset/small/marketplace/events/01084.pq...
Loading dataset/small/marketplace/events/01085.pq...
Loading dataset/small/marketplace/events/01086.pq...
Concatenating partitions...
Before Cleaning:
timestamp         0
user_id           0
item_id           0
subdomain      1115
action_type       0
os                0
dtype: int64

Evidence - Missing subdomain: 1115

--- Validation: Marketplace Events ---
Rows: 2561575 -> 2561575 (Dropped: 0)
Missing Values Before: 1115
Missing Values After:  0
Duplicate Rows Before: 0
Duplicate Rows After:  0


## 5. Reviews Cleaning
**Issue**: Schema error (embedding) in partition files.
**Strategy**: Load a sample of partitions without `embedding` to demonstrate the fix.

In [10]:
# Get the list of review partition files
reviews_dir = f"{DATASET_PATH_SMALL}/reviews"
review_files = dataset_files.get(reviews_dir, [])  # dataset_files is the index built in cell [2]

# Load with embedding excluded
df_reviews = load_dataframe_from_partitions_safe(
    review_files,
    limit=DATASET_SMALL_NUM_PARTITIONS_TO_LOAD,  # Set to None to load all partitions
    columns_to_exclude=['embedding']
)

if df_reviews is not None:
    print(f"Successfully loaded Reviews. Shape: {df_reviews.shape}")
    print(df_reviews.head())

Loading 5 partitions (out of 227 available)...
Loading dataset/small/reviews/01082.pq...
  Excluding columns: ['embedding']
  Reading columns: ['timestamp', 'user_id', 'brand_id', 'rating']
Loading dataset/small/reviews/01083.pq...
  Excluding columns: ['embedding']
  Reading columns: ['timestamp', 'user_id', 'brand_id', 'rating']
Loading dataset/small/reviews/01084.pq...
  Excluding columns: ['embedding']
  Reading columns: ['timestamp', 'user_id', 'brand_id', 'rating']
Loading dataset/small/reviews/01085.pq...
  Excluding columns: ['embedding']
  Reading columns: ['timestamp', 'user_id', 'brand_id', 'rating']
Loading dataset/small/reviews/01086.pq...
  Excluding columns: ['embedding']
  Reading columns: ['timestamp', 'user_id', 'brand_id', 'rating']
Concatenating partitions...
Successfully loaded Reviews. Shape: (11545, 4)
                  timestamp   user_id  brand_id  rating
0 1082 days 00:01:05.711723  25741061    141226       5
1 1082 days 00:01:12.026501  71011848     65693    

## 6.1 Payments Data Cleaning
**Issue**: High missingness in `brand_id` (~90%) and some missing `price`.
**Strategy**:
*   `brand_id`: Impute with `-1`. The high missing rate suggests that many items are unbranded or that the brand was simply not recorded. Dropping these rows would discard about 90% of the data.
*   `price`: Drop rows with missing price.
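
With a missing rate this high, it may be useful to record which rows were imputed before the sentinel overwrites that information. The sketch below is an optional variation, not what the notebook cell does; the column name `brand_id_missing` and the toy values are hypothetical.

```python
import numpy as np
import pandas as pd

toy_receipts = pd.DataFrame({
    "brand_id": [np.nan, 42.0, np.nan, 7.0],
    "price": [9.99, np.nan, 4.50, 1.25],
})

cleaned = toy_receipts.copy()
# Record missingness before the sentinel overwrites it (hypothetical column name).
cleaned["brand_id_missing"] = cleaned["brand_id"].isnull()
cleaned["brand_id"] = cleaned["brand_id"].fillna(-1)
# Rows without a price are dropped, matching the strategy above.
cleaned = cleaned.dropna(subset=["price"]).reset_index(drop=True)
```

The flag lets a downstream model distinguish "brand unknown" from a real brand ID without treating `-1` as a meaningful value.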

In [11]:
# Get list of receipt partition files (the file index lists payments only under dataset/full)
receipts_dir = f"{DATASET_PATH_FULL}/payments/receipts"
receipt_files = dataset_files.get(receipts_dir, [])

# Load multiple partitions (no embedding issues here, so no columns_to_exclude needed)
df_receipts = load_dataframe_from_partitions_safe(
    receipt_files, 
    limit=DATASET_FULL_NUM_PARTITIONS_TO_LOAD,  # Adjust as needed (or set to None for all)
    columns_to_exclude=None
)

if df_receipts is not None:
    print("Before Cleaning:")
    print(df_receipts.isnull().sum())
    
    # Evidence
    print(f"\nEvidence - Missing brand_id: {df_receipts['brand_id'].isnull().sum()} ({df_receipts['brand_id'].isnull().mean()*100:.1f}%)")
    print(f"Evidence - Missing price: {df_receipts['price'].isnull().sum()}")
    
    df_receipts_clean = df_receipts.copy()
    
    # Impute Brand ID
    df_receipts_clean['brand_id'] = df_receipts_clean['brand_id'].fillna(-1)
    
    # Drop Missing Price
    df_receipts_clean = df_receipts_clean.dropna(subset=['price'])
    
    validate_cleaning(df_receipts, df_receipts_clean, "Payments Receipts")

Loading 1 partitions (out of 1017 available)...
Loading dataset/full/payments/receipts/00292.pq...
Concatenating partitions...
Before Cleaning:
timestamp                   0
user_id                     0
brand_id               999928
approximate_item_id         0
count                       0
price                   14892
transaction_hash            0
dtype: int64

Evidence - Missing brand_id: 999928 (90.0%)
Evidence - Missing price: 14892

--- Validation: Payments Receipts ---
Rows: 1110501 -> 1095609 (Dropped: 14892)
Missing Values Before: 1014820
Missing Values After:  0
Duplicate Rows Before: 0
Duplicate Rows After:  0


## 6.2 Payments Events (Validation Only)
**Issue**: Not analyzed in detail in EDA. Need to verify data quality.
**Strategy**: Load sample and check for missing values.

In [None]:
# --- PAYMENTS EVENTS ---
pay_events_dir = f"{DATASET_PATH_FULL}/payments/events"
pay_events_files = dataset_files.get(pay_events_dir, [])

df_pay_events = load_dataframe_from_partitions_safe(pay_events_files, limit=DATASET_FULL_NUM_PARTITIONS_TO_LOAD)

if df_pay_events is not None:
    print("Validation Check:")
    print(df_pay_events.isnull().sum())
    print(f"\nTotal Missing Values: {df_pay_events.isnull().sum().sum()}")
    print(f"Shape: {df_pay_events.shape}")

## 7.1 Offers Items Cleaning
**Issue**: `brand_id` column has 542 missing values (2.4%).
**Strategy**: Impute with `-1` to represent unbranded/unknown brand, preserving all offer items.

In [None]:
# --- OFFERS ITEMS ---
offers_items_path = f"{DATASET_PATH_SMALL}/offers/items.pq"
df_offers_items = load_remote_parquet_safe(offers_items_path)

if df_offers_items is not None:
    print("Before Cleaning:")
    print(df_offers_items.isnull().sum())
    
    # Evidence
    print(f"\nEvidence - Missing brand_id: {df_offers_items['brand_id'].isnull().sum()} ({df_offers_items['brand_id'].isnull().mean()*100:.1f}%)")
    
    df_offers_items_clean = df_offers_items.copy()
    df_offers_items_clean['brand_id'] = df_offers_items_clean['brand_id'].fillna(-1)
    
    validate_cleaning(df_offers_items, df_offers_items_clean, "Offers Items")

## 7.2 Offers Events (Validation Only)
**Issue**: None. The EDA found 0 missing values.
**Strategy**: No cleaning needed. Validate data quality.

In [14]:
# --- OFFERS EVENTS ---
offers_events_dir = f"{DATASET_PATH_SMALL}/offers/events"
offers_events_files = dataset_files.get(offers_events_dir, [])

df_offers_events = load_dataframe_from_partitions_safe(offers_events_files, limit=DATASET_SMALL_NUM_PARTITIONS_TO_LOAD)

if df_offers_events is not None:
    print("Validation Check:")
    print(df_offers_events.isnull().sum())
    print(f"\nTotal Missing Values: {df_offers_events.isnull().sum().sum()}")
    print(f"Shape: {df_offers_events.shape}")

Loading 5 partitions (out of 227 available)...
Loading dataset/small/offers/events/01082.pq...
Loading dataset/small/offers/events/01083.pq...
Loading dataset/small/offers/events/01084.pq...
Loading dataset/small/offers/events/01085.pq...
Loading dataset/small/offers/events/01086.pq...
Concatenating partitions...
Validation Check:
timestamp      0
user_id        0
item_id        0
action_type    0
dtype: int64

Total Missing Values: 0
Shape: (14814526, 4)


## Conclusion
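We have addressed the critical data quality issues identified in the EDA:
1.  Recovered access to tables with broken schemas (Brands, Marketplace Items, Reviews) by loading them without the `embedding` column.
2.  Standardized handling of missing values: sentinel imputation (`-1` or "Unknown") for identifier and categorical columns, and row removal where `price` is missing.
3.  Ensured data integrity by removing duplicate `brand_id` rows from Brands.

The cleaned tables are now ready for downstream feature engineering and modeling. Retail Items embeddings are preserved.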