In [1]:
import argparse
import os
import pickle
import json
import time
import logging
import warnings
from pathlib import Path
from typing import Dict, Any, List, Tuple, Optional, Union
from dataclasses import dataclass, asdict
from collections import defaultdict
import multiprocessing as mp

import numpy as np
import pandas as pd
import psutil

# ML/DL libraries
import tensorflow as tf
from tensorflow import keras
import tensorflow_recommenders as tfrs
from sklearn.metrics import roc_auc_score, log_loss
from sklearn.preprocessing import StandardScaler, LabelEncoder
from tensorflow.keras.callbacks import Callback

try:
    import wandb
    WANDB_AVAILABLE = True
except ImportError:
    WANDB_AVAILABLE = False

try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False

# Notebook-wide logging: INFO level, "timestamp - level - message" format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Only ERROR and above from TensorFlow's Python logger.
tf.get_logger().setLevel('ERROR')
# NOTE(review): this silences every Python warning for the rest of the kernel
# session (including deprecation and numerical warnings) — confirm this is intended.
warnings.filterwarnings('ignore')
# NOTE(review): the mixed_float16 policy is applied unconditionally here and does
# not consult ModelConfig.mixed_precision. The cell output below reports that no
# CUDA drivers were found; float16 compute on CPU is presumably not beneficial —
# TODO confirm whether this should be gated on the config flag / GPU availability.
tf.keras.mixed_precision.set_global_policy('mixed_float16')

2025-09-29 10:28:07.419031: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-09-29 10:28:07.653853: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-09-29 10:28:07.854616: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1759123688.053584    6459 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1759123688.103908    6459 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1759123688.496866    6459 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linkin

In [2]:
@dataclass
class ModelConfig:
    """Hyperparameters and runtime switches for the recommendation system.

    The list-valued fields are declared with a ``None`` default and receive
    their real default in ``__post_init__``, so every instance owns its own
    list objects.
    """
    # --- Architecture ---
    embedding_dim: int = 128
    user_tower_dims: List[int] = None
    item_tower_dims: List[int] = None
    cross_layers: int = 3
    dnn_dims: List[int] = None
    dropout_rate: float = 0.2
    l2_reg: float = 1e-5

    # --- Optimisation ---
    batch_size: int = 8192
    learning_rate_retrieval: float = 0.1
    learning_rate_ranking: float = 0.001
    epochs_retrieval: int = 10
    epochs_ranking: int = 15
    warmup_steps: int = 1000

    # --- Negative sampling ---
    num_hard_negatives: int = 5
    num_random_negatives: int = 100
    negative_sampling_strategy: str = "mixed"  # one of "random", "hard", "mixed"

    # --- Multi-task loss weights ---
    ctr_weight: float = 0.5
    rating_weight: float = 0.5

    # --- Evaluation cut-offs ---
    eval_topk: List[int] = None

    # --- Runtime ---
    mixed_precision: bool = True
    distributed_strategy: str = "mirrored"  # one of "mirrored", "multi_worker", "tpu"

    def __post_init__(self):
        """Replace every list field that was left as ``None`` with its default."""
        # The dict is rebuilt on each call, so the lists are never shared
        # between instances.
        fallback_values = {
            'user_tower_dims': [512, 256, 128],
            'item_tower_dims': [512, 256, 128],
            'dnn_dims': [512, 256, 128, 64],
            'eval_topk': [1, 5, 10, 20, 50, 100],
        }
        for field_name, fallback in fallback_values.items():
            if getattr(self, field_name) is None:
                setattr(self, field_name, fallback)

