In [1]:
import os
import sys

# Root of the project checkout. Set the PROJECT_ROOT environment variable to
# run on another machine instead of editing this hard-coded default.
PROJECT_ROOT = os.environ.get("PROJECT_ROOT", "/home/pablo/Documents/datathon2025-smadex")

# Put the project root first on the import path so project-local modules win.
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

print("PROJECT_ROOT:", PROJECT_ROOT)
print("Contenido de ./data:", os.listdir(os.path.join(PROJECT_ROOT, "data")))

PROJECT_ROOT: /home/pablo/Documents/datathon2025-smadex
Contenido de ./data: ['test', 'sample_submission.csv', 'train']


In [None]:
import dask
import dask.dataframe as dd
import pandas as pd
import json
import numpy as np
import joblib
from sklearn.preprocessing import StandardScaler
# Turn off Dask's automatic conversion of object (string) columns to
# pyarrow-backed string dtype. NOTE(review): presumably so list/map-valued
# parquet columns stay plain Python objects for the per-row helpers used
# later (isinstance(x, list) checks) — confirm.
dask.config.set({"dataframe.convert-string": False})

In [None]:

# --- Paths (relative to the notebook's working directory) ---
DATASET_PATH = "../data/train"
EMBEDDINGS_MAPPING_FILE = "embeddings_mappings.json"
SCALER_FILE = "scaler.joblib"

# Columns encoded as embedding indices.
# NOTE(review): 'avg_daily_sessions', 'avg_duration', 'bcat_bottom_taxonomy'
# and 'ctr' are also aggregated into '<col>_agg' and dropped by
# transform_variables; keep the two definitions in sync.
CATEGORICAL_FEATURES = [
    # advertiser
    'advertiser_bundle', 'advertiser_category', 'advertiser_subcategory',
    'advertiser_bottom_taxonomy_level',
    # device / geo
    'country', 'dev_make', 'dev_model', 'dev_os', 'dev_osv', 'release_date',
    # user history
    'avg_daily_sessions', 'avg_duration', 'bcat', 'bcat_bottom_taxonomy',
    'bundles_ins', 'city_hist', 'country_hist', 'ctr', 'dev_language_hist',
    'dev_osv_hist', 'hour_ratio', 'user_actions_bundles_action_count',
    'new_bundles', 'region_hist', 'user_bundles', 'user_bundles_l28d',
]

# Columns that get scaled and receive an '<col>_is_missing' indicator.
# NOTE(review): TARGET and the other outcome-style columns below are listed as
# inputs (target leakage unless removed before training), and several entries
# ('hour', 'last_buy', 'last_ins', the *_ts_* list columns, and the columns
# aggregated into '*_agg') are dropped or replaced by transform_variables.
NUMERICAL_FEATURES = [
    # outcome-style columns (presumably labels; includes TARGET)
    'buyer_d1', 'buyer_d7', 'buyer_d14', 'buyer_d28',
    'buy_d7', 'buy_d14', 'buy_d28',
    'iap_revenue_d7', 'iap_revenue_d14', 'iap_revenue_d28',
    'registration',
    'retention_d1_to_d7', 'retention_d3_to_d7', 'retention_d7_to_d14',
    'retention_d1', 'retention_d3', 'retentiond7',
    # raw request / user-history columns
    'hour', 'release_msrp', 'weekday', 'avg_act_days',
    'bundles_cat', 'bundles_cat_bottom_taxonomy',
    'first_request_ts_bundle', 'iap_revenue_usd_bundle',
    'last_buy', 'last_buy_ts_bundle', 'last_buy_ts_category', 'last_ins',
    'user_actions_bundles_action_last_timestamp',
    'num_buys_bundle', 'rev_by_adv', 'rwd_prank',
    'weekend_ratio', 'weeks_since_first_seen', 'wifi_ratio',
    'whale_users_bundle_num_buys_prank', 'whale_users_bundle_revenue_prank',
    'whale_users_bundle_total_num_buys', 'whale_users_bundle_total_revenue',
    # columns derived from last_buy / last_ins
    'last_buy_safe', 'last_ins_safe', 'last_buy_dt', 'last_ins_dt',
    'hours_since_last_buy', 'hours_since_last_ins',
]

TARGET = "iap_revenue_d7"

def load_embeddings_mapping(path=None):
    """Load the per-feature ``{category: embedding_index}`` mappings from JSON.

    Parameters
    ----------
    path : str, optional
        JSON file written by ``generate_embeddings_mapping``. Defaults to
        ``EMBEDDINGS_MAPPING_FILE``.

    Returns
    -------
    dict
        ``{feature_name: {category: index}}``. JSON object keys are always
        strings, so numeric categories come back in their string form.
    """
    if path is None:
        path = EMBEDDINGS_MAPPING_FILE
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

# The mapping file is written by generate_embeddings_mapping() in a later cell,
# so on a fresh kernel it may not exist yet: start with an empty mapping then,
# and reload it once it has been generated.
EMBEDDING_MAPPINGS = (
    load_embeddings_mapping()
    if os.path.exists(EMBEDDINGS_MAPPING_FILE)
    else {}
)
# Columns pulled from parquet, grouped by kind. The order is the same as the
# flat list it replaces because it fixes the column order of ddf_train.
# NOTE(review): CATEGORICAL_FEATURES also names columns that are NOT read here
# (bcat, bundles_ins, city_hist, country_hist, dev_language_hist, dev_osv_hist,
# hour_ratio, user_actions_bundles_action_count, new_bundles, region_hist,
# user_bundles, user_bundles_l28d); indexing them later raises KeyError.
COLS_TO_READ = [
    # outcome-style columns (presumably labels; includes TARGET)
    'buyer_d1', 'buyer_d7', 'buyer_d14', 'buyer_d28',
    'buy_d7', 'buy_d14', 'buy_d28',
    'iap_revenue_d7', 'iap_revenue_d14', 'iap_revenue_d28',
    'registration',
    'retention_d1_to_d7', 'retention_d3_to_d7', 'retention_d7_to_d14',
    'retention_d1', 'retention_d3', 'retentiond7',
    # advertiser / device / request context
    'advertiser_bundle', 'advertiser_category', 'advertiser_subcategory',
    'advertiser_bottom_taxonomy_level',
    'country', 'dev_make', 'dev_model', 'dev_os', 'dev_osv',
    'hour', 'release_date', 'release_msrp', 'weekday',
    # user history
    'avg_act_days', 'avg_daily_sessions', 'avg_duration',
    'bcat_bottom_taxonomy', 'bundles_cat', 'bundles_cat_bottom_taxonomy',
    'ctr',
    'first_request_ts_bundle', 'iap_revenue_usd_bundle',
    'last_buy', 'last_buy_ts_bundle', 'last_buy_ts_category', 'last_ins',
    'user_actions_bundles_action_last_timestamp',
    'num_buys_bundle', 'rev_by_adv', 'rwd_prank',
    'weekend_ratio', 'weeks_since_first_seen', 'wifi_ratio',
    'whale_users_bundle_num_buys_prank', 'whale_users_bundle_revenue_prank',
    'whale_users_bundle_total_num_buys', 'whale_users_bundle_total_revenue',
]

