# KRAFT: Data Preparation and Feature Engineering

This notebook covers the initial stage of building our recommender system. The primary goals are:
1. Load the raw KuaiRec dataset files.
2. Perform initial cleaning, type conversions, and handle missing values.
3. Engineer new features from the existing data to enrich our interaction logs.
4. Prepare and save processed datasets for:
    - Training an ALS model for candidate generation (from `big_matrix`).
    - Training a LightGBM model for ranking (from `big_matrix`).
    - Evaluating the ranker on a dense subset (`small_matrix`).

## 1. Imports and Configuration

First, we import necessary libraries and define global configurations such as file paths, column selections, and data types for initial loading and final processing.

In [1]:
import pandas as pd
import numpy as np
import gc
from datetime import datetime
from scipy.sparse import csr_matrix
import json
import os

# Directory layout used by this notebook: raw CSVs are read from
# RAW_DATA_BASE_PATH; the two output directories are created below.
RAW_DATA_BASE_PATH = "../raw_data/KuaiRec/data/"
PROCESSED_DATA_PATH = "../data/"
MODELS_PATH = "../models/"

# Create the output directories up front; an already existing one is fine.
for _output_dir in (PROCESSED_DATA_PATH, MODELS_PATH):
    os.makedirs(_output_dir, exist_ok=True)

# --- Dtype Definitions & Column Selections ---

# For interaction matrices (big_matrix.csv, small_matrix.csv)
# Integer-valued columns are first read as floats so that rows with missing
# values still parse; they are cast to the final dtypes afterwards.
#
# `date` and `timestamp` must be float64. float32 has a 24-bit significand, so
# it cannot hold every integer above 2**24 (16,777,216): an 8-digit value such
# as 20200705 is rounded to a neighbouring even number (merging adjacent
# dates), and a value around 1.6e9 is rounded to a multiple of 128.
# NOTE(review): assumes `date` holds YYYYMMDD-style integers and `timestamp`
# holds epoch seconds -- confirm against the raw CSVs.
interaction_cols_initial_load = {
    'user_id': 'float32', 'video_id': 'float32', 'play_duration': 'float32',
    'video_duration': 'float32', 'time': 'str', 'date': 'float64',
    'timestamp': 'float64', 'watch_ratio': 'float32'
}
# Final dtypes applied after NaN filling. `timestamp` stays float64 so the
# precision preserved at load time is not thrown away again by the cast.
interaction_cols_final_dtypes = {
    'user_id': 'int32', 'video_id': 'int32', 'play_duration': 'int32',
    'video_duration': 'int32', 'time': 'str', 'date': 'int32',
    'timestamp': 'float64', 'watch_ratio': 'float32'
}

# For user_features.csv
user_features_selected_cols = {
    'user_id': 'int32', 'user_active_degree': 'category',
    'is_lowactive_period': 'float32',
    'is_live_streamer': 'float32',
    'is_video_author': 'float32',
    'follow_user_num': 'int32',
    'fans_user_num': 'int32',
    'register_days': 'int32',
}
# onehot_feat0 .. onehot_feat17 are loaded as float to handle potential NaNs.
onehot_feature_names = [f'onehot_feat{i}' for i in range(18)]
for col_name in onehot_feature_names:
    user_features_selected_cols[col_name] = 'float32'

# For item_categories.csv
item_categories_selected_cols = {'video_id': 'int32', 'feat': 'str'}

# For item_daily_features.csv - initial load includes counts for ratio calculation
item_daily_initial_load_cols = {
    'video_id': 'float32',
    'date': 'float64',  # float64 for the same reason as above; will be renamed to item_stats_date
    'author_id': 'float32',
    'video_type': 'category',
    'upload_dt': 'str',
    'video_duration': 'float32',  # Will be renamed to video_duration_daily
    'show_cnt': 'float32',  # For ratio
    'play_cnt': 'float32',  # For ratio
    'like_cnt': 'float32',  # For ratio
    'complete_play_cnt': 'float32',  # For ratio
    'play_progress': 'float32',  # Loaded as 'play_progress', will be 'daily_play_progress'
    'video_tag_id': 'float32',  # Daily item tag
}
# Final columns to keep from daily features for merging (after processing and ratio creation)
item_daily_cols_to_keep_for_merge = [
    'video_id', 'item_stats_date', 'author_id', 'video_type', 'upload_dt_parsed',
    'video_duration_daily', 'daily_play_progress',
    'daily_play_per_show_ratio', 'daily_like_per_play_ratio', 'daily_completion_rate',
    'video_tag_id'
]