In [3]:
class DataProcessor:
    """Load the pickled dataset and turn interaction frames into feature frames.

    Attributes:
        config: The ``ModelConfig`` supplied at construction time.
        user_encoder: Initialised to ``None``; not assigned by any method here.
        item_encoder: Initialised to ``None``; not assigned by any method here.
        feature_scalers: Scalers fitted by ``engineer_features``. The key
            ``'numeric'`` holds the ``StandardScaler`` fitted in train mode.
    """

    # Alternative column names accepted for each required id column, tried in order.
    _COLUMN_ALIASES = {
        'user_id': ['user', 'userId', 'user_idx'],
        'movie_id': ['movie', 'movieId', 'movie_idx', 'item_id'],
    }
    # Columns that are never passed to the scaler (ids, labels, raw time).
    _UNSCALED_COLUMNS = ('user_id', 'movie_id', 'rating', 'y_implicit', 'timestamp')

    def __init__(self, config: ModelConfig):
        self.config = config
        self.user_encoder = None
        self.item_encoder = None
        self.feature_scalers = {}

    def load_and_validate_data(self, pickle_path: str) -> Dict[str, Any]:
        """Read a pickle file and split its content into named frames.

        Args:
            pickle_path: Path of the pickle file to read.

        Returns:
            A dict with keys ``train_df``, ``val_df``, ``test_df``,
            ``user_features`` and ``item_features``. When the pickle holds a
            dict, the frames come from the keys ``train_ratings``/``train``,
            ``val_ratings``/``val``, ``test_ratings``/``test``,
            ``user_features`` and ``movie_features``; missing keys yield an
            empty DataFrame. Any other payload is returned as ``train_df``
            with all other entries empty.

        Raises:
            Exception: Whatever ``pandas.read_pickle`` raises when both load
                attempts fail (re-raised after logging).

        Note:
            No schema validation is performed on the loaded frames.
        """
        logger.info(f"Loading data from {pickle_path}")

        # SECURITY: unpickling can execute arbitrary code. Only load files
        # from a trusted source.
        try:
            with open(pickle_path, 'rb') as f:
                data = pickle.load(f)
        except Exception as pickle_error:
            # `except Exception` (not a bare except) so KeyboardInterrupt and
            # SystemExit are not swallowed by the fallback.
            logger.warning(
                f"pickle.load failed for {pickle_path} ({pickle_error}); "
                f"retrying with pandas.read_pickle"
            )
            try:
                data = pd.read_pickle(pickle_path)
            except Exception as e:
                logger.error(f"Error loading data from {pickle_path}: {e}")
                raise

        if isinstance(data, dict):
            train_df = data.get('train_ratings', data.get('train', pd.DataFrame()))
            val_df = data.get('val_ratings', data.get('val', pd.DataFrame()))
            test_df = data.get('test_ratings', data.get('test', pd.DataFrame()))
            user_features = data.get('user_features', pd.DataFrame())
            item_features = data.get('movie_features', pd.DataFrame())
        else:
            train_df = data
            val_df = pd.DataFrame()
            test_df = pd.DataFrame()
            user_features = pd.DataFrame()
            item_features = pd.DataFrame()

        return {
            'train_df': train_df,
            'val_df': val_df,
            'test_df': test_df,
            'user_features': user_features,
            'item_features': item_features,
        }

    def engineer_features(self, df: pd.DataFrame, user_features: pd.DataFrame,
                         item_features: pd.DataFrame, mode: str = 'train') -> pd.DataFrame:
        """Derive time, behavioural and side features, then scale numeric columns.

        Args:
            df: Interaction frame. Must contain ``user_id`` and ``movie_id``
                or one of their accepted aliases. ``rating`` and ``timestamp``
                are optional. The input frame is not modified.
            user_features: Side features per user; its first column is used
                as the join key. May be empty.
            item_features: Side features per item; its first column is used
                as the join key. May be empty.
            mode: ``'train'`` fits a new scaler and stores it under
                ``feature_scalers['numeric']``; any other value reuses the
                stored scaler.

        Returns:
            A new DataFrame with the engineered columns and NaNs replaced by 0.

        Raises:
            ValueError: If a required id column (or alias) is missing.
        """
        logger.info(f"Engineering features for {mode} data")

        df = self._normalise_id_columns(df)

        if 'timestamp' in df.columns:
            # NOTE(review): the `.dt` accessor requires a datetime-like column;
            # raw epoch integers would need converting first — confirm upstream.
            df['hour'] = df['timestamp'].dt.hour
            df['day_of_week'] = df['timestamp'].dt.dayofweek
            df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

        # Per-user and per-item statistics are computed on the frame passed in.
        user_stats = self._interaction_stats(
            df, group_col='user_id', other_col='movie_id',
            prefix='user', unique_name='user_unique_items')
        item_stats = self._interaction_stats(
            df, group_col='movie_id', other_col='user_id',
            prefix='movie', unique_name='movie_unique_users')

        df = df.merge(user_stats, left_on='user_id', right_index=True, how='left')
        df = df.merge(item_stats, left_on='movie_id', right_index=True, how='left')

        # External side features.
        df = self._merge_side_features(df, user_features, left_key='user_id', suffix='_user')
        df = self._merge_side_features(df, item_features, left_key='movie_id', suffix='_item')

        # Standardise numeric feature columns (ids, labels and raw time excluded).
        numeric_cols = [
            col for col in df.select_dtypes(include=[np.number]).columns
            if col not in self._UNSCALED_COLUMNS
        ]
        if numeric_cols:
            if mode == 'train':
                scaler = StandardScaler()
                df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
                self.feature_scalers['numeric'] = scaler
            elif 'numeric' in self.feature_scalers:
                df[numeric_cols] = self.feature_scalers['numeric'].transform(df[numeric_cols])
            else:
                logger.warning(
                    f"No fitted scaler available for {mode} data; "
                    f"numeric features are left unscaled"
                )

        return df.fillna(0)

    @classmethod
    def _normalise_id_columns(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with string-typed ``user_id``/``movie_id`` columns."""
        # Work on a copy so the caller's frame is never mutated.
        df = df.copy()
        for required in ('user_id', 'movie_id'):
            if required in df.columns:
                continue
            alias = next(
                (name for name in cls._COLUMN_ALIASES[required] if name in df.columns),
                None,
            )
            # Only give up after *every* alias has been tried.
            if alias is None:
                raise ValueError(f"Required column {required} not found in dataframe")
            df = df.rename(columns={alias: required})

        df['user_id'] = df['user_id'].astype(str)
        df['movie_id'] = df['movie_id'].astype(str)
        return df

    @staticmethod
    def _interaction_stats(df: pd.DataFrame, group_col: str, other_col: str,
                           prefix: str, unique_name: str) -> pd.DataFrame:
        """Aggregate count / mean / std of ratings and distinct partners per group."""
        grouped = df.groupby(group_col)
        if 'rating' in df.columns:
            stats = grouped.agg(**{
                f'{prefix}_rating_count': ('rating', 'count'),
                f'{prefix}_avg_rating': ('rating', 'mean'),
                f'{prefix}_rating_std': ('rating', 'std'),
                unique_name: (other_col, 'nunique'),
            })
        else:
            # Without explicit ratings the interaction count is the row count.
            stats = grouped.agg(**{
                f'{prefix}_rating_count': (other_col, 'size'),
                unique_name: (other_col, 'nunique'),
            })
        # std is NaN for groups with a single rating.
        return stats.fillna(0)

    @staticmethod
    def _merge_side_features(df: pd.DataFrame, features: Optional[pd.DataFrame],
                             left_key: str, suffix: str) -> pd.DataFrame:
        """Left-join a side-feature table onto ``df`` using its first column as key."""
        if features is None or features.empty:
            return df
        right_key = features.columns[0]
        # The interaction ids were cast to str, so the join key must be str as
        # well; otherwise pandas refuses to merge object and integer columns.
        features = features.copy()
        features[right_key] = features[right_key].astype(str)
        return df.merge(features, left_on=left_key, right_on=right_key,
                        how='left', suffixes=('', suffix))

In [4]:
class NegativeSampler:
    """Sample negative (non-interacted) items per user.

    Strategies:
        ``'random'``: up to ``num_random`` uniformly drawn candidates.
        ``'hard'``: the ``num_hard`` most popular candidates.
        ``'mixed'``: ``num_hard // 2`` most popular candidates plus up to
            ``num_random // 2`` uniformly drawn from the remaining ones.

    Any other strategy value yields an empty list for known users.
    Random draws use NumPy's global RNG (``np.random.seed`` controls them).
    """

    def __init__(self, strategy: str='mixed', num_hard: int=5, num_random:int=100):
        self.strategy = strategy
        self.num_hard = num_hard
        self.num_random = num_random
        self.item_popularity = None   # item -> interaction count
        self.user_item_matrix = None  # user -> set of interacted items
        self.all_items = None         # set of all known items
        self._ranked_items = None     # cached popularity-ordered item list

    def fit(self, train_df:pd.DataFrame):
        """Collect item popularity and per-user item sets from ``train_df``.

        Args:
            train_df: Frame with ``user_id`` and ``movie_id`` columns.
        """
        logger.info("Fitting negative sampler...")

        self.item_popularity = train_df.groupby('movie_id').size().to_dict()
        self.user_item_matrix = train_df.groupby('user_id')['movie_id'].apply(set).to_dict()
        self.all_items = set(train_df['movie_id'].unique())
        # Invalidate the cached ordering; it is rebuilt lazily on next use.
        self._ranked_items = None

    def sample_negatives(self, user_id: str, positive_items: List[str],
        item_embeddings: Optional[np.ndarray] = None) -> List[str]:
        """Sample negative items for a user.

        Args:
            user_id: User to sample for.
            positive_items: Items to exclude in addition to the user's
                training interactions.
            item_embeddings: Currently unused.

        Returns:
            A duplicate-free list of items (random picks first, then popular
            ones). Users unseen during ``fit`` always receive up to
            ``num_random`` random items, regardless of strategy.

        Raises:
            RuntimeError: If the sampler has not been fitted.
        """
        if self.user_item_matrix is None or self.all_items is None:
            raise RuntimeError("NegativeSampler.fit() must be called before sample_negatives()")

        # New set, so the stored per-user sets are never mutated below.
        excluded = set(positive_items) if positive_items is not None else set()
        ranked = self._items_by_popularity()

        if user_id not in self.user_item_matrix:
            pool = [item for item in ranked if item not in excluded]
            return self._draw_random(pool, self.num_random)

        excluded |= self.user_item_matrix[user_id]
        # Popularity-ordered candidates (deterministic order, unlike set iteration).
        candidates = [item for item in ranked if item not in excluded]
        if not candidates:
            return []

        hard_negs = []
        if self.strategy in ('hard', 'mixed'):
            n_hard = self.num_hard if self.strategy == 'hard' else self.num_hard // 2
            hard_negs = candidates[:max(n_hard, 0)]

        random_negs = []
        if self.strategy in ('random', 'mixed'):
            n_random = self.num_random if self.strategy == 'random' else self.num_random // 2
            # Draw from the candidates not already taken as hard negatives so
            # the combined result contains no duplicates.
            random_negs = self._draw_random(candidates[len(hard_negs):], n_random)

        return random_negs + hard_negs

    def _items_by_popularity(self) -> list:
        """Return all items sorted by descending popularity (ties by item text)."""
        if getattr(self, '_ranked_items', None) is None:
            popularity = self.item_popularity or {}
            self._ranked_items = sorted(
                self.all_items,
                key=lambda item: (-popularity.get(item, 0), str(item)),
            )
        return self._ranked_items

    @staticmethod
    def _draw_random(pool: list, size: int) -> list:
        """Pick up to ``size`` distinct elements of ``pool`` without replacement."""
        size = min(size, len(pool))
        if size <= 0:
            return []
        # Sampling indices keeps the original Python objects (no numpy scalars).
        picks = np.random.choice(len(pool), size=size, replace=False)
        return [pool[int(idx)] for idx in picks]
        

In [5]:
class AdvancedMetrics:
    """Ranking and catalogue metrics for recommendation lists.

    All methods are static and return plain Python floats; empty inputs yield
    ``0.0`` instead of NaN or a division error.
    """

    @staticmethod
    def dcg_at_k(relevance_scores: List[float], k: int) -> float:
        """Discounted Cumulative Gain of the first ``k`` relevance scores.

        Uses the exponential gain ``2**rel - 1`` and a ``log2(rank + 1)``
        discount with 1-based ranks.
        """
        return float(sum(
            (2 ** rel - 1) / np.log2(position + 2)
            for position, rel in enumerate(relevance_scores[:k])
        ))

    @staticmethod
    def ndcg_at_k(true_relevance: List[float], pred_relevance: List[float], k: int) -> float:
        """Normalized Discounted Cumulative Gain at k.

        Args:
            true_relevance: Relevance values used to build the ideal ordering.
            pred_relevance: Relevance values in predicted ranking order.
            k: Cut-off rank.

        Returns:
            DCG of the predicted order divided by the ideal DCG, or 0.0 when
            the ideal DCG is not positive.
        """
        dcg = AdvancedMetrics.dcg_at_k(pred_relevance, k)
        idcg = AdvancedMetrics.dcg_at_k(sorted(true_relevance, reverse=True), k)

        return dcg / idcg if idcg > 0 else 0.0

    @staticmethod
    def map_at_k(y_true: List[List[str]], y_pred: List[List[str]], k: int) -> float:
        """Mean Average Precision at k.

        Average precision per user is the sum of precision values at each hit
        divided by ``min(k, number of relevant items)``. Users without
        relevant items are skipped. A repeated predicted item counts once.
        """
        average_precisions = []
        for true_items, pred_items in zip(y_true, y_pred):
            true_set = set(true_items)
            if not true_set:
                continue

            hits = 0
            precision_sum = 0.0
            already_hit = set()
            for rank, item in enumerate(pred_items[:k], start=1):
                if item in true_set and item not in already_hit:
                    already_hit.add(item)
                    hits += 1
                    precision_sum += hits / rank

            # Normalise by the best achievable number of hits, not by the
            # number of hits actually found (which would reward a single hit
            # among many relevant items with a perfect score).
            denominator = min(k, len(true_set))
            average_precisions.append(precision_sum / denominator if denominator > 0 else 0.0)

        return float(np.mean(average_precisions)) if average_precisions else 0.0

    @staticmethod
    def mrr(y_true: List[List[str]], y_pred: List[List[str]]) -> float:
        """Mean Reciprocal Rank of the first relevant item (0 when none is found)."""
        reciprocal_ranks = []
        for true_items, pred_items in zip(y_true, y_pred):
            true_set = set(true_items)
            first_hit_rank = next(
                (rank for rank, item in enumerate(pred_items, 1) if item in true_set),
                None,
            )
            reciprocal_ranks.append(1 / first_hit_rank if first_hit_rank else 0.0)

        return float(np.mean(reciprocal_ranks)) if reciprocal_ranks else 0.0

    @staticmethod
    def coverage(recommendations: List[List[str]], all_items: List[str]) -> float:
        """Item coverage: fraction of catalogue items that appear in recommendations.

        Items recommended but absent from ``all_items`` are ignored and
        duplicate catalogue entries count once, so the result lies in [0, 1].
        """
        catalogue = set(all_items)
        if not catalogue:
            return 0.0

        covered = {
            item
            for rec_list in recommendations
            for item in rec_list
            if item in catalogue
        }
        return len(covered) / len(catalogue)

    @staticmethod
    def diversity(recommendations: List[List[str]], item_features: Optional[pd.DataFrame] = None) -> float:
        """Intra-list diversity of the recommendation lists.

        Without ``item_features`` the score of a list is its share of unique
        items (0.0 for lists with at most one entry), averaged over lists.

        NOTE(review): the feature-based variant is not implemented; passing
        ``item_features`` returns 0.0.
        """
        if item_features is not None:
            return 0.0

        diversities = [
            len(set(rec_list)) / len(rec_list) if len(rec_list) > 1 else 0.0
            for rec_list in recommendations
        ]
        return float(np.mean(diversities)) if diversities else 0.0

In [6]:
class DeepCrossNetwork(keras.Model):
    """Deep & Cross Network: explicit feature crosses alongside an MLP.

    The cross branch applies ``x_{l+1} = x_0 * (x_l · w_l) + b_l + x_l`` for
    ``cross_layers`` steps; the deep branch is a stack of Dense+Dropout
    layers. ``call`` returns both branch outputs concatenated on the last axis.
    """

    def __init__(self, cross_layers: int=3, deep_layers: List[int] = [512, 256, 128], dropout_rate: float = 0.2,
        l2_reg: float = 1e-5):
        """
        Args:
            cross_layers: Number of cross layers.
            deep_layers: Hidden sizes of the deep branch.
            dropout_rate: Dropout rate applied after each deep Dense layer.
            l2_reg: L2 factor for the cross weights and deep Dense kernels.
        """
        super().__init__()
        self.cross_layers = cross_layers
        # Copy so instances never share (or mutate) the default list object.
        self.deep_layers = list(deep_layers)
        self.dropout_rate = dropout_rate
        self.l2_reg = l2_reg

        # Cross-branch parameters; created in build() once the input width is known.
        self.cross_weights = []
        self.cross_biases = []

        # Deep branch: Dense(relu) followed by Dropout for each hidden size.
        self.deep_nets = []
        for units in self.deep_layers:
            self.deep_nets.append(tf.keras.layers.Dense(
                units, activation='relu',
                kernel_regularizer=tf.keras.regularizers.l2(l2_reg)
            ))
            self.deep_nets.append(tf.keras.layers.Dropout(dropout_rate))

    def build(self, input_shape):
        """Create one (input_dim, 1) weight and one (input_dim,) bias per cross layer."""
        input_dim = input_shape[-1]

        for i in range(self.cross_layers):
            self.cross_weights.append(
                self.add_weight(
                    name = f'cross_weight_{i}',
                    shape = (input_dim, 1),
                    initializer = 'truncated_normal',
                    regularizer = tf.keras.regularizers.l2(self.l2_reg)
                )
            )
            self.cross_biases.append(
                self.add_weight(
                    name=f'cross_bias_{i}',
                    shape = (input_dim,),
                    initializer = 'zeros'
                )
            )

        # Call the parent build only after the cross parameters exist: the
        # base implementation may invoke call() to build sub-layers.
        super().build(input_shape)

    def call(self, inputs, training=None):
        """Run both branches and concatenate their outputs.

        Args:
            inputs: Feature tensor whose last axis has the width seen in
                ``build`` (assumed rank 2, ``(batch, input_dim)`` — TODO confirm).
            training: Forwarded to the deep-branch layers (controls dropout).

        Returns:
            Tensor of width ``input_dim + deep_layers[-1]`` (or
            ``2 * input_dim`` when ``deep_layers`` is empty).
        """
        # Cross branch.
        x0 = inputs
        xl = x0
        for i in range(self.cross_layers):
            # (batch, input_dim) @ (input_dim, 1) -> (batch, 1), broadcast over x0.
            xl_w = tf.matmul(xl, self.cross_weights[i])
            xl = x0 * xl_w + self.cross_biases[i] + xl
        cross_output = xl

        # Deep branch starts from the raw inputs.
        deep_output = inputs
        for layer in self.deep_nets:
            deep_output = layer(deep_output, training=training)

        return tf.concat([cross_output, deep_output], axis=-1)

In [7]:
class MultiTowerModel(keras.Model):
    """Two-tower retrieval encoder producing user and item embeddings.

    Each tower concatenates an id embedding with the embeddings of the side
    features whose name carries the tower's prefix (``user_`` / ``movie_``),
    then passes the result through a Dense/Dropout stack ending in a linear
    projection of size ``config.embedding_dim``.
    """

    def __init__(self, config: ModelConfig, user_vocab: List[str],
        item_vocab: List[str], feature_specs: Dict[str, Any]):
        """
        Args:
            config: Model hyperparameters (embedding size, tower sizes,
                dropout rate, L2 factor).
            user_vocab: Known user ids for the user ``StringLookup``.
            item_vocab: Known item ids for the item ``StringLookup``.
            feature_specs: Maps feature name to a dict with key ``'type'``
                (``'categorical'`` or ``'numerical'``) and, for categorical
                features, ``'vocab_size'``. Other types get no layer.
        """
        super().__init__()
        self.config = config
        self.user_vocab = user_vocab
        self.item_vocab = item_vocab
        self.feature_specs = feature_specs

        # User id embedding; +1 row for the lookup's out-of-vocabulary index.
        self.user_lookup = keras.layers.StringLookup(vocabulary=user_vocab,
                mask_token=None)
        self.user_embedding = keras.layers.Embedding(
            len(user_vocab)+1, config.embedding_dim,
            embeddings_regularizer=keras.regularizers.l2(config.l2_reg)
        )

        # Item id embedding; +1 row for the lookup's out-of-vocabulary index.
        self.item_lookup = keras.layers.StringLookup(vocabulary=item_vocab,
                mask_token=None)
        self.item_embedding = keras.layers.Embedding(
            len(item_vocab)+1, config.embedding_dim,
            embeddings_regularizer=keras.regularizers.l2(config.l2_reg)
        )

        # One encoder layer per side feature.
        self.feature_layers = {}
        for feature_name, feature_info in feature_specs.items():
            if feature_info['type'] == 'categorical':
                self.feature_layers[feature_name] = keras.layers.Embedding(
                    feature_info['vocab_size'], config.embedding_dim // 2
                )
            elif feature_info['type'] == 'numerical':
                self.feature_layers[feature_name] = keras.layers.Dense(
                    config.embedding_dim // 2, activation='relu'
                )

        # Tower networks: (Dense relu + Dropout) per size, then a linear projection.
        self.user_tower_layers = []
        for units in config.user_tower_dims:
            self.user_tower_layers.extend([
                keras.layers.Dense(units, activation='relu'),
                keras.layers.Dropout(config.dropout_rate)
            ])
        self.user_tower_layers.append(keras.layers.Dense(config.embedding_dim))

        self.item_tower_layers = []
        for units in config.item_tower_dims:
            self.item_tower_layers.extend([
                keras.layers.Dense(units, activation='relu'),
                keras.layers.Dropout(config.dropout_rate)
            ])
        self.item_tower_layers.append(keras.layers.Dense(config.embedding_dim))

    @staticmethod
    def _flatten_singleton_axis(tensor):
        """Drop axis 1 of a rank-3 tensor such as ``(batch, 1, dim)``."""
        if len(tensor.shape) > 2:
            tensor = tf.squeeze(tensor, axis=1)
        return tensor

    def _side_feature_embeddings(self, features, prefix):
        """Encode every present side feature whose name starts with ``prefix``."""
        encoded = []
        for feature_name, layer in self.feature_layers.items():
            if not feature_name.startswith(prefix) or feature_name not in features:
                continue
            value = features[feature_name]
            # Dense needs at least rank 2; lift a flat (batch,) numeric column.
            if (self.feature_specs[feature_name]['type'] == 'numerical'
                    and len(value.shape) == 1):
                value = tf.expand_dims(value, axis=-1)
            encoded.append(self._flatten_singleton_axis(layer(value)))
        return encoded

    def _run_tower(self, id_embedding, side_embeddings, tower_layers, training):
        """Concatenate id and side embeddings and apply the tower stack."""
        combined = tf.concat([id_embedding] + side_embeddings, axis=1)
        for layer in tower_layers:
            combined = layer(combined, training=training)
        return combined

    def call(self, features, training=None):
        """Encode a batch into user and item embeddings.

        Args:
            features: Dict of tensors containing ``'user_id'`` and
                ``'movie_id'`` (string ids, shape ``(batch,)`` or
                ``(batch, 1)``) plus any side features named in
                ``feature_specs``.
            training: Forwarded to the tower layers (controls dropout).

        Returns:
            Dict with ``'user_embedding'`` and ``'item_embedding'``.
        """
        # User tower. The squeeze is conditional so that both (batch,) and
        # (batch, 1) id tensors are accepted.
        user_emb = self._flatten_singleton_axis(
            self.user_embedding(self.user_lookup(features['user_id'])))
        user_combined = self._run_tower(
            user_emb,
            self._side_feature_embeddings(features, 'user_'),
            self.user_tower_layers,
            training,
        )

        # Item tower.
        item_emb = self._flatten_singleton_axis(
            self.item_embedding(self.item_lookup(features['movie_id'])))
        item_combined = self._run_tower(
            item_emb,
            self._side_feature_embeddings(features, 'movie_'),
            self.item_tower_layers,
            training,
        )

        return {
            'user_embedding': user_combined,
            'item_embedding': item_combined
        }

In [None]:
class MultiTaskModel(tfrs.models.Model):
    def __init__(self, config: ModelConfig, user_vocab: List[str],
        item_vocab: List[str], feature_specs: Dict[str, Any]):
        super().__init__()
        self.config = config
        self.encoder = MultiTowerModel(config, user_vocab, item_vocab, feature_specs)
        