# Rows whose 'datetime' lies in [2025-10-01-00-00, 2025-10-06-00-00).
train_filters = [("datetime", ">=", "2025-10-01-00-00"), ("datetime", "<", "2025-10-06-00-00")]

# Lazy Dask frame; nothing is read until .compute().
ddf_train = dd.read_parquet(
    DATASET_PATH,
    columns=COLS_TO_READ,
    filters=train_filters,
    engine="pyarrow",
)

# The fitted scaler is produced by train_scaler() (saved to SCALER_FILE).

In [None]:


def process_partition(df, scaler=None):
    """Run the preprocessing pipeline on one pandas partition.

    The steps run in this order:
    1. ``transform_variables`` (feature engineering).
    2. ``scale_numerical_features`` (standardisation with a fitted scaler).
    3. ``impute_missings`` (missing indicators, zero-fill, embedding indices).

    Each step is called for its in-place effect on ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        One partition of raw data; modified in place.
    scaler : object with ``transform``, optional
        Fitted scaler (e.g. the one returned or saved by ``train_scaler``).
        Defaults to the global ``loaded_scaler`` for backward compatibility.
        NOTE(review): the only visible assignment of ``loaded_scaler`` is in the
        StandardScaler demo cell (an age/income scaler), so pass it explicitly.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself, so the function can be used with ``ddf.map_partitions``
        (which uses the returned frame).
    """
    if scaler is None:
        scaler = loaded_scaler
    transform_variables(df)
    scale_numerical_features(df, scaler)
    impute_missings(df)
    return df

def hours_since_now_from_list(tuples_list, now_ts):
    """Hours between the most recent timestamp in ``tuples_list`` and ``now_ts``.

    ``tuples_list`` is expected to be a list of ``(key, timestamp)`` pairs. Only
    2-tuples whose second element is neither None nor a list are used. Returns
    NaN when the input is not a non-empty list or contains no usable pair.
    The division by 3600 assumes timestamps and ``now_ts`` are in seconds.
    """
    if not (isinstance(tuples_list, list) and tuples_list):
        return np.nan

    candidates = [
        pair[1]
        for pair in tuples_list
        if isinstance(pair, tuple)
        and len(pair) == 2
        and pair[1] is not None
        and not isinstance(pair[1], list)
    ]
    if not candidates:
        return np.nan

    latest = max(candidates)  # the most recent event is the one closest to now
    return (now_ts - latest) / 3600  # seconds -> hours

def extract_numbers(tuple_list):
    """Return the second element of every tuple of length >= 2 in ``tuple_list``.

    Non-list input yields an empty list. Items that are not tuples, or are
    tuples shorter than 2, are skipped.
    """
    values = []
    if not isinstance(tuple_list, list):
        return values
    for item in tuple_list:
        if isinstance(item, tuple) and len(item) >= 2:
            values.append(item[1])
    return values

def aggregate(values, mode):
    """Sum ``values`` when ``mode == "sum"``; otherwise return their mean.

    Empty or falsy input returns NaN.
    """
    if not values:
        return np.nan
    total = sum(values)
    return total if mode == "sum" else total / len(values)

def transform_variables(df, now=None):
    """Engineer features from the raw columns of ``df``, modifying it in place.

    Adds the following columns and drops the source columns of each step:

    * ``hours_since_last_buy`` / ``hours_since_last_ins`` from the Unix-second
      columns ``last_buy`` / ``last_ins``. Values outside
      1970-01-01..2100-01-01 are treated as invalid and become NaN.
    * ``hour_sin`` / ``hour_cos``: a 24-hour cyclical encoding of ``hour``.
    * ``<col>_hours_ago`` for the four ``(key, timestamp)`` list columns, using
      the most recent timestamp in each list.
    * ``<col>_agg`` (sum or mean of the tuple values) for the columns in ``rules``.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw frame; modified in place.
    now : pandas.Timestamp or str, optional
        Reference instant for every "time since" feature. Naive values are
        interpreted as UTC. Defaults to the current time. A fixed value (e.g.
        the end of the data window) makes features reproducible between
        training and inference.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself, for convenience.

    Notes
    -----
    NOTE(review): NUMERICAL_FEATURES / CATEGORICAL_FEATURES still list columns
    removed here (e.g. 'hour', 'last_buy_safe', 'ctr'); keep them in sync.
    NOTE(review): cells of pyarrow *list* columns usually arrive as numpy
    arrays rather than Python lists, and the helpers then treat them as
    missing — confirm the parquet types.
    """
    # One reference instant, as naive UTC: that is what
    # pd.to_datetime(..., unit="s") returns, and pandas' Timestamp.timestamp()
    # treats naive values as UTC. A bare pd.Timestamp.now() is naive *local*
    # time and would shift every delta by the machine's UTC offset.
    now = pd.Timestamp.now(tz="UTC") if now is None else pd.Timestamp(now)
    if now.tzinfo is not None:
        now = now.tz_convert("UTC").tz_localize(None)
    now_ts = int(now.timestamp())

    # --- Unix-second scalars -> hours since the event ---
    min_ts, max_ts = 0, 4102444800  # 1970-01-01 .. 2100-01-01 in Unix seconds
    for src in ("last_buy", "last_ins"):
        valid = df[src].where(df[src].between(min_ts, max_ts), np.nan)
        event_time = pd.to_datetime(valid, unit="s")
        df[f"hours_since_{src}"] = (now - event_time).dt.total_seconds() / 3600
    # Drop in place: rebinding `df = df.drop(...)` would detach the local name
    # from the caller's frame and silently discard every later step.
    df.drop(columns=["last_buy", "last_ins"], inplace=True)

    # --- Cyclical hour-of-day (full circle = 24 hours) ---
    hour_angle = df["hour"].astype(int) * (2 * np.pi / 24)
    df["hour_sin"] = np.sin(hour_angle)
    df["hour_cos"] = np.cos(hour_angle)
    df.drop(columns=["hour"], inplace=True)

    # --- (key, timestamp) list columns -> hours since the most recent entry ---
    ts_cols = [
        "first_request_ts_bundle",
        "last_buy_ts_bundle",
        "last_buy_ts_category",
        "user_actions_bundles_action_last_timestamp",
    ]
    for col in ts_cols:
        df[f"{col}_hours_ago"] = df[col].apply(hours_since_now_from_list, args=(now_ts,))
    df.drop(columns=ts_cols, inplace=True)

    # --- (key, value) list columns -> a single sum or mean ---
    rules = {
        "iap_revenue_usd_bundle": "sum",
        "num_buys_bundle": "sum",
        "rev_by_adv": "sum",
        "rwd_prank": "mean",
        "whale_users_bundle_num_buys_prank": "mean",
        "whale_users_bundle_revenue_prank": "mean",
        "whale_users_bundle_total_num_buys": "sum",
        "whale_users_bundle_total_revenue": "sum",
        "avg_daily_sessions": "mean",
        "avg_duration": "mean",
        "bcat_bottom_taxonomy": "mean",
        "ctr": "sum",
    }
    for col, mode in rules.items():
        df[f"{col}_agg"] = df[col].apply(
            lambda lst, m=mode: aggregate(extract_numbers(lst), m)
        )
    df.drop(columns=list(rules), inplace=True)

    return df