## 2. Load and Pre-process Raw Data Files

This section loads the primary data files (`big_matrix`, `small_matrix`, `user_features`, `item_categories`, `item_daily_features`). 
Initial processing includes:
- Applying specified dtypes during loading.
- Handling potential NaNs in integer columns by loading as float first, then filling NaNs and casting to the target integer type.
- Parsing list-like strings (e.g., item tags).
- Renaming columns for clarity and consistency (e.g., in `item_daily_features`).

In [2]:
def post_process_interaction_df(df, final_dtypes_map):
    """Fill missing values and cast interaction columns to their final dtypes.

    The frame is modified in place and also returned.

    Args:
        df: Interaction DataFrame; any of the handled columns may be absent.
        final_dtypes_map: Mapping of column name to the dtype it is cast to.

    Returns:
        The same DataFrame object, with the handled columns filled and cast.
    """
    # Placeholder per integer column: -1 marks a missing identifier or date,
    # 0 a missing duration. The values are listed explicitly because a
    # substring test such as `'id' in col` also matches 'video_duration'.
    int_fill_values = {
        'user_id': -1,
        'video_id': -1,
        'date': -1,
        'play_duration': 0,
        'video_duration': 0,
    }
    for col, fill_value in int_fill_values.items():
        if col in df.columns:
            df[col] = df[col].fillna(fill_value).astype(final_dtypes_map[col])

    # Float columns keep their NaNs; only the dtype is normalised.
    for col in ('timestamp', 'watch_ratio'):
        if col in df.columns:
            df[col] = df[col].astype(final_dtypes_map[col])
    return df

# Both interaction logs share one schema, so both are read with the same
# column selection / load dtypes and then passed through
# post_process_interaction_df to reach their final dtypes.
_interaction_read_options = {
    'usecols': interaction_cols_initial_load.keys(),
    'dtype': interaction_cols_initial_load,
}

print("Loading big_matrix.csv...")
_big_matrix_csv = os.path.join(RAW_DATA_BASE_PATH, "big_matrix.csv")
df_big_interactions = post_process_interaction_df(
    pd.read_csv(_big_matrix_csv, **_interaction_read_options),
    interaction_cols_final_dtypes,
)
print(f"Loaded and processed big_matrix: {df_big_interactions.shape}")

print("\nLoading small_matrix.csv...")
_small_matrix_csv = os.path.join(RAW_DATA_BASE_PATH, "small_matrix.csv")
df_small_interactions = post_process_interaction_df(
    pd.read_csv(_small_matrix_csv, **_interaction_read_options),
    interaction_cols_final_dtypes,
)
print(f"Loaded and processed small_matrix: {df_small_interactions.shape}")

print("\nLoading user_features.csv (selected columns)...")
_user_features_csv = os.path.join(RAW_DATA_BASE_PATH, "user_features.csv")
df_user_features = pd.read_csv(
    _user_features_csv,
    usecols=user_features_selected_cols.keys(),
    dtype=user_features_selected_cols,
)

# One-hot columns were loaded as float; missing entries become -1 before the
# cast to a compact integer type.
for col in onehot_feature_names:
    df_user_features[col] = df_user_features[col].fillna(-1).astype('int16')

# Binary user flags: -1 marks a missing flag. fillna leaves a column without
# missing values unchanged, so one expression covers both situations.
_flag_cols_present = [
    flag for flag in ('is_lowactive_period', 'is_live_streamer', 'is_video_author')
    if flag in df_user_features.columns
]
for col in _flag_cols_present:
    df_user_features[col] = df_user_features[col].fillna(-1).astype('int8')

print(f"Loaded user_features: {df_user_features.shape}")