def _embedding_index(mapping, value):
    """Return the embedding index of ``value`` in ``mapping``, or 0 if unknown.

    Mappings loaded from JSON have string keys even for numeric categories
    (json writes 3 as "3" and 3.0 as "3.0"), so fall back to ``str(value)``
    before giving up.
    """
    if value in mapping:
        return mapping[value]
    return mapping.get(str(value), 0)


def impute_missings(df, categorical_features=None, numerical_features=None, mappings=None):
    """Fill missing values and encode categoricals as embedding indices, in place.

    Categorical columns: NaN becomes "<MISSING>", then each value is replaced
    by its index in the per-feature mapping. Values absent from the mapping
    get index 0, the "<MISSING>" slot.

    Numerical columns: a float ``<col>_is_missing`` indicator is added, then
    NaN is filled with 0 and the column is cast to float.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify in place.
    categorical_features, numerical_features : list of str, optional
        Default to the module-level CATEGORICAL_FEATURES / NUMERICAL_FEATURES.
    mappings : dict, optional
        ``{feature: {category: index}}``; defaults to EMBEDDING_MAPPINGS.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself, for convenience.
    """
    if categorical_features is None:
        categorical_features = CATEGORICAL_FEATURES
    if numerical_features is None:
        numerical_features = NUMERICAL_FEATURES
    if mappings is None:
        mappings = EMBEDDING_MAPPINGS

    for col in categorical_features:
        mapping = mappings[col]
        df[col] = df[col].fillna("<MISSING>").map(
            lambda value, m=mapping: _embedding_index(m, value)
        )

    for col in numerical_features:
        # Record missingness before the values are overwritten.
        df[f"{col}_is_missing"] = df[col].isna().astype(float)
        df[col] = df[col].fillna(0).astype(float)

    return df

def scale_numerical_features(df, scaler, columns=None):
    """Replace ``columns`` of ``df`` with ``scaler.transform`` output, in place.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify in place.
    scaler : object with ``transform``
        A fitted scaler, e.g. sklearn's StandardScaler (which keeps NaN as NaN).
    columns : list of str, optional
        Columns to scale; defaults to NUMERICAL_FEATURES. Must match the
        columns the scaler was fitted on.

    Returns
    -------
    pandas.DataFrame
        ``df`` itself, for convenience.
    """
    if columns is None:
        columns = NUMERICAL_FEATURES
    df[columns] = scaler.transform(df[columns])
    return df

def train_scaler(df, path=None):
    """Fit a StandardScaler on ``df[NUMERICAL_FEATURES]``, save it and return it.

    StandardScaler disregards NaN when computing mean/std, so this can run
    before ``impute_missings``.

    Parameters
    ----------
    df : pandas.DataFrame
        Training frame containing every column in NUMERICAL_FEATURES.
    path : str, optional
        Where to ``joblib.dump`` the scaler; defaults to SCALER_FILE.

    Returns
    -------
    StandardScaler
        The fitted scaler, so callers need not reload it from disk.
    """
    if path is None:
        path = SCALER_FILE
    scaler = StandardScaler()
    scaler.fit(df[NUMERICAL_FEATURES])
    joblib.dump(scaler, path)
    print("Scaler saved.")
    return scaler

def generate_embeddings_mapping(pdf, features=None, path=None):
    """Build ``{feature: {category: index}}`` mappings, write them as JSON, return them.

    Index 0 is reserved for "<MISSING>". Every other distinct non-null value
    gets 1, 2, ... in order of first appearance. A literal "<MISSING>" value
    in the data is folded into slot 0 so indices stay contiguous.

    Use after ``transform_variables`` but BEFORE imputing missings (otherwise
    the filled "<MISSING>" strings would be counted as data).

    Parameters
    ----------
    pdf : pandas.DataFrame
        Training frame.
    features : list of str, optional
        Columns to map; defaults to CATEGORICAL_FEATURES.
    path : str, optional
        Output JSON file; defaults to EMBEDDINGS_MAPPING_FILE.

    Returns
    -------
    dict
        The in-memory mappings. Keys keep their original types here; in the
        JSON file they are strings.
    """
    if features is None:
        features = CATEGORICAL_FEATURES
    if path is None:
        path = EMBEDDINGS_MAPPING_FILE

    mappings = {}
    for feature in features:
        # .tolist() yields Python scalars, which json can serialise as keys.
        observed = [
            value
            for value in pdf[feature].dropna().unique().tolist()
            if value != "<MISSING>"
        ]
        mappings[feature] = {
            value: idx for idx, value in enumerate(["<MISSING>"] + observed)
        }

    with open(path, "w", encoding="utf-8") as f:
        json.dump(mappings, f)
    return mappings


In [None]:
# Materialise the filtered Dask frame in memory as a single pandas DataFrame.
pdf_train = ddf_train.compute()
print("Training data loaded.")

# Fit-time steps, in this order:
transform_variables(pdf_train)          # 1. feature engineering on pdf_train
generate_embeddings_mapping(pdf_train)  # 2. vocabularies (before any imputation)
train_scaler(pdf_train)                 # 3. fit + save the numerical scaler

In [None]:
import os

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Self-contained demo: fit a StandardScaler on a subset of columns, save it,
# reload it and transform new data without touching the other columns.
# Scaler variables use a ``demo_`` prefix so this cell does not overwrite the
# global ``loaded_scaler`` that process_partition reads.

# --- 1. Define Data and Columns to Scale ---
cols_to_scale = ['age', 'income']
scaler_filename = 'my_scaler.joblib'

# Create a training DataFrame
df_train = pd.DataFrame({
    'age': [20, 30, 40, 50, np.nan],
    'income': [50000, 60000, np.nan, 80000, 75000],
    'city': ['New York', 'London', 'Paris', 'Tokyo', 'Sydney']
})

print("Original Training Data:")
print(df_train)
print("-" * 30)

try:
    # --- 2. Initialize, Fit, and Save Scaler ---
    demo_scaler = StandardScaler()

    # Fit ONLY on the specified columns; .fit() ignores NaNs for mean/std.
    print(f"Fitting scaler on columns: {cols_to_scale}")
    demo_scaler.fit(df_train[cols_to_scale])

    joblib.dump(demo_scaler, scaler_filename)
    print(f"Scaler saved to {scaler_filename}")
    print("-" * 30)

    # --- 3. Load Scaler and Transform New Data ---
    df_test = pd.DataFrame({
        'age': [35, 45, np.nan],
        'income': [55000, 90000, 62000],
        'city': ['Berlin', 'Moscow', 'Lisbon']
    })

    print("New Data (Before Transform):")
    print(df_test)
    print(f"Column names before: {df_test.columns.tolist()}")
    print("-" * 30)

    loaded_demo_scaler = joblib.load(scaler_filename)
    print("Scaler loaded.")

    # Work on a copy so df_test keeps the raw values to compare against.
    df_test_transformed = df_test.copy()
    # .transform() scales non-NaNs and keeps NaNs as NaN. Assigning back to the
    # same labels leaves every other column untouched.
    df_test_transformed[cols_to_scale] = loaded_demo_scaler.transform(
        df_test_transformed[cols_to_scale]
    )

    # --- 4. Verify Results ---
    print("\nNew Data (After Transform):")
    print(df_test_transformed)
    print(f"Column names after:  {df_test_transformed.columns.tolist()}")

    # Compare with the untouched input frame (not with itself).
    are_names_same = df_test.columns.equals(df_test_transformed.columns)
    print(f"\nColumn names and order unchanged: {are_names_same}")
finally:
    # Remove the saved scaler even if a step above failed.
    if os.path.exists(scaler_filename):
        os.remove(scaler_filename)

Original Training Data:
    age   income      city
0  20.0  50000.0  New York
1  30.0  60000.0    London
2  40.0      NaN     Paris
3  50.0  80000.0     Tokyo
4   NaN  75000.0    Sydney
------------------------------
Fitting scaler on columns: ['age', 'income']
Scaler saved to my_scaler.joblib
------------------------------
New Data (Before Transform):
    age  income    city
0  35.0   55000  Berlin
1  45.0   90000  Moscow
2   NaN   62000  Lisbon
Column names before: ['age', 'income', 'city']
------------------------------
Scaler loaded.

New Data (After Transform):
        age    income    city
0  0.000000 -0.943456  Berlin
1  0.894427  1.991741  Moscow
2       NaN -0.356417  Lisbon
Column names after:  ['age', 'income', 'city']

Column names and order unchanged: True


In [None]:
import dask.dataframe as dd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import pandas as pd
import warnings

# --- Configuration ---
# NOTE(review): this path differs from DATASET_PATH ("../data/train") used above — confirm.
TRAIN_DATA_PATH = 'data/smadex-challenge-predict-the-revenue/train/train/'
EPOCHS, BATCH_SIZE, LEARNING_RATE = 2, 1024, 0.001
# NOTE(review): placeholder name; the other cells use 'iap_revenue_d7' as the target.
TARGET_COLUMN = 'target'

# Silence Dask's UserWarnings (filters are process-wide, so this persists).
warnings.filterwarnings("ignore", category=UserWarning, module="dask")

# --- 1. Define the PyTorch Model ---

class SimpleMLP(nn.Module):
    """
    Two-hidden-layer MLP regressor: input_dim -> 64 -> 32 -> 1, ReLU between.

    Takes a float tensor whose last dimension is ``input_dim`` and returns one
    value per row. Inputs are assumed to be numeric and already scaled.
    """
    def __init__(self, input_dim):
        super().__init__()
        # Attribute names and creation order define the state_dict keys and the
        # order in which parameters are initialised; keep them stable.
        self.layer_1 = nn.Linear(input_dim, 64)
        self.relu_1 = nn.ReLU()
        self.layer_2 = nn.Linear(64, 32)
        self.relu_2 = nn.ReLU()
        self.output_layer = nn.Linear(32, 1)

    def forward(self, x):
        hidden = self.relu_1(self.layer_1(x))
        hidden = self.relu_2(self.layer_2(hidden))
        return self.output_layer(hidden)

# --- 2. Main Training Function ---

def main():
    """
    Minimal loop: stream the parquet dataset one partition at a time and train
    SimpleMLP on it (on CUDA when available).

    Every column except TARGET_COLUMN is used as a float feature.
    NOTE(review): that only works if all those columns are numeric; list/map
    columns will make torch.tensor fail — select numeric columns first.
    """

    # --- Setup Device (CUDA or CPU) ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # --- Load Dask DataFrame (Lazily) ---
    print(f"Loading data from: {TRAIN_DATA_PATH}")

    try:
        # Lazy read; only metadata (column names) is needed at this point.
        temp_ddf = dd.read_parquet(TRAIN_DATA_PATH, engine='pyarrow')
        all_columns = list(temp_ddf.columns)
    except Exception as e:
        print(f"Error reading parquet metadata from {TRAIN_DATA_PATH}. {e}")
        print("Please check your path.")
        return

    # --- Identify Feature and Target Columns ---
    if TARGET_COLUMN not in all_columns:
        print(f"Error: Target column '{TARGET_COLUMN}' not found in data.")
        print(f"Available columns: {all_columns}")
        return

    FEATURE_COLUMNS = [col for col in all_columns if col != TARGET_COLUMN]

    if not FEATURE_COLUMNS:
        print("Error: No feature columns found. Check your data.")
        return

    print(f"Identified Target: '{TARGET_COLUMN}'")
    print(f"Identified {len(FEATURE_COLUMNS)} Features. First 5: {FEATURE_COLUMNS[:5]}...")

    # Still lazy: just selects the needed columns.
    ddf = temp_ddf[FEATURE_COLUMNS + [TARGET_COLUMN]]

    # --- Initialize Model, Loss, Optimizer ---
    model = SimpleMLP(input_dim=len(FEATURE_COLUMNS)).to(device)
    criterion = nn.MSELoss()  # Mean Squared Error for regression
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    model.train()

    # --- Training Loop: Partition by Partition ---
    for epoch in range(EPOCHS):
        print(f"\n--- Starting Epoch {epoch + 1}/{EPOCHS} ---")

        # One delayed task per partition.
        for i, partition in enumerate(ddf.to_delayed()):
            # 1. Load only this partition into memory as a pandas DataFrame.
            pdf = partition.compute()

            if pdf.empty:
                print(f"  Skipping empty partition {i+1}")
                continue

            # 2. Convert to CPU tensors; batches are moved to `device` below.
            features_tensor = torch.tensor(
                pdf[FEATURE_COLUMNS].values,
                dtype=torch.float32
            )
            target_tensor = torch.tensor(
                pdf[TARGET_COLUMN].values,
                dtype=torch.float32
            ).view(-1, 1)

            # 3. Batch within the partition.
            partition_dataset = TensorDataset(features_tensor, target_tensor)
            partition_loader = DataLoader(dataset=partition_dataset, batch_size=BATCH_SIZE, shuffle=True)

            # 4. Mini-batch training for this partition.
            for batch_features, batch_target in partition_loader:
                # The model lives on `device`; inputs must too, or CUDA raises
                # a device-mismatch error.
                batch_features = batch_features.to(device)
                batch_target = batch_target.to(device)

                outputs = model(batch_features)
                loss = criterion(outputs, batch_target)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # pdf is non-empty, so at least one batch ran and `loss` is set.
            print(f"  Epoch {epoch+1} | Processed Partition {i+1} | Last Batch Loss: {loss.item():.4f}")

    print("\n--- Training Finished ---")