print("\nLoading item_categories.csv...")
# Only the columns named in item_categories_selected_cols are read.
_item_categories_csv = os.path.join(RAW_DATA_BASE_PATH, "item_categories.csv")
df_item_categories = pd.read_csv(
    _item_categories_csv,
    usecols=list(item_categories_selected_cols),
    dtype=item_categories_selected_cols,
)
def parse_feat_list(feat_str):
    """Return the number of entries in a list-like ``feat`` cell.

    Args:
        feat_str: Either an actual list, or a string holding a Python literal
            such as ``"[1, 2, 3]"``. Anything else (None, NaN, numbers) counts
            as "no tags".

    Returns:
        The length of the list / parsed literal, or 0 when the value is
        missing, not a string, malformed, or parses to something without a
        length.
    """
    import ast  # local import so the function does not rely on the import cell

    if isinstance(feat_str, list):
        return len(feat_str)
    if not isinstance(feat_str, str):
        # Covers None and float NaN as well as any other non-string value.
        return 0
    try:
        # literal_eval only accepts Python literals, so file content can never
        # run code here (unlike eval).
        return len(ast.literal_eval(feat_str))
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Malformed string, or a literal (e.g. "5") that has no len().
        return 0
# Replace the raw `feat` string with a compact tag count per video.
_tag_counts = df_item_categories['feat'].apply(parse_feat_list).astype('int16')
df_item_categories = (
    df_item_categories
    .assign(num_item_tags=_tag_counts)
    .drop(columns=['feat'])
)
print(f"Loaded item_categories: {df_item_categories.shape}")

print("\nLoading and pre-processing item_daily_features.csv...")
# Raw column name -> name used for the rest of the notebook.
_daily_column_renames = {
    'date': 'item_stats_date',
    'video_duration': 'video_duration_daily',
    'show_cnt': 'daily_show_cnt',
    'play_cnt': 'daily_play_cnt',
    'like_cnt': 'daily_like_cnt',
    'complete_play_cnt': 'daily_complete_play_cnt',
    'play_progress': 'daily_play_progress',
}
df_item_daily_full_load = pd.read_csv(
    os.path.join(RAW_DATA_BASE_PATH, "item_daily_features.csv"),
    usecols=item_daily_initial_load_cols.keys(),
    dtype=item_daily_initial_load_cols,
)
df_item_daily_processed = df_item_daily_full_load.rename(columns=_daily_column_renames)
del df_item_daily_full_load
gc.collect()

# NA fill and type conversion, driven by one (columns, fill value, dtype) table:
# identifiers and the stats date get -1, counts get 0, float features get 0.
id_cols_daily = ['video_id', 'author_id', 'video_tag_id']
count_cols_daily = ['daily_show_cnt', 'daily_play_cnt', 'daily_like_cnt', 'daily_complete_play_cnt']
float_cols_daily = ['video_duration_daily', 'daily_play_progress']
_daily_cast_plan = (
    (id_cols_daily + ['item_stats_date'], -1, 'int32'),
    (count_cols_daily, 0, 'int32'),
    (float_cols_daily, 0, 'float32'),
)
for _columns, _fill_value, _target_dtype in _daily_cast_plan:
    for col in _columns:
        if col not in df_item_daily_processed.columns:
            continue
        df_item_daily_processed[col] = (
            df_item_daily_processed[col].fillna(_fill_value).astype(_target_dtype)
        )

print(f"Initial processing of item_daily_features complete. Shape: {df_item_daily_processed.shape}")
gc.collect()

Loading big_matrix.csv...
Loaded and processed big_matrix: (12530806, 8)

Loading small_matrix.csv...
Loaded and processed small_matrix: (4676570, 8)

Loading user_features.csv (selected columns)...
Loaded user_features: (7176, 26)

Loading item_categories.csv...
Loaded item_categories: (10728, 2)

Loading and pre-processing item_daily_features.csv...
Initial processing of item_daily_features complete. Shape: (343341, 12)


0

## 3. Global Processing of Daily Item Features

Here, we perform further transformations on the `item_daily_processed` DataFrame:
- Parse `upload_dt` to datetime objects (`upload_dt_parsed`).
- Calculate key ratios like play-per-show, like-per-play, and completion rate.
- Ensure `video_type` is categorical.
- Select only the essential columns for merging (`item_daily_cols_to_keep_for_merge`).
- **Crucially, identify and remove duplicate entries based on `(video_id, item_stats_date)`** to prevent row explosion during merges. This was a key finding from previous iterations.

In [None]:
print("\nGlobally processing df_item_daily for derived features & slimming...")

# Upload date as datetime; unparseable values become NaT. When the raw column
# was not loaded at all, the whole column is a NaT placeholder.
df_item_daily_processed['upload_dt_parsed'] = (
    pd.to_datetime(df_item_daily_processed['upload_dt'], errors='coerce')
    if 'upload_dt' in df_item_daily_processed.columns
    else pd.NaT
)

# Ratio features: new column -> (numerator, denominator). The 1e-6 added to
# the denominator avoids division by zero.
_ratio_definitions = {
    'daily_play_per_show_ratio': ('daily_play_cnt', 'daily_show_cnt'),
    'daily_like_per_play_ratio': ('daily_like_cnt', 'daily_play_cnt'),
    'daily_completion_rate': ('daily_complete_play_cnt', 'daily_play_cnt'),
}
for _ratio_name, (_numerator, _denominator) in _ratio_definitions.items():
    _inputs_present = (
        _numerator in df_item_daily_processed.columns
        and _denominator in df_item_daily_processed.columns
    )
    if not _inputs_present:
        df_item_daily_processed[_ratio_name] = np.float32(0.0)  # Default if components are missing
        continue
    _ratio_values = df_item_daily_processed[_numerator] / (df_item_daily_processed[_denominator] + 1e-6)
    df_item_daily_processed[_ratio_name] = _ratio_values.astype('float32')

# video_type is converted only when it is not categorical already; in that
# case an 'Unknown' category is registered so later NaN fills can use it.
if 'video_type' in df_item_daily_processed.columns:
    if df_item_daily_processed['video_type'].dtype.name != 'category':
        df_item_daily_processed['video_type'] = df_item_daily_processed['video_type'].astype('category')
        if 'Unknown' not in df_item_daily_processed['video_type'].cat.categories:
            df_item_daily_processed['video_type'] = df_item_daily_processed['video_type'].cat.add_categories('Unknown')

# Lean table: only the columns needed for the merge, in the configured order.
actual_cols_to_keep = [col for col in item_daily_cols_to_keep_for_merge if col in df_item_daily_processed.columns]
df_item_daily_lean_for_merge = df_item_daily_processed[actual_cols_to_keep].copy()
print(f"Slimmed df_item_daily_lean_for_merge for merging. Shape: {df_item_daily_lean_for_merge.shape}, Columns: {df_item_daily_lean_for_merge.columns.tolist()}")

# The merge key must be unique, otherwise the left merge multiplies rows.
print("\nChecking for duplicate keys in df_item_daily_lean_for_merge...")
key_cols_daily = ['video_id', 'item_stats_date']
actual_key_cols_daily = [col for col in key_cols_daily if col in df_item_daily_lean_for_merge.columns]

if len(actual_key_cols_daily) != len(key_cols_daily):
    print(f"Warning: One or more key columns for daily features duplicate check ({key_cols_daily}) not found. Skipping duplicate check.")
else:
    # keep=False flags every row that shares its key with another row.
    duplicate_daily_keys = df_item_daily_lean_for_merge.duplicated(subset=actual_key_cols_daily, keep=False)
    num_duplicate_daily_keys = duplicate_daily_keys.sum()
    print(f"Number of rows involved in duplicate ({', '.join(actual_key_cols_daily)}) keys: {num_duplicate_daily_keys}")
    if num_duplicate_daily_keys > 0:
        print("Attempting to drop duplicate keys from df_item_daily_lean_for_merge, keeping first...")
        df_item_daily_lean_for_merge = df_item_daily_lean_for_merge.drop_duplicates(
            subset=actual_key_cols_daily, keep='first'
        ).copy()
        print(f"Shape of df_item_daily_lean_for_merge after dropping duplicates: {df_item_daily_lean_for_merge.shape}")

del df_item_daily_processed  # Free memory from the intermediate processed df
gc.collect()

## 4. Feature Engineering Function for Interactions

The function `engineer_features_for_interactions` consolidates every step needed to turn a raw interaction DataFrame (loaded from either `big_matrix` or `small_matrix`) into a feature-rich DataFrame ready for modeling. It performs:
- Extraction of temporal features (hour, day of week).
- Merging with user features, static item features (categories), and the processed (lean) daily item features.
- Post-merge feature creation (e.g., `video_age_days`).
- NA filling for columns introduced by merges.
- Sorting by timestamp.