if __name__ == "__main__":
    main()

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import io

# --- 1. Write a tiny CSV fixture for this example ---
# (With real data you would already have 'data.csv' on disk.)
# Empty fields are deliberate: they exercise the missing-value handling.
csv_content = """cat_feature,num_feature,target
A,1.0,10
B,,20
,3.0,30
A,4.0,40
B,5.0,50
,,60
"""
with open('data.csv', mode='w') as csv_handle:
    print(csv_content, end='', file=csv_handle)

# --- 2. Define the Custom Dataset ---
# This is the "right place" to do the transformations.
# We do them all at once in __init__ for efficiency.

class ImputationDataset(Dataset):
    """Toy dataset showing missing-value handling for one categorical and one numerical column.

    All preprocessing happens once, in ``__init__``:

    * ``cat_feature``: NaN becomes "<MISSING>", then integer codes come from
      ``pd.factorize`` (codes follow order of first appearance; the labels
      are kept in ``self.categories``).
    * ``num_feature``: the zero-filled value is stacked with a 0/1 is-missing
      flag into an ``(n_samples, 2)`` float tensor.
    * ``target``: float tensor.
    """
    def __init__(self, csv_file):
        # Empty strings and single spaces are read as missing.
        frame = pd.read_csv(csv_file, na_values=["", " "])

        # Categorical: explicit "<MISSING>" category, then integer codes.
        cat_column = frame['cat_feature'].fillna("<MISSING>")
        self.cat_codes, self.categories = pd.factorize(cat_column)
        self.cat_data = torch.tensor(self.cat_codes, dtype=torch.long)

        # Numerical: [value (NaN -> 0), is_missing] per row.
        num_column = frame['num_feature']
        missing_flag = torch.tensor(num_column.isna().astype(float).to_numpy(), dtype=torch.float32)
        filled_value = torch.tensor(num_column.fillna(0).to_numpy(), dtype=torch.float32)
        self.num_combined_data = torch.stack((filled_value, missing_flag), dim=1)

        self.targets = torch.tensor(frame['target'].to_numpy(), dtype=torch.float32)

    def __len__(self):
        return self.targets.shape[0]

    def __getitem__(self, idx):
        # ((long category code for the embedding, float [value, is_missing]), float target)
        features = (self.cat_data[idx], self.num_combined_data[idx])
        return features, self.targets[idx]

# --- 3. Define a Minimal Model to Consume the Data ---
# This model shows how to use the data from our loader.
class SimpleModel(nn.Module):
    """Embedding for one categorical column, concatenated with [value, is_missing] and fed to a Linear layer."""
    def __init__(self, num_embeddings, embedding_dim=4):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings, embedding_dim)
        # +2 for the numerical value and its is-missing flag.
        in_features = embedding_dim + 2
        self.fc = nn.Linear(in_features, 1)

    def forward(self, x_cat, x_num_combined):
        """x_cat: (batch,) long codes; x_num_combined: (batch, 2) floats -> (batch, 1)."""
        features = torch.cat((self.embedding(x_cat), x_num_combined), dim=1)
        return self.fc(features)

# --- 4. Load Data and Demonstrate ---

# Seed torch so the shuffled batch order and the model's random initial
# weights (and therefore the printed output) are reproducible.
torch.manual_seed(0)

# Initialize Dataset
dataset = ImputationDataset('data.csv')

# Number of unique categories (A, B, <MISSING>) = rows of the Embedding table
num_categories = len(dataset.categories)
print(f"Categories found: {list(dataset.categories)}")
print(f"Number of embeddings needed: {num_categories}\n")

# batch_size=3 to show how the 6 samples are split
data_loader = DataLoader(dataset, batch_size=3, shuffle=True)

# Initialize Model
model = SimpleModel(num_embeddings=num_categories)

# --- 5. Run one batch to see the output ---
print("--- Iterating over DataLoader ---")
# Only the first batch is needed for the demonstration.
(x_cat, x_num_combined), y = next(iter(data_loader))
print(f"Batch X (categorical codes):\n{x_cat}")
print(f"Batch X (numerical, is_missing):\n{x_num_combined}")
print(f"Batch Y (target):\n{y}\n")

output = model(x_cat, x_num_combined)

print(f"Model Output shape: {output.shape}")
print(f"Model Output:\n{output}\n")

# Example of what the data looks like before the model:
# (Which rows land in the first batch depends on the shuffle, seeded above.)
#
# Original Data:
# cat_feature,num_feature,target
# A,1.0,10
# B,,20
# ,3.0,30
# A,4.0,40
# B,5.0,50
# ,,60
#
# Processed Data (example):
# x_cat: [1] (B)
# x_num_combined: [0.0, 1.0] (value=0, is_missing=1)
# y: [20.]
#
# x_cat: [2] (<MISSING>)
# x_num_combined: [3.0, 0.0] (value=3, is_missing=0)
# y: [30.]

In [2]:
import dask.dataframe as dd
import dask_ml.preprocessing
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import warnings
from sklearn.preprocessing import StandardScaler
from collections import defaultdict
import json

# --- 1. Configuration & Column Definitions ---

# Paths and training hyper-parameters
TRAIN_DATA_PATH = 'data/smadex-challenge-predict-the-revenue/train/train/'
EPOCHS, BATCH_SIZE, LEARNING_RATE = 2, 1024, 0.001

# Regression target to predict
TARGET_COLUMN = 'iap_revenue_d7'

# Feature groups. Each one selects a different transformation in
# FeatureProcessor; extend them with more columns as needed.

# Categorical -> integer-encoded, fed into nn.Embedding layers
CAT_COLS = 'advertiser_category country dev_make dev_os'.split()

# Raw numerical columns, passed through and then scaled
NUM_COLS_RAW = ['release_msrp']

# Cyclical -> sin/cos pairs
CYCLICAL_COLS = ['hour', 'weekday']

# Timestamps -> "days since" deltas ('release_date' is a 'YYYY_month' string)
TIMESTAMP_COLS = ['release_date', 'last_buy', 'last_ins']

# Map columns: a list of keys means "unroll these keys into columns",
# 'sum' means "aggregate all values"
MAP_COLS = dict(
    cpm=['b', 'i', 'r'],           # banner / interstitial / rewarded
    iap_revenue_usd_bundle='sum',  # total revenue
)

# List columns -> length of the list
LIST_COLS = dict(user_bundles='count', new_bundles='count')

# Histogram columns -> sum of all counts
HIST_COLS = dict(city_hist='total_requests')

# Preprocessing parameters
RARE_THRESHOLD = 10  # categories seen fewer times than this map to '<RARE>'
EMBEDDING_DIMS = dict(advertiser_category=10, country=16, dev_make=24, dev_os=8)

# Utility columns (to be dropped before training)
METADATA_COLS = ['row_id', 'datetime']

# Silence Dask UserWarnings and all FutureWarnings for the session.
warnings.filterwarnings("ignore", category=UserWarning, module="dask")
warnings.filterwarnings("ignore", category=FutureWarning)

# --- 2. The Deep Learning Model ---

class WideAndDeepModel(nn.Module):
    """
    Regressor over categorical (via Embeddings) and numerical features.

    Each categorical column gets its own nn.Embedding. The embeddings are
    concatenated with the numerical tensor and passed through one MLP tower
    (128 -> 64 -> 1 with ReLU, BatchNorm and Dropout). Despite the name, no
    separate "wide" linear part is built.
    """
    def __init__(self, vocab_sizes, embedding_dims, num_numerical_features):
        """
        vocab_sizes: {column: number of indices}; its iteration order fixes the
            order of the embedding slices in the tower input.
        embedding_dims: {column: embedding size}; columns not listed get 10.
        num_numerical_features: width of the numerical input tensor.
        """
        super(WideAndDeepModel, self).__init__()

        # --- Embedding Layers (Categorical) ---
        self.embedding_layers = nn.ModuleDict()
        total_embedding_dim = 0
        for col, vocab_size in vocab_sizes.items():
            dim = embedding_dims.get(col, 10)  # Default to dim=10 if not specified
            self.embedding_layers[col] = nn.Embedding(vocab_size, dim)
            total_embedding_dim += dim

        print(f"Total embedding dimension: {total_embedding_dim}")

        # --- Deep Tower (Numerical + Embeddings) ---
        self.num_numerical_features = num_numerical_features
        input_dim = total_embedding_dim + self.num_numerical_features

        print(f"Total model input dim (Embeds + Numericals): {input_dim}")

        self.tower = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Dropout(0.2),
            nn.Linear(64, 1)  # Output 1 value (target)
        )

    def forward(self, x_cat, x_num):
        """
        x_cat: dict {column: long tensor of indices}; must contain every
            column the model was built with (extra keys are ignored).
        x_num: float tensor of numerical features, shape (batch, num_numerical_features).
        Returns a (batch, 1) tensor.
        """
        # Concatenate in the layers' registration order, not x_cat's key order:
        # the first Linear layer expects one fixed column layout.
        parts = [layer(x_cat[col]) for col, layer in self.embedding_layers.items()]
        # Appending x_num also covers models with no categorical columns.
        parts.append(x_num)
        x_combined = torch.cat(parts, dim=1)
        return self.tower(x_combined)


# --- 3. Custom PyTorch Dataset ---

class HeterogeneousDataset(Dataset):
    """
    Dataset yielding ``((categorical_dict, numerical_tensor), target)`` per row.

    cat_data: {column: array-like of indices}, stored as long tensors.
    num_data: 2-D array-like of numerical features, stored as float32.
    target_data: 1-D array-like, stored as a float32 column of shape (n, 1).
    All inputs must have the same number of rows.
    """
    def __init__(self, cat_data, num_data, target_data):
        self.cat_data = {}
        for name, values in cat_data.items():
            self.cat_data[name] = torch.tensor(values, dtype=torch.long)
        self.num_data = torch.tensor(num_data, dtype=torch.float32)
        self.target_data = torch.tensor(target_data, dtype=torch.float32).reshape(-1, 1)

    def __len__(self):
        return self.target_data.shape[0]

    def __getitem__(self, idx):
        row_cat = {name: column[idx] for name, column in self.cat_data.items()}
        return (row_cat, self.num_data[idx]), self.target_data[idx]


# --- 4. Feature Engineering & Preprocessing Class ---