In [None]:
def engineer_features_for_interactions(df_interactions_raw, df_user_features_ref, df_item_categories_ref, df_item_daily_ref_lean):
    """Build the feature table for one interaction log.

    Works on a copy of ``df_interactions_raw``; the input frame is not modified.

    Steps: derive hour / day-of-week from ``time``, left-merge user features
    (on ``user_id``), item categories (on ``video_id``) and the lean daily item
    features (on ``video_id`` + interaction date), derive ``video_age_days``,
    fill the gaps left by the daily merge, and sort by ``timestamp``.

    Args:
        df_interactions_raw: Interaction rows with at least ``time``, ``date``,
            ``video_duration``, ``user_id``, ``video_id`` and ``timestamp``.
        df_user_features_ref: User feature table keyed by ``user_id``.
        df_item_categories_ref: Item table keyed by ``video_id`` that provides
            ``num_item_tags``.
        df_item_daily_ref_lean: Daily item features keyed by
            ``(video_id, item_stats_date)``.

    Returns:
        A new DataFrame sorted by ``timestamp`` with a fresh 0-based index.
        Rows with equal timestamps keep their incoming relative order, so a
        positional split of the result is reproducible.
    """
    print(f"Engineering features for interaction table with shape: {df_interactions_raw.shape}")
    df = df_interactions_raw.copy()
    expected_rows = len(df)

    def _warn_if_row_count_changed(step_name, frame):
        # Every merge below is a left join onto a lookup table, so the row
        # count should stay constant; growth means duplicated join keys.
        if len(frame) != expected_rows:
            print(f"Warning: row count changed from {expected_rows} to {len(frame)} after {step_name}; "
                  "the right-hand table probably contains duplicate join keys.")

    # Temporal features from interaction time (-1 when the time cannot be parsed)
    df['interaction_datetime'] = pd.to_datetime(df['time'], errors='coerce')
    df['interaction_hour'] = df['interaction_datetime'].dt.hour.fillna(-1).astype('int8')
    df['interaction_day_of_week'] = df['interaction_datetime'].dt.dayofweek.fillna(-1).astype('int8')
    # Rename so these columns cannot be confused with the daily-feature columns merged below
    df = df.rename(columns={'date': 'interaction_date', 'video_duration': 'video_duration_interaction'})

    # Merge with user features
    print("Merging user features...")
    df = pd.merge(df, df_user_features_ref, on='user_id', how='left')
    print(f"Shape after user features merge: {df.shape}")
    _warn_if_row_count_changed("user features merge", df)
    gc.collect()

    # Merge with static item features
    print("Merging static item features...")
    df = pd.merge(df, df_item_categories_ref, on='video_id', how='left')
    df['num_item_tags'] = df['num_item_tags'].fillna(0).astype('int16')
    print(f"Shape after static item features merge: {df.shape}")
    _warn_if_row_count_changed("static item features merge", df)
    gc.collect()

    # Merge with lean dynamic item features
    print("Merging dynamic item features (lean)...")
    df = pd.merge(df, df_item_daily_ref_lean,
                  left_on=['video_id', 'interaction_date'],
                  right_on=['video_id', 'item_stats_date'],  # item_stats_date is the 'date' from daily features
                  how='left')
    print(f"Shape after dynamic item features merge: {df.shape}")
    _warn_if_row_count_changed("dynamic item features merge", df)
    gc.collect()

    # Post-merge feature engineering
    print("Post-merge feature engineering...")
    if 'upload_dt_parsed' in df.columns and 'interaction_datetime' in df.columns:
        # Whole days between upload and interaction; -1 when either side is missing
        video_age_delta = df['interaction_datetime'] - df['upload_dt_parsed']
        df['video_age_days'] = video_age_delta.dt.days.fillna(-1).astype('int16')
    else:
        # Ensure series has same index if created manually
        df['video_age_days'] = pd.Series([-1] * len(df), index=df.index, dtype='int16')

    # NA filling for columns that came from the lean daily merge
    # (item_daily_cols_to_keep_for_merge minus join keys and upload_dt_parsed)
    cols_from_daily_merge_to_fill = [
        'author_id', 'video_type', 'video_duration_daily', 'daily_play_progress',
        'daily_play_per_show_ratio', 'daily_like_per_play_ratio', 'daily_completion_rate',
        'video_tag_id'
    ]
    for col in cols_from_daily_merge_to_fill:
        if col not in df.columns:
            continue
        if col in ['author_id', 'video_tag_id']:
            df[col] = df[col].fillna(-1).astype('int32')
        elif col == 'video_type':
            if df[col].isnull().any():  # Only process if NaNs exist
                # A categorical can only be filled with a registered category
                if df[col].dtype.name != 'category':
                    df[col] = pd.Categorical(df[col])
                if 'Unknown' not in df[col].cat.categories:
                    df[col] = df[col].cat.add_categories('Unknown')
                df[col] = df[col].fillna('Unknown')
        else:  # Numerical daily features (durations, ratios, progress)
            df[col] = df[col].fillna(0).astype('float32')

    # Stable sort: ties on timestamp keep their current order, which makes the
    # result deterministic.
    df = df.sort_values(by='timestamp', kind='mergesort').reset_index(drop=True)
    print(f"Finished engineering. Shape: {df.shape}")
    return df

## 5. Apply Feature Engineering to Interaction Matrices

Now, we use the defined `engineer_features_for_interactions` function to process both the `big_matrix` (for training/testing the main model) and the `small_matrix` (for dense evaluation).

In [None]:
# The same three lookup tables are joined onto both interaction logs.
_shared_feature_tables = (df_user_features, df_item_categories, df_item_daily_lean_for_merge)

print("\n--- Applying Feature Engineering to Big Matrix Interactions ---")
df_big_merged = engineer_features_for_interactions(df_big_interactions, *_shared_feature_tables)
del df_big_interactions  # the raw log is no longer needed once merged
gc.collect()

print("\n--- Applying Feature Engineering to Small Matrix Interactions ---")
df_small_eval_features = engineer_features_for_interactions(df_small_interactions, *_shared_feature_tables)
del df_small_interactions
del _shared_feature_tables  # drop the extra references to the lookup tables
gc.collect()

# At this point, original feature DataFrames can be deleted if memory is critical
# del df_user_features, df_item_categories, df_item_daily_lean_for_merge
# gc.collect()

## 6. Prepare Data for ALS Candidate Generation

The ALS model requires a user-item interaction matrix. We create this from `df_big_merged`:
- Map original `user_id`s and `video_id`s to 0-based integer indices.
- Use `watch_ratio`, floored at 0.001 so that every value is strictly positive, as the interaction strength/confidence.
- Construct a sparse matrix (`csr_matrix`).
- Save the ID mappings for later use (e.g., translating recommendations back to original IDs).

In [None]:
print("\n--- Preparing Data for ALS (Candidate Generation from Big Matrix) ---")
if 'user_id' not in df_big_merged.columns or 'video_id' not in df_big_merged.columns:
    raise ValueError("user_id or video_id not found in df_big_merged for ALS prep.")

# Index assignment follows order of first appearance in df_big_merged.
unique_users_als_np = df_big_merged['user_id'].unique()
unique_videos_als_np = df_big_merged['video_id'].unique()

user_to_idx = {int(user_id): i for i, user_id in enumerate(unique_users_als_np)}
idx_to_user = {i: int(user_id) for i, user_id in enumerate(unique_users_als_np)}
num_users_als = len(unique_users_als_np)

video_to_idx = {int(video_id): i for i, video_id in enumerate(unique_videos_als_np)}
idx_to_video = {i: int(video_id) for i, video_id in enumerate(unique_videos_als_np)}
num_videos_als = len(unique_videos_als_np)

del unique_users_als_np, unique_videos_als_np  # Free numpy arrays
gc.collect()

# Passing the dict itself lets pandas do the lookup in bulk instead of calling
# a Python lambda once per interaction row. IDs without a mapping become NaN.
als_user_ids = df_big_merged['user_id'].map(user_to_idx)
als_item_ids = df_big_merged['video_id'].map(video_to_idx)

# The mappings are built from these same columns, so unmapped IDs are not
# expected. Note that the -1 placeholder written during loading is an ordinary
# value of the column: it receives an index of its own and is NOT caught here.
if als_user_ids.isnull().any() or als_item_ids.isnull().any():
    print("Warning: NaNs found in ALS mapped IDs. Filtering out the affected rows. Investigate if unexpected.")
    # csr_matrix indices must be valid non-negative integers, so drop those rows.
    valid_indices_mask = ~(als_user_ids.isnull() | als_item_ids.isnull())
    als_user_ids = als_user_ids[valid_indices_mask]
    als_item_ids = als_item_ids[valid_indices_mask]
    als_ratings_source = df_big_merged['watch_ratio'][valid_indices_mask]
    print(f"Filtered out {len(df_big_merged) - len(als_ratings_source)} rows with invalid ALS IDs.")
else:
    als_ratings_source = df_big_merged['watch_ratio']

# Floor the interaction strength at 0.001 so every weight is strictly positive.
als_ratings_clipped = np.maximum(als_ratings_source.astype('float32'), 0.001)