class FeatureProcessor:
    """
    Handles fitting vocabs/scalers on Dask and transforming
    Pandas partitions during training.

    Usage:
      1. ``fit_vocabs(ddf, rare_threshold)`` once, on the full Dask frame.
      2. ``transform_partition(pdf, ref_datetime_col)`` for every pandas
         partition. The first call also fixes the list of engineered numerical
         columns and fits the StandardScaler, on that first partition only.
    """

    # Reserved vocabulary indices shared by every categorical column.
    MISSING_INDEX = 0
    RARE_INDEX = 1

    # Period (number of distinct values) of each cyclical column. The angle is
    # 2*pi*value/period, so e.g. hour 23 stays distinct from, and adjacent to,
    # hour 0. Columns not listed here use a weekly period.
    CYCLICAL_PERIODS = {'hour': 24, 'weekday': 7}

    def __init__(self, cat_cols, num_cols_raw, cyclical_cols, ts_cols, map_cols, list_cols, hist_cols, target_col):
        # Store column names
        self.CAT_COLS = cat_cols
        self.NUM_COLS_RAW = num_cols_raw
        self.CYCLICAL_COLS = cyclical_cols
        self.TS_COLS = ts_cols
        self.MAP_COLS = map_cols
        self.LIST_COLS = list_cols
        self.HIST_COLS = hist_cols
        self.TARGET_COL = target_col

        # These will be "fit"
        self.vocabularies = {}
        self.vocab_sizes = {}
        self.scaler = StandardScaler()  # fit on the first transformed partition
        self.engineered_num_cols = []
        self._is_scaler_fit = False

    def fit_vocabs(self, ddf, rare_threshold):
        """
        Fits vocabularies on the full Dask dataframe.
        This is a global operation and should be done once.

        Each vocab maps '<MISSING>' -> 0, '<RARE>' -> 1, and each label seen at
        least ``rare_threshold`` times -> 2, 3, ... It is a defaultdict whose
        default is the '<RARE>' index. Labels equal to a reserved token are
        skipped, so indices stay contiguous and ``vocab_sizes[col]`` is always
        max index + 1, as nn.Embedding requires.
        """
        print("Computing vocabularies...")
        reserved = {'<MISSING>': self.MISSING_INDEX, '<RARE>': self.RARE_INDEX}
        rare_index = self.RARE_INDEX
        for col in self.CAT_COLS:
            print(f"  Fitting vocab for: {col}")
            counts = ddf[col].value_counts().compute()
            frequent_labels = [
                label
                for label in counts[counts >= rare_threshold].index.tolist()
                if label not in reserved
            ]

            vocab = defaultdict(lambda: rare_index)
            vocab.update(reserved)
            for i, label in enumerate(frequent_labels, start=len(reserved)):
                vocab[label] = i

            self.vocabularies[col] = vocab
            self.vocab_sizes[col] = len(vocab)
            print(f"    Vocab size for {col}: {len(vocab)}")

    @staticmethod
    def _parse_map(x):
        """Normalise a map cell ('[(k, v)]' list or dict) to a dict; anything else -> {}."""
        if isinstance(x, list):
            return {k: v for k, v in x}
        if isinstance(x, dict):
            return x
        return {}

    @staticmethod
    def _sum_map_values(x):
        """Sum the values of a map cell; 0 for missing, unsupported or malformed cells."""
        try:
            if isinstance(x, list):  # '[(k, v)]'
                return sum(v for k, v in x)
            if isinstance(x, dict):
                return sum(x.values())
        except (TypeError, ValueError):
            return 0
        return 0

    @staticmethod
    def _sum_hist(x):
        """Sum the counts of a '[(key, count)]' histogram cell; 0 if it cannot be summed."""
        try:
            return sum(count for key, count in x)
        except (TypeError, ValueError):
            return 0

    def engineer_features_partition(self, pdf, ref_datetime_col):
        """
        Applies all feature engineering logic to a single Pandas partition.
        This function CREATES the new numerical features.

        Returns a new frame with CAT_COLS (NaN -> '<MISSING>'), then the
        engineered numerical columns (sorted by name), then TARGET_COL. The set
        of numerical columns is fixed by the first call.
        NOTE(review): ref_datetime_col may presumably be a scalar or a Series
        aligned with pdf; the subtraction below supports both.
        """
        pdf_out = pd.DataFrame(index=pdf.index)
        ref_datetime = pd.to_datetime(ref_datetime_col)

        # --- 1. Pass-through Raw Features ---
        pdf_out[self.CAT_COLS] = pdf[self.CAT_COLS].fillna('<MISSING>')
        pdf_out[self.NUM_COLS_RAW] = pdf[self.NUM_COLS_RAW]
        pdf_out[self.TARGET_COL] = pdf[self.TARGET_COL]

        # --- 2. Cyclical Features ---
        for col in self.CYCLICAL_COLS:
            period = self.CYCLICAL_PERIODS.get(col, 7)
            # Values can arrive as strings (e.g. 'hour'); unparsable ones become
            # NaN and are zero-filled later in transform_partition.
            angle = 2 * np.pi * pd.to_numeric(pdf[col], errors='coerce') / period
            pdf_out[f'{col}_sin'] = np.sin(angle)
            pdf_out[f'{col}_cos'] = np.cos(angle)

        # --- 3. Timestamp (Delta) Features ---
        for col in self.TS_COLS:
            if col == 'release_date':
                # Special parser for "2023_october"
                ts = pd.to_datetime(pdf[col].str.replace('_', ' '), format='%Y %B', errors='coerce')
            else:
                # Standard unix timestamps (seconds)
                ts = pd.to_datetime(pdf[col], unit='s', errors='coerce')

            # Delta in days
            delta_days = (ref_datetime - ts).dt.total_seconds() / (60 * 60 * 24)
            pdf_out[f'days_since_{col}'] = delta_days

        # --- 4. Map/Dict Features ---
        for col, action in self.MAP_COLS.items():
            if isinstance(action, list):  # Unroll the listed keys into columns
                parsed_maps = pdf[col].apply(self._parse_map)
                for key in action:
                    pdf_out[f'{col}_{key}'] = parsed_maps.apply(lambda m, k=key: m.get(k))
            elif action == 'sum':  # Aggregate
                pdf_out[f'{col}_sum'] = pdf[col].apply(self._sum_map_values)

        # --- 5. List Features ---
        for col, action in self.LIST_COLS.items():
            if action == 'count':
                pdf_out[f'{col}_count'] = pdf[col].apply(lambda x: len(x) if isinstance(x, list) else 0)

        # --- 6. Histogram Features ---
        for col, action in self.HIST_COLS.items():
            if action == 'total_requests':
                pdf_out[f'{col}_total_requests'] = pdf[col].apply(self._sum_hist)

        # --- Store list of all engineered numerical columns (first call only) ---
        if not self.engineered_num_cols:
            all_cols = set(pdf_out.columns)
            cat_target = set(self.CAT_COLS) | {self.TARGET_COL}
            self.engineered_num_cols = sorted(all_cols - cat_target)
            print(f"Discovered {len(self.engineered_num_cols)} engineered numerical features.")
            print(f"First 5: {self.engineered_num_cols[:5]}")

        return pdf_out[self.CAT_COLS + self.engineered_num_cols + [self.TARGET_COL]]

    def transform_partition(self, pdf, ref_datetime_col):
        """
        Applies all transforms to a partition and returns data
        ready for the model.

        Returns ``(cat_data, num_data, target_data)``:
          * ``cat_data``: {column: int64 index array};
          * ``num_data``: scaled 2-D array of the engineered numerical columns;
          * ``target_data``: target values, NaN -> 0.
        The scaler is fitted on the first partition seen and reused afterwards.
        """
        # 1. Engineer all features
        pdf_fe = self.engineer_features_partition(pdf, ref_datetime_col)

        # 2. Separate data
        pdf_cat = pdf_fe[self.CAT_COLS]
        pdf_num = pdf_fe[self.engineered_num_cols]
        pdf_target = pdf_fe[self.TARGET_COL]

        # 3. Apply Vocabularies (Categorical)
        cat_data = {}
        for col in self.CAT_COLS:
            vocab = self.vocabularies[col]
            # Series.map(defaultdict) would go through its __missing__ hook,
            # inserting every unseen label into the fitted vocab. Map through a
            # plain-dict copy and apply the vocab's default explicitly instead.
            # A plain-dict vocab sends unseen labels to '<MISSING>'.
            default_factory = getattr(vocab, 'default_factory', None)
            unseen_index = default_factory() if default_factory is not None else self.MISSING_INDEX
            cat_data[col] = (
                pdf_cat[col].map(dict(vocab)).fillna(unseen_index).astype('int64').to_numpy()
            )

        # 4. Apply Scaling (Numerical); clear NaN/inf from deltas/ratios first
        pdf_num = pdf_num.fillna(0).replace([np.inf, -np.inf], 0)

        if not self._is_scaler_fit:
            print("Fitting StandardScaler on first partition...")
            self.scaler.fit(pdf_num)
            self._is_scaler_fit = True

        num_data = self.scaler.transform(pdf_num)

        # 5. Get Target
        target_data = pdf_target.fillna(0).values

        return cat_data, num_data, target_data