# Rows are users, columns are videos.
interaction_matrix_als = csr_matrix(
    (
        als_ratings_clipped.to_numpy(),
        (als_user_ids.astype(int).to_numpy(), als_item_ids.astype(int).to_numpy()),
    ),
    shape=(num_users_als, num_videos_als),
)
print(f"ALS Sparse Matrix Shape: {interaction_matrix_als.shape}, NNZ: {interaction_matrix_als.nnz}")
# NOTE(review): interaction_matrix_als is not written to disk anywhere in this
# cell -- confirm whether the ALS training step expects a saved matrix.

# Save ID mappings. JSON object keys are always strings, so the integer keys
# of these dicts come back as strings when the files are loaded.
for mapping_dict, name in [
    (user_to_idx, 'user_to_idx_als.json'),
    (video_to_idx, 'video_to_idx_als.json'),
    (idx_to_user, 'idx_to_user_als.json'),
    (idx_to_video, 'idx_to_video_als.json')
]:
    with open(os.path.join(PROCESSED_DATA_PATH, name), 'w') as f:
        json.dump(mapping_dict, f)
print(f"ALS ID mappings saved to {PROCESSED_DATA_PATH}")

del als_user_ids, als_item_ids, als_ratings_source, als_ratings_clipped
gc.collect()

## 7. Prepare Data for LightGBM Ranker and Save Processed Files

This final data preparation step involves:
- Splitting `df_big_merged` chronologically into training and testing sets for the LightGBM model.
- Dynamically identifying categorical and numerical features based on the columns present in the processed DataFrames.
- Ensuring all categorical features are correctly typed across `train_df_lgbm`, `test_df_lgbm`, and `df_small_eval_features`.
- Saving the following files to disk:
    - `lightgbm_train_data.parquet`: Training data for LightGBM (features + target from `big_matrix`).
    - `lightgbm_test_data.parquet`: Test data for LightGBM (features + target from `big_matrix` holdout).
    - `ground_truth_test_big_matrix.csv`: User-item pairs and actual `watch_ratio` for the `big_matrix` test set.
    - `small_matrix_eval_features_data.parquet`: Fully processed `small_matrix` data (features + target) for dense evaluation.

In [None]:
print("\n--- Preparing Data for LightGBM Ranker & Saving Processed Files ---")

# Positional 80/20 split of df_big_merged: the first 80% of rows train the
# ranker, the remaining rows form the standard test set.
_total_rows = len(df_big_merged)
split_point = int(_total_rows * 0.8)
train_df_lgbm = df_big_merged.iloc[:split_point].copy()
test_df_lgbm = df_big_merged.iloc[split_point:].copy()
del df_big_merged  # Free memory
gc.collect()

print(f"LGBM Train data shape (from big matrix): {train_df_lgbm.shape}")
print(f"LGBM Test data shape (from big matrix): {test_df_lgbm.shape}")

target_col = 'watch_ratio'
# Columns that are never offered to the model as features.
cols_to_drop_for_lgbm_features = [
    'time',
    'timestamp',
    'interaction_datetime',
    'upload_dt',
    'upload_dt_parsed',
    'item_stats_date',
    'play_duration',
]

# Dynamically define categorical features for LightGBM based on selected features
base_categorical_features = ['user_id', 'video_id', 'user_active_degree',
                             'interaction_hour', 'interaction_day_of_week']
user_flag_categoricals = ['is_lowactive_period', 'is_live_streamer', 'is_video_author']
daily_item_categoricals = ['author_id', 'video_type', 'video_tag_id']

master_categorical_list = (
    base_categorical_features
    + [flag for flag in user_flag_categoricals if flag in train_df_lgbm.columns]
    + onehot_feature_names
    + [feat for feat in daily_item_categoricals if feat in train_df_lgbm.columns]
)

# Ensure categorical features are correctly typed in all DataFrames that will be used by LightGBM
final_lgbm_categorical_features = []  # This will be the definitive list based on train_df_lgbm
dfs_for_lgbm_prep = {
    'train_lgbm': train_df_lgbm,
    'test_lgbm': test_df_lgbm,
    'small_eval': df_small_eval_features
}

for df_name, current_df in dfs_for_lgbm_prep.items():
    print(f"Processing categorical dtypes for {df_name}...")
    for col in master_categorical_list:
        if col not in current_df.columns:
            continue

        already_categorical = current_df[col].dtype.name == 'category'

        # Fill NaNs left behind by the left merges.
        if current_df[col].isnull().any():
            if col in ['user_active_degree', 'video_type']:
                # These columns can arrive here already categorical, and a
                # categorical can only be filled with a registered category,
                # so register the placeholder first.
                if already_categorical and 'Unknown_Merge_NA' not in current_df[col].cat.categories:
                    current_df[col] = current_df[col].cat.add_categories("Unknown_Merge_NA")
                current_df[col] = current_df[col].fillna("Unknown_Merge_NA")
            elif not already_categorical:  # For ID-like features or onehot encoded features
                current_df[col] = current_df[col].fillna(-1)

        if not already_categorical:
            current_df[col] = current_df[col].astype('category')

        # Build the final list of categorical features based on train_df_lgbm
        if df_name == 'train_lgbm' and col not in final_lgbm_categorical_features:
            final_lgbm_categorical_features.append(col)

# Numerical features are whatever is left in the training frame once the
# categorical features, the target and the explicit drop list are removed.
all_columns_in_train = list(train_df_lgbm.columns)
_non_numerical_columns = (
    set(final_lgbm_categorical_features)
    | {target_col}
    | set(cols_to_drop_for_lgbm_features)
)
numerical_features_lgbm = [
    col for col in all_columns_in_train if col not in _non_numerical_columns
]
# Categorical features come first, followed by the numerical ones.
lgbm_feature_columns = final_lgbm_categorical_features + numerical_features_lgbm

_n_categorical = len(final_lgbm_categorical_features)
_n_numerical = len(numerical_features_lgbm)
print(f"Final categorical features for LightGBM: {_n_categorical} features")
print(f"Final numerical features for LightGBM: {_n_numerical} features")
print(f"Total features for LightGBM: {len(lgbm_feature_columns)}")

# --- Save Processed DataFrames to Parquet/CSV ---
print("\nSaving processed LightGBM training, testing, and small_matrix evaluation data...")

# Training data (from big_matrix): feature columns followed by the target.
_train_parquet = os.path.join(PROCESSED_DATA_PATH, 'lightgbm_train_data.parquet')
X_train = train_df_lgbm[lgbm_feature_columns]
y_train = train_df_lgbm[target_col]
X_train.assign(**{target_col: y_train}).to_parquet(_train_parquet, index=False)
print(f"Saved lightgbm_train_data.parquet: {X_train.shape[0]} rows, {X_train.shape[1]+1} cols")
del X_train, y_train, train_df_lgbm
gc.collect()

# Testing data (from big_matrix holdout): same layout as the training file.
_test_parquet = os.path.join(PROCESSED_DATA_PATH, 'lightgbm_test_data.parquet')
X_test = test_df_lgbm[lgbm_feature_columns]
y_test = test_df_lgbm[target_col]
X_test.assign(**{target_col: y_test}).to_parquet(_test_parquet, index=False)
print(f"Saved lightgbm_test_data.parquet: {X_test.shape[0]} rows, {X_test.shape[1]+1} cols")

# Ground truth for the standard test set (big_matrix holdout)
_ground_truth_csv = os.path.join(PROCESSED_DATA_PATH, 'ground_truth_test_big_matrix.csv')
_ground_truth_cols = ['user_id', 'video_id', target_col]
test_df_lgbm[_ground_truth_cols].to_csv(_ground_truth_csv, index=False)
print(f"Saved ground_truth_test_big_matrix.csv: {test_df_lgbm.shape[0]} rows")
del X_test, y_test, test_df_lgbm
gc.collect()

# Small matrix evaluation data (fully processed): keep the model features that
# exist in this frame, plus the target when it is available.
small_eval_cols_to_save = [col for col in lgbm_feature_columns if col in df_small_eval_features.columns]
if target_col not in df_small_eval_features.columns:
    print(f"Warning: Target column '{target_col}' not found in df_small_eval_features.")
else:
    small_eval_cols_to_save.append(target_col)

_small_eval_parquet = os.path.join(PROCESSED_DATA_PATH, 'small_matrix_eval_features_data.parquet')
df_small_eval_features[small_eval_cols_to_save].to_parquet(_small_eval_parquet, index=False)
print(f"Saved small_matrix_eval_features_data.parquet: {df_small_eval_features.shape[0]} rows, {len(small_eval_cols_to_save)} cols")
del df_small_eval_features
gc.collect()

print("\n--- Data Preparation and Feature Engineering Complete. Processed files are in ../data/ ---")