# --- 5. Main Training Function ---

def main():
    """
    Fit the preprocessors and train the Wide & Deep model partition by partition.

    Steps:
      1. Lazily open the training parquet dataset with Dask, reading only the
         configured columns, and parse ``datetime``.
      2. Fit the categorical vocabularies over the Dask frame.
      3. Discover the engineered numerical feature set from Dask's non-empty
         metadata sample, then build the model, loss and optimizer.
      4. For each epoch, materialise one partition at a time, transform it and
         run mini-batch optimisation on it.

    The StandardScaler is fitted on the first non-empty training partition
    (inside ``FeatureProcessor.transform_partition``) rather than on the whole
    dataset, to avoid an extra full pass over the data.

    Returns None. If opening the dataset fails, the error is printed and the
    function returns without training.
    """
    # --- Setup Device (CUDA or CPU) ---
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # --- Load Dask DataFrame (Lazily) ---
    print(f"Loading data from: {TRAIN_DATA_PATH}")

    try:
        all_cols = (
            CAT_COLS + NUM_COLS_RAW + CYCLICAL_COLS + TIMESTAMP_COLS +
            list(MAP_COLS.keys()) + list(LIST_COLS.keys()) +
            list(HIST_COLS.keys()) + [TARGET_COLUMN] + METADATA_COLS
        )
        # dict.fromkeys de-duplicates while keeping a stable order; list(set(...))
        # would give a different column order on each interpreter run because
        # string hashing is randomised.
        needed_cols = list(dict.fromkeys(all_cols))

        # Read only the columns we need
        ddf = dd.read_parquet(
            TRAIN_DATA_PATH,
            engine='pyarrow',
            columns=needed_cols
        )

        # Ensure 'datetime' is parsed as a Dask datetime series
        ddf['datetime'] = dd.to_datetime(ddf['datetime'])

    except Exception as e:
        print(f"Error reading parquet metadata from {TRAIN_DATA_PATH}. {e}")
        return

    # --- 1. FIT PREPROCESSORS ---
    print("\n--- Starting Preprocessing 'fit' Step ---")
    processor = FeatureProcessor(
        CAT_COLS, NUM_COLS_RAW, CYCLICAL_COLS, TIMESTAMP_COLS,
        MAP_COLS, LIST_COLS, HIST_COLS, TARGET_COLUMN
    )

    # Fit vocabularies (requires a pass over the data)
    processor.fit_vocabs(ddf, rare_threshold=RARE_THRESHOLD)

    # NOTE: the scaler is fitted on the *first partition* during training.
    # Fitting on the full ddf would need all features engineered over the whole
    # Dask frame first (another full pass). For a production model, fit on the
    # full data instead.
    print("--- Preprocessing 'fit' Complete ---")

    # --- 2. INITIALIZE MODEL ---
    # Vocab sizes are only known after fit_vocabs, and the number of engineered
    # numerical features is discovered by running feature engineering once on
    # Dask's synthetic non-empty metadata frame.
    print("Discovering engineered feature dimensions...")
    dummy_pdf = ddf._meta_nonempty.copy()
    dummy_dt = pd.Series([pd.Timestamp.now()] * len(dummy_pdf), index=dummy_pdf.index)
    # Mirror the training loop below, which drops METADATA_COLS before feature
    # engineering. engineered_num_cols is only computed on the first call, so
    # leaving metadata columns in here could record columns that the real
    # (metadata-free) partitions do not contain.
    dummy_features = dummy_pdf.drop(columns=METADATA_COLS, errors='ignore')
    _ = processor.engineer_features_partition(dummy_features, dummy_dt)

    num_numerical_features = len(processor.engineered_num_cols)

    model = WideAndDeepModel(
        vocab_sizes=processor.vocab_sizes,
        embedding_dims=EMBEDDING_DIMS,
        num_numerical_features=num_numerical_features
    ).to(device)

    criterion = nn.MSELoss()  # Mean Squared Error for regression
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    model.train()  # Set model to training mode

    # --- 3. TRAINING LOOP: PARTITION BY PARTITION ---
    for epoch in range(EPOCHS):
        print(f"\n--- Starting Epoch {epoch + 1}/{EPOCHS} ---")

        # One delayed task per partition; only one is materialised at a time.
        for i, partition in enumerate(ddf.to_delayed()):
            # Load *only this single partition* into memory as a pandas DataFrame
            pdf = partition.compute()

            if pdf.empty:
                print(f"  Skipping empty partition {i+1}")
                continue

            # Extract reference datetime, then drop metadata before engineering
            ref_datetime_col = pdf['datetime']
            pdf_features = pdf.drop(columns=METADATA_COLS, errors='ignore')

            cat_data, num_data, target_data = processor.transform_partition(
                pdf_features,
                ref_datetime_col
            )

            partition_loader = DataLoader(
                dataset=HeterogeneousDataset(cat_data, num_data, target_data),
                batch_size=BATCH_SIZE,
                shuffle=True
            )

            # Mini-batch training loop for this partition.
            # `loss` is reset so the report below never reuses a previous
            # partition's value.
            loss = None
            for (batch_cat, batch_num_features), batch_target in partition_loader:
                batch_cat = {k: v.to(device) for k, v in batch_cat.items()}
                batch_num_features = batch_num_features.to(device)
                batch_target = batch_target.to(device)

                # Forward pass
                outputs = model(batch_cat, batch_num_features)
                loss = criterion(outputs, batch_target)

                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if loss is not None:
                print(f"  Epoch {epoch+1} | Processed Partition {i+1} | Last Batch Loss: {loss.item():.4f}")

    print("\n--- Training Finished ---")


if __name__ == "__main__":
    main()

ModuleNotFoundError: No module named 'torch'