# Hackathon - The Next Purchase
### Recommendation System
<hr style=" border:none; height:3px;">

## Stage 1: Multi-Channel Recall

Retrieval runs several strategies in parallel (collaborative filtering, embedding-based matching, sequence modeling, and popularity-based methods) to quickly surface thousands of candidates from millions of items. Because each strategy draws on a different signal of user-item affinity, combining them balances precision with diversity.
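
As a rough illustration of how the outputs of parallel channels might be combined, the sketch below interleaves ranked candidate lists round-robin and drops duplicates. The function `merge_channels`, the channel names, the item IDs, and the round-robin policy itself are assumptions for illustration; this section does not specify how the channels are merged.

```python
from itertools import zip_longest
from typing import Dict, List


def merge_channels(channel_results: Dict[str, List[int]], limit: int) -> List[int]:
    """Interleave ranked candidate lists round-robin, keeping first occurrences only."""
    merged: List[int] = []
    seen = set()
    # zip_longest walks rank 0 of every channel, then rank 1, and so on
    for rank_slice in zip_longest(*channel_results.values()):
        for item_id in rank_slice:
            if item_id is None or item_id in seen:
                continue
            seen.add(item_id)
            merged.append(item_id)
            if len(merged) == limit:
                return merged
    return merged


# Hypothetical channel outputs (item IDs ranked best-first)
channels = {
    "two_tower": [101, 102, 103],
    "item_cf": [102, 201],
    "popular": [301, 101, 302, 303],
}
candidates = merge_channels(channels, limit=6)
print(candidates)  # [101, 102, 301, 201, 103, 302]
```

Round-robin gives every channel a turn at each rank, so no single channel can crowd out the others. Per-channel quotas or weighted scores would be equally plausible choices.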

### 1) Embedding-based Recall (Two-Tower Model)

The two-tower architecture encodes users and items separately into dense vectors in a shared embedding space. Recommending then reduces to a nearest-neighbor lookup: the items whose embeddings lie closest to the user's latent preference vector are retrieved, a step that approximate nearest neighbor search makes efficient at scale.

user features + item features → separate neural networks → user vector & item vectors → cosine similarity in shared space → retrieve top-K nearest items → candidates matching learned preference patterns.
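
The retrieval end of this pipeline can be sketched without any neural network: once both towers have produced vectors, recall is a normalized dot product followed by a top-K selection. The vectors and item IDs below are made up, and `l2_normalize` and `top_k_by_cosine` are hypothetical helpers rather than part of the notebook.

```python
import heapq
import math
from typing import Dict, List, Sequence


def l2_normalize(vec: Sequence[float]) -> List[float]:
    """Scale a vector to unit length (left unchanged if it is all zeros)."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def top_k_by_cosine(user_vec: Sequence[float],
                    item_vecs: Dict[int, Sequence[float]],
                    k: int) -> List[int]:
    """Return the k item IDs whose vectors have the highest cosine similarity to user_vec."""
    u = l2_normalize(user_vec)
    scores = {
        item_id: sum(a * b for a, b in zip(u, l2_normalize(vec)))
        for item_id, vec in item_vecs.items()
    }
    return heapq.nlargest(k, scores, key=scores.get)


# Toy 2-d "tower outputs"
user_vector = [1.0, 0.0]
item_vectors = {
    10: [2.0, 0.0],    # same direction as the user, cosine = 1
    20: [1.0, 1.0],    # 45 degrees away, cosine ~ 0.707
    30: [0.0, 3.0],    # orthogonal, cosine = 0
    40: [-1.0, 0.0],   # opposite direction, cosine = -1
}
print(top_k_by_cosine(user_vector, item_vectors, k=2))  # [10, 20]
```

Note that item 10 outranks item 20 even though its raw vector is longer: after normalization only direction matters.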

In [1]:
from google.colab import drive
drive.mount('/content/drive')
%cd "/content/drive/MyDrive/Hackathon/"

Mounted at /content/drive
/content/drive/MyDrive/Hackathon


In [3]:
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

True

In [7]:
import numpy as np
import torch
import torch.nn as nn
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Dict, Tuple
import psycopg2
from psycopg2.extras import execute_values
import os

class TwoTowerRecall:

    def __init__(self, user_feature_dim: int, item_feature_dim: int, embedding_dim: int = 64):
        """
        Initialize two-tower model.
        Args:
            user_feature_dim: Dimension of user features
            item_feature_dim: Dimension of item features
            embedding_dim: Output embedding dimension (default: 64)
        """

        self.embedding_dim = embedding_dim
        self.user_tower = self._build_tower(user_feature_dim, embedding_dim)
        self.item_tower = self._build_tower(item_feature_dim, embedding_dim)
        self.optimizer = torch.optim.Adam(
            list(self.user_tower.parameters()) + list(self.item_tower.parameters()),
            lr=0.001
        )

    def _build_tower(self, input_dim: int, output_dim: int) -> nn.Module:
        """
        Build tower network.
        Args:
            input_dim: Input feature dimension
            output_dim: Output embedding dimension
        Returns:
            Neural network tower
        """

        return nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim)
        )

    def forward(self, user_features: torch.Tensor, item_features: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass through both towers.
        Args:
            user_features: User feature tensor [batch_size, user_feature_dim]
            item_features: Item feature tensor [batch_size, item_feature_dim]
        Returns:
            Tuple of (user_embeddings, item_embeddings), both L2-normalized
        """

        user_emb = self.user_tower(user_features)
        item_emb = self.item_tower(item_features)

        # L2 normalization
        user_emb = torch.nn.functional.normalize(user_emb, p=2, dim=1)
        item_emb = torch.nn.functional.normalize(item_emb, p=2, dim=1)

        return user_emb, item_emb

    def train_step(self, user_features: torch.Tensor, item_features: torch.Tensor, labels: torch.Tensor) -> float:
        """
        Single training step.
        Args:
            user_features: User features [batch_size, user_feature_dim]
            item_features: Item features [batch_size, item_feature_dim]
            labels: Binary labels [batch_size] (1 for positive, 0 for negative)
        Returns:
            Loss value
        """

        self.user_tower.train()
        self.item_tower.train()
        self.optimizer.zero_grad()

        user_emb, item_emb = self.forward(user_features, item_features)

        # Compute similarity scores
        scores = (user_emb * item_emb).sum(dim=1)

        # Binary cross-entropy loss
        loss = nn.BCEWithLogitsLoss()(scores, labels.float())

        loss.backward()
        self.optimizer.step()

        return loss.item()

    def encode_users(self, user_features: torch.Tensor) -> np.ndarray:
        """
        Encode users into embeddings.
        Args:
            user_features: User features [num_users, user_feature_dim]
        Returns:
            User embeddings [num_users, embedding_dim]
        """

        self.user_tower.eval()
        with torch.no_grad():
            user_emb = self.user_tower(user_features)
            user_emb = torch.nn.functional.normalize(user_emb, p=2, dim=1)
        self.user_tower.train()
        return user_emb.cpu().numpy()

    def encode_items(self, item_features: torch.Tensor) -> np.ndarray:
        """
        Encode items into embeddings.
        Args:
            item_features: Item features [num_items, item_feature_dim]
        Returns:
            Item embeddings [num_items, embedding_dim]
        """

        self.item_tower.eval()
        with torch.no_grad():
            item_emb = self.item_tower(item_features)
            item_emb = torch.nn.functional.normalize(item_emb, p=2, dim=1)
        self.item_tower.train()
        return item_emb.cpu().numpy()

    def recall(self, user_embedding: np.ndarray, item_embeddings: np.ndarray, top_k: int = 500) -> List[int]:
        """
        Recall top-K items for user.
        Args:
            user_embedding: User embedding vector [embedding_dim]
            item_embeddings: All item embeddings [num_items, embedding_dim]
            top_k: Number of items to recall
        Returns:
            List of top-K item indices
        """

        similarities = np.dot(item_embeddings, user_embedding)
        top_k_indices = np.argsort(similarities)[-top_k:][::-1]
        return top_k_indices.tolist()
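
One property of `train_step` above is worth keeping in mind. Because both embeddings are L2-normalized, `scores` is a cosine similarity in [-1, 1], and `BCEWithLogitsLoss` applies a sigmoid to it. The arithmetic below shows that the predicted probability then stays between roughly 0.27 and 0.73, so the per-sample loss cannot drop below about 0.313. Dividing the score by a temperature before the loss is a common way to widen that range; the value 0.05 is purely an illustrative assumption and is not used in the notebook.

```python
import math


def sigmoid(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))


def bce_with_logits(logit: float, label: float) -> float:
    """Binary cross-entropy on a raw logit (sigmoid followed by log loss), per sample."""
    p = sigmoid(logit)
    return -(label * math.log(p) + (1.0 - label) * math.log(1.0 - p))


# Best case for a positive pair: cosine similarity of exactly 1
best_cosine = 1.0
print(f"sigmoid(1.0)          = {sigmoid(best_cosine):.4f}")
print(f"min loss, no scaling  = {bce_with_logits(best_cosine, 1.0):.4f}")

# Same pair with the score divided by a temperature (illustrative value)
temperature = 0.05
scaled_loss = bce_with_logits(best_cosine / temperature, 1.0)
print(f"min loss, temperature = {scaled_loss:.2e}")
```

When reading the training log, a loss that plateaus a little above 0.31 may therefore reflect this floor rather than a lack of model capacity.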

In [13]:
from torch.utils.data import TensorDataset, DataLoader

# 0. Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# 1. Load training data from database
try:
    with psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        sslmode='require'
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute("""
            SELECT "ClientID", "ProductID"
            FROM user_item_interactions
            WHERE "PurchaseCount" > 0
            """)
            positive_samples = cursor.fetchall()

            cursor.execute("""
            SELECT "ClientID", "TotalPurchases", "TotalSpendEuro", "AvgOrderValue",
                   "DaysSinceLastPurchase", "PurchaseFrequency", "UniqueProductsBought",
                   "TotalQuantity", "Age"
            FROM clients
            """)
            user_features_data = cursor.fetchall()

            cursor.execute("""
            SELECT "ProductID", "TotalSales", "TotalQuantitySold", "Sales7d",
                   "Sales30d", "AvgPrice", "TotalRevenue", "UniqueBuyers",
                   "AvgQuantityPerOrder", "TotalStockQuantity", "StockCountries"
            FROM products
            """)
            item_features_data = cursor.fetchall()
            print("Training data loaded successfully!")
except Exception as e:
    print(f"Error loading training data: {e}")
    raise

# 2. Prepare features
user_feature_dict = {uid: [0 if x is None else float(x) for x in features]
                     for uid, *features in user_features_data}
item_feature_dict = {iid: [0 if x is None else float(x) for x in features]
                     for iid, *features in item_features_data}

user_feature_dim = len(user_features_data[0]) - 1
item_feature_dim = len(item_features_data[0]) - 1

print(f"Total positive samples: {len(positive_samples)}")

# 3. Generate negative samples
print("Generating negative samples...")
all_item_ids = np.array(list(item_feature_dict.keys()))
positive_set = set(positive_samples)
training_samples = []

# Pre-generate 10 candidate negative samples for each positive sample
neg_candidates = np.random.choice(all_item_ids, size=(len(positive_samples), 10), replace=True)

for idx, (user_id, item_id) in enumerate(positive_samples):
    if user_id not in user_feature_dict or item_id not in item_feature_dict:
        continue

    training_samples.append((user_id, item_id, 1))

    # Find first candidate not in positive_set
    for neg_item in neg_candidates[idx]:
        if (user_id, neg_item) not in positive_set:
            training_samples.append((user_id, neg_item, 0))
            break

print(f"Total training samples: {len(training_samples)}")

# 4. Prepare tensors
user_tensor_all = torch.FloatTensor([user_feature_dict[u] for u, _, _ in training_samples])
item_tensor_all = torch.FloatTensor([item_feature_dict[i] for _, i, _ in training_samples])
label_tensor_all = torch.FloatTensor([l for _, _, l in training_samples])

dataset = TensorDataset(user_tensor_all, item_tensor_all, label_tensor_all)
loader = DataLoader(
    dataset,
    batch_size=256,
    shuffle=True
)

# 5. Initialize model
model = TwoTowerRecall(user_feature_dim, item_feature_dim, embedding_dim=64)
model.user_tower = model.user_tower.to(device)
model.item_tower = model.item_tower.to(device)

# 6. Training loop
epochs = 50
patience = 5
best_loss = float('inf')
patience_counter = 0

for epoch in range(epochs):
    epoch_loss = 0
    num_batches = 0

    for batch_idx, (user_tensor, item_tensor, labels_tensor) in enumerate(loader):
        user_tensor = user_tensor.to(device)
        item_tensor = item_tensor.to(device)
        labels_tensor = labels_tensor.to(device)

        loss = model.train_step(user_tensor, item_tensor, labels_tensor)
        epoch_loss += loss
        num_batches += 1

        if batch_idx % 100 == 0:
            print(f"Epoch {epoch + 1}/{epochs}, Batch {batch_idx}/{len(loader)}")

    avg_loss = epoch_loss / num_batches
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")

    # Early stopping
    if avg_loss < best_loss:
        best_loss = avg_loss
        patience_counter = 0
        # Save best model
        torch.save({
            'user_tower': model.user_tower.state_dict(),
            'item_tower': model.item_tower.state_dict()
        }, 'two_tower_model_best.pth')
        print(f"Best model saved with loss: {best_loss:.4f}")
    else:
        patience_counter += 1
        print(f"No improvement for {patience_counter} epoch(s)")

        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs")
            break

# 7. Load best model
checkpoint = torch.load('two_tower_model_best.pth')
model.user_tower.load_state_dict(checkpoint['user_tower'])
model.item_tower.load_state_dict(checkpoint['item_tower'])
print("Best model loaded!")

# 8. Save final model
torch.save({
    'user_tower': model.user_tower.state_dict(),
    'item_tower': model.item_tower.state_dict()
}, 'two_tower_model.pth')
print("Model saved successfully!")

# 9. Generate embeddings
all_user_ids = list(user_feature_dict.keys())
all_item_ids = list(item_feature_dict.keys())

user_features_tensor = torch.FloatTensor([user_feature_dict[uid] for uid in all_user_ids]).to(device)
item_features_tensor = torch.FloatTensor([item_feature_dict[iid] for iid in all_item_ids]).to(device)

user_embeddings = model.encode_users(user_features_tensor)
item_embeddings = model.encode_items(item_features_tensor)

# 10. Save embeddings to database
try:
    with psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        sslmode='require'
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute("TRUNCATE TABLE user_embeddings")
            cursor.execute("TRUNCATE TABLE item_embeddings")

            user_data = [(int(uid), user_embeddings[i].tolist(), 'NOW()')
                        for i, uid in enumerate(all_user_ids)]
            execute_values(cursor, """
            INSERT INTO user_embeddings ("ClientID", "Embedding", "UpdatedAt")
            VALUES %s
            """, user_data)

            item_data = [(int(iid), item_embeddings[i].tolist(), 'NOW()')
                        for i, iid in enumerate(all_item_ids)]
            execute_values(cursor, """
            INSERT INTO item_embeddings ("ProductID", "Embedding", "UpdatedAt")
            VALUES %s
            """, item_data)

            conn.commit()
            print("Embeddings saved successfully!")
except Exception as e:
    print(f"Error saving embeddings: {e}")

Using device: cuda
Training data loaded successfully!
Total positive samples: 976274
Generating negative samples...
Total training samples: 1952548
Epoch 1/50, Batch 0/7628
Epoch 1/50, Batch 100/7628
Epoch 1/50, Batch 200/7628
Epoch 1/50, Batch 300/7628
Epoch 1/50, Batch 400/7628
Epoch 1/50, Batch 500/7628
Epoch 1/50, Batch 600/7628
Epoch 1/50, Batch 700/7628
Epoch 1/50, Batch 800/7628
Epoch 1/50, Batch 900/7628
Epoch 1/50, Batch 1000/7628
Epoch 1/50, Batch 1100/7628
Epoch 1/50, Batch 1200/7628
Epoch 1/50, Batch 1300/7628
Epoch 1/50, Batch 1400/7628
Epoch 1/50, Batch 1500/7628
Epoch 1/50, Batch 1600/7628
Epoch 1/50, Batch 1700/7628
Epoch 1/50, Batch 1800/7628
Epoch 1/50, Batch 1900/7628
Epoch 1/50, Batch 2000/7628
Epoch 1/50, Batch 2100/7628
Epoch 1/50, Batch 2200/7628
Epoch 1/50, Batch 2300/7628
Epoch 1/50, Batch 2400/7628
Epoch 1/50, Batch 2500/7628
Epoch 1/50, Batch 2600/7628
Epoch 1/50, Batch 2700/7628
Epoch 1/50, Batch 2800/7628
Epoch 1/50, Batch 2900/7628
Epoch 1/50, Batch 3000/7
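
The negative sampling step in the training cell above draws random items and keeps the first one the user has not purchased. A stripped-down version of the same idea, on made-up IDs, is sketched below. The names `sample_negatives`, `max_tries`, and `seed` and the toy data are assumptions for illustration, and unlike the notebook this version draws candidates one at a time instead of pre-generating them with NumPy.

```python
import random
from typing import List, Set, Tuple


def sample_negatives(positives: List[Tuple[int, int]],
                     all_items: List[int],
                     max_tries: int = 10,
                     seed: int = 0) -> List[Tuple[int, int, int]]:
    """Pair each positive (user, item, 1) with at most one random negative (user, item, 0)."""
    rng = random.Random(seed)
    positive_set: Set[Tuple[int, int]] = set(positives)
    samples: List[Tuple[int, int, int]] = []
    for user_id, item_id in positives:
        samples.append((user_id, item_id, 1))
        # Rejection sampling: retry until the drawn item is not a known purchase
        for _ in range(max_tries):
            candidate = rng.choice(all_items)
            if (user_id, candidate) not in positive_set:
                samples.append((user_id, candidate, 0))
                break
    return samples


# Toy interactions: user 1 bought items 10 and 11, user 2 bought item 12
positives = [(1, 10), (1, 11), (2, 12)]
training = sample_negatives(positives, all_items=[10, 11, 12, 13, 14])
for row in training:
    print(row)
```

A positive can in principle end up without a negative if every try collides with a purchase, so the 1:1 class balance is a likely outcome rather than a guarantee.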

In [55]:
# TwoTowerRecall Example
# 1. Load model
model = TwoTowerRecall(user_feature_dim=8, item_feature_dim=10, embedding_dim=64)
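# map_location lets a checkpoint saved on GPU load on a CPU-only runtime as well
checkpoint = torch.load('two_tower_model.pth', map_location=device)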
model.user_tower.load_state_dict(checkpoint['user_tower'])
model.item_tower.load_state_dict(checkpoint['item_tower'])
model.user_tower = model.user_tower.to(device)
model.item_tower = model.item_tower.to(device)

# 2. Load embeddings from database
try:
    with psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        sslmode='require'
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute("""
            SELECT "ProductID", "Embedding"
            FROM item_embeddings
            """)
            item_data = cursor.fetchall()

            item_id_to_idx = {pid: i for i, (pid, _) in enumerate(item_data)}
            item_embeddings = np.array([emb for _, emb in item_data])
            print("Item embeddings loaded successfully!")
except Exception as e:
    print(f"Error loading embeddings: {e}")

# 3. Recall (example)
# Example: recall candidates for one known user ID, using the features loaded earlier
user_id = 2820558652430377474
user_features = user_feature_dict[user_id]
user_tensor = torch.FloatTensor([user_features]).to(device)
user_embedding = model.encode_users(user_tensor)[0]

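recommendations_idx = model.recall(user_embedding, item_embeddings, top_k=500)
# Map row indices back to product IDs; build the lookup list once, not once per candidate
idx_to_item_id = [pid for pid, _ in item_data]
recommendations = [idx_to_item_id[idx] for idx in recommendations_idx]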
print(f"Recalled {len(recommendations)} candidates from TwoTower")
print(f"Top 10 candidates: {recommendations[:10]}")

Item embeddings loaded successfully!
Recalled 500 candidates from TwoTower
Top 10 candidates: [1870088131203853896, 3393324386710413510, 4104639588574737041, 7255470514920952691, 5832182010792345789, 1677626284184941631, 4720422053137086922, 1025562429060199077, 56020583862746888, 280451233911325994]


### 2) Collaborative Filtering

- #### Item-based Collaborative Filtering (ItemCF)
  ItemCF computes item-to-item similarity from co-occurrence patterns in user purchase histories: items frequently bought by the same users are deemed similar. It then recommends items similar to those the user has already interacted with, capturing product relationships without requiring user profiling.

  user purchase history → identify purchased items → compute item-item similarity (via co-purchase patterns) → retrieve items similar to user's history → recommend related products.
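
The flow above can be traced end to end on a toy dataset. The sketch below is a simplification under stated assumptions: it treats purchases as binary (bought or not) rather than using purchase counts, uses made-up users and products, and the helper names `item_similarity` and `recall_items` are hypothetical.

```python
import math
from collections import defaultdict
from typing import Dict, List, Set


def item_similarity(purchases: Dict[str, Set[str]]) -> Dict[str, Dict[str, float]]:
    """Cosine similarity between items over binary user-purchase vectors."""
    buyers: Dict[str, Set[str]] = defaultdict(set)
    for user, items in purchases.items():
        for item in items:
            buyers[item].add(user)

    sim: Dict[str, Dict[str, float]] = defaultdict(dict)
    for a in buyers:
        for b in buyers:
            common = len(buyers[a] & buyers[b])
            if a != b and common > 0:
                # For 0/1 vectors: cosine = |common buyers| / sqrt(|buyers(a)| * |buyers(b)|)
                sim[a][b] = common / math.sqrt(len(buyers[a]) * len(buyers[b]))
    return sim


def recall_items(history: Set[str], sim: Dict[str, Dict[str, float]], top_k: int) -> List[str]:
    """Sum similarities from every purchased item to each unseen item, then rank."""
    scores: Dict[str, float] = defaultdict(float)
    for bought in history:
        for other, s in sim.get(bought, {}).items():
            if other not in history:
                scores[other] += s
    return sorted(scores, key=scores.get, reverse=True)[:top_k]


# Toy purchase histories
purchases = {
    "u1": {"tea", "mug"},
    "u2": {"tea", "mug", "kettle"},
    "u3": {"tea", "kettle"},
    "u4": {"pen"},
}
sim = item_similarity(purchases)
print(round(sim["tea"]["mug"], 3))           # 0.816
print(recall_items({"mug"}, sim, top_k=2))   # ['tea', 'kettle']
```

The pen never appears as a candidate because no other product shares a buyer with it, which is the same effect a minimum co-occurrence threshold has at larger scale.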

In [63]:
import os
from typing import Dict, List

import numpy as np
import psycopg2
from psycopg2.extras import execute_values
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarity

class ItemCF:

    def __init__(self, db_config: Dict[str, str]):
        """
        Initialize ItemCF recommender.
        Args:
            db_config: Database connection configuration dictionary
        """

        self.db_config = db_config
        self.item_similarity = {}

    def compute_similarity(self, min_common_users: int = 5, top_k: int = 100):
        """
        Compute item-item similarity matrix using vectorized operations.
        Args:
            min_common_users: Minimum number of common users required for similarity calculation
            top_k: Keep top-K most similar items for each item
        """

        # Load user-item interactions from database
        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "ClientID", "ProductID", "PurchaseCount"
                    FROM user_item_interactions
                    """)
                    interactions = cursor.fetchall()
        except Exception as e:
            print(f"Error loading user-item interactions: {e}")
            return

        # Build sparse user-item matrix
        user_id_map = {}
        item_id_map = {}
        rows, cols, data = [], [], []

        for client_id, product_id, count in interactions:
            if client_id not in user_id_map:
                user_id_map[client_id] = len(user_id_map)
            if product_id not in item_id_map:
                item_id_map[product_id] = len(item_id_map)

            rows.append(user_id_map[client_id])
            cols.append(item_id_map[product_id])
            data.append(count)

        user_item_matrix = csr_matrix(
            (data, (rows, cols)),
            shape=(len(user_id_map), len(item_id_map))
        )

        print(f"Matrix shape: {user_item_matrix.shape}")
        print(f"Sparsity: {1 - user_item_matrix.nnz / (user_item_matrix.shape[0] * user_item_matrix.shape[1]):.4f}")

        # Compute item-item cosine similarity
        item_item_sim = cosine_similarity(user_item_matrix.T, dense_output=False)

        # Compute co-occurrence count
        user_item_binary = (user_item_matrix > 0).astype(np.float32)
        co_occurrence = user_item_binary.T.dot(user_item_binary)

        # Convert to CSR format
        item_item_sim = item_item_sim.tocsr()
        co_occurrence = co_occurrence.tocsr()

        # Build reverse mapping
        item_id_reverse = {idx: pid for pid, idx in item_id_map.items()}

        # Extract top-K similar items for each item
        for item_idx in range(len(item_id_map)):
            item_a = item_id_reverse[item_idx]

            # Get non-zero similarity indices
            sim_row = item_item_sim[item_idx]
            nonzero_cols = sim_row.nonzero()[1]

            # Filter out self-similarity
            nonzero_cols = nonzero_cols[nonzero_cols != item_idx]

            if len(nonzero_cols) == 0:
                self.item_similarity[item_a] = []
                continue

            # Get similarity values
            sim_values = sim_row[0, nonzero_cols].toarray().ravel()

            # Get co-occurrence counts
            cooc_values = co_occurrence[item_idx, nonzero_cols].toarray().ravel()

            # Filter by minimum common users
            valid_mask = cooc_values >= min_common_users
            valid_indices = nonzero_cols[valid_mask]
            valid_similarities = sim_values[valid_mask]

            if len(valid_indices) == 0:
                self.item_similarity[item_a] = []
                continue

            # Select top-K candidates
            if len(valid_indices) <= top_k:
                top_local_indices = np.arange(len(valid_indices))
            else:
                top_local_indices = np.argpartition(valid_similarities, -top_k)[-top_k:]

            # Sort top-K by similarity score
            top_local_indices = top_local_indices[np.argsort(valid_similarities[top_local_indices])[::-1]]

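            # Build similarity list as (item, similarity) pairs, the shape recall() expects
            similarities = [
                (item_id_reverse[valid_indices[i]], float(valid_similarities[i]))
                for i in top_local_indices
            ]

            self.item_similarity[item_a] = similarities

            # Record the co-occurrence count of each kept pair in a side table for save_to_db()
            if not hasattr(self, 'item_cooccurrence'):
                self.item_cooccurrence = {}
            valid_cooc = cooc_values[valid_mask]
            for i in top_local_indices:
                self.item_cooccurrence[(item_a, item_id_reverse[valid_indices[i]])] = int(valid_cooc[i])

            if (item_idx + 1) % 1000 == 0:
                print(f"Processed {item_idx + 1}/{len(item_id_map)} items")

    def save_to_db(self):
        """
        Save computed item similarity to database.
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    # Clear existing similarity data
                    cursor.execute("TRUNCATE TABLE item_similarity")

                    # Prepare batch insert data.
                    # CoOccurrenceCount is the number of users who bought both items; it is
                    # None (NULL) when the pair was loaded from the database, because
                    # load_from_db() does not read the counts back.
                    cooc_table = getattr(self, 'item_cooccurrence', {})
                    data = []
                    for item_a, similar_items in self.item_similarity.items():
                        for item_b, similarity in similar_items:
                            data.append((item_a, item_b, similarity, cooc_table.get((item_a, item_b))))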

                    # Batch insert similarity records
                    execute_values(cursor, """
                    INSERT INTO item_similarity ("ProductID_A", "ProductID_B", "Similarity", "CoOccurrenceCount")
                    VALUES %s
                    """, data)

                    conn.commit()
                    print("Item similarity saved successfully!")
        except Exception as e:
            print(f"Error saving item similarity: {e}")

    def load_from_db(self):
        """
        Load item similarity from database.
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "ProductID_A", "ProductID_B", "Similarity"
                    FROM item_similarity
                    ORDER BY "ProductID_A", "Similarity" DESC
                    """)

                    rows = cursor.fetchall()

                    for item_a, item_b, similarity in rows:
                        if item_a not in self.item_similarity:
                            self.item_similarity[item_a] = []
                        self.item_similarity[item_a].append((item_b, float(similarity)))

                    print(f"Item similarity loaded successfully! Total items: {len(self.item_similarity)}")
        except Exception as e:
            print(f"Error loading item similarity: {e}")

    def recall(self, user_purchased_items: List[int], top_k: int = 300) -> List[int]:
        """
        Recall candidate items based on user purchase history.
        Args:
            user_purchased_items: List of product IDs user has purchased
            top_k: Number of candidate items to recall
        Returns:
            List of recommended product IDs ranked by aggregated similarity score
        """

        # Aggregate similarity scores from purchased items
        candidate_scores = {}
        purchased = set(user_purchased_items)  # set gives O(1) membership checks

        for item_id in user_purchased_items:
            if item_id not in self.item_similarity:
                continue

            for similar_item, similarity in self.item_similarity[item_id]:
                # Skip items user already purchased
                if similar_item in purchased:
                    continue

                # Accumulate similarity scores
                if similar_item not in candidate_scores:
                    candidate_scores[similar_item] = 0
                candidate_scores[similar_item] += similarity

        # Sort candidates by score
        candidates = sorted(candidate_scores.items(), key=lambda x: x[1], reverse=True)
        return [item_id for item_id, score in candidates[:top_k]]

In [35]:
# ItemCF Computation Process
# 1. Initialize
db_config = {
    'host': os.getenv('DB_HOST'),
    'port': os.getenv('DB_PORT'),
    'dbname': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'sslmode': 'require'
}

item_cf = ItemCF(db_config)

# 2. Compute similarity
item_cf.compute_similarity(min_common_users=5, top_k=100)

# 3. Save to database
item_cf.save_to_db()

Matrix shape: (304929, 29730)
Sparsity: 0.9999
Processed 1000/29730 items
Processed 2000/29730 items
Processed 3000/29730 items
Processed 4000/29730 items
Processed 7000/29730 items
Processed 9000/29730 items
Processed 17000/29730 items
Item similarity saved successfully!


In [56]:
# 4. Recall (example)
# Example: user purchased items [1194042316259024227, 910783516894460725, 733557522770138156]
user_purchased = [1194042316259024227, 910783516894460725, 733557522770138156]
recommendations = item_cf.recall(user_purchased, top_k=300)
print(f"Recalled {len(recommendations)} candidates from ItemCF")
print(f"Top 10 candidates: {recommendations[:10]}")

Recalled 85 candidates from ItemCF
Top 10 candidates: [6306151556054509590, 1233079729704511877, 1002319356880102587, 4735736287986553092, 1518309762619768287, 4872929861894050765, 2475752611632244287, 3625979375121276682, 6093539354933854031, 6448349970795474714]


- #### User-based Collaborative Filtering (UserCF)
  UserCF identifies users with similar behavior by comparing purchase histories, then recommends items that these similar users have bought but the target user has not. In effect, it borrows the preferences of the users whose tastes most closely match the target's.

  target user's purchase history → find similar users (via overlapping purchases) → aggregate items bought by similar users → filter out already-purchased items → recommend from similar users' preferences.
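
The same flow on a toy dataset is sketched below. It is an illustration under stated assumptions: purchases are treated as binary, the users and products are made up, and scoring a candidate by the summed similarity of the neighbours who bought it is one plausible aggregation rule, not necessarily the one used later in the notebook. The helper names `similar_users` and `recall_from_neighbours` are hypothetical.

```python
import math
from collections import defaultdict
from typing import Dict, List, Set


def similar_users(target: str, purchases: Dict[str, Set[str]], min_common: int = 1) -> Dict[str, float]:
    """Cosine similarity between the target and every other user over binary purchase vectors."""
    mine = purchases[target]
    result: Dict[str, float] = {}
    for other, theirs in purchases.items():
        common = len(mine & theirs)
        # Users below the overlap threshold are not considered neighbours at all
        if other != target and common >= min_common:
            result[other] = common / math.sqrt(len(mine) * len(theirs))
    return result


def recall_from_neighbours(target: str, purchases: Dict[str, Set[str]], top_k: int) -> List[str]:
    """Score each unseen item by the summed similarity of the neighbours who bought it."""
    scores: Dict[str, float] = defaultdict(float)
    for other, s in similar_users(target, purchases).items():
        # Set difference filters out items the target already owns
        for item in purchases[other] - purchases[target]:
            scores[item] += s
    return sorted(scores, key=scores.get, reverse=True)[:top_k]


# Toy purchase histories
purchases = {
    "alice": {"tea", "mug"},
    "bob": {"tea", "mug", "kettle"},
    "carol": {"tea", "honey"},
    "dave": {"pen"},
}
print(recall_from_neighbours("alice", purchases, top_k=5))  # ['kettle', 'honey']
```

Bob shares two purchases with Alice and Carol only one, so Bob's kettle outranks Carol's honey; Dave shares nothing and contributes no candidates.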

In [64]:
from typing import Dict, List

import numpy as np
import psycopg2
from psycopg2.extras import execute_values
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarity

class UserCF:

    def __init__(self, db_config: Dict[str, str]):
        """
        Initialize UserCF recommender.
        Args:
            db_config: Database connection configuration dictionary
        """

        self.db_config = db_config
        self.user_similarity = {}

    def compute_similarity(self, min_common_items: int = 3, top_k: int = 50):
        """
        Compute user-user similarity matrix using vectorized operations.
        Args:
            min_common_items: Minimum number of common items required for similarity calculation
            top_k: Keep top-K most similar users for each user
        """

        # Load user-item interactions from database
        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "ClientID", "ProductID", "PurchaseCount"
                    FROM user_item_interactions
                    """)
                    interactions = cursor.fetchall()
        except Exception as e:
            print(f"Error loading user-item interactions: {e}")
            return

        # Build sparse user-item matrix
        user_id_map = {}
        item_id_map = {}
        rows, cols, data = [], [], []

        for client_id, product_id, count in interactions:
            if client_id not in user_id_map:
                user_id_map[client_id] = len(user_id_map)
            if product_id not in item_id_map:
                item_id_map[product_id] = len(item_id_map)

            rows.append(user_id_map[client_id])
            cols.append(item_id_map[product_id])
            data.append(count)

        user_item_matrix = csr_matrix(
            (data, (rows, cols)),
            shape=(len(user_id_map), len(item_id_map))
        )

        print(f"Matrix shape: {user_item_matrix.shape}")
        print(f"Sparsity: {1 - user_item_matrix.nnz / (user_item_matrix.shape[0] * user_item_matrix.shape[1]):.4f}")

        # Compute user-user cosine similarity
        user_user_sim = cosine_similarity(user_item_matrix, dense_output=False)

        # Compute co-occurrence count
        user_item_binary = (user_item_matrix > 0).astype(np.float32)
        co_occurrence = user_item_binary.dot(user_item_binary.T)

        # Convert to CSR format
        user_user_sim = user_user_sim.tocsr()
        co_occurrence = co_occurrence.tocsr()

        # Build reverse mapping
        user_id_reverse = {idx: uid for uid, idx in user_id_map.items()}

        # Extract top-K similar users for each user
        for user_idx in range(len(user_id_map)):
            user_a = user_id_reverse[user_idx]

            # Get non-zero similarity indices
            sim_row = user_user_sim[user_idx]
            nonzero_cols = sim_row.nonzero()[1]

            # Filter out self-similarity
            nonzero_cols = nonzero_cols[nonzero_cols != user_idx]

            if len(nonzero_cols) == 0:
                self.user_similarity[user_a] = []
                continue

            # Get similarity values
            sim_values = sim_row[0, nonzero_cols].toarray().ravel()

            # Get co-occurrence counts
            cooc_values = co_occurrence[user_idx, nonzero_cols].toarray().ravel()

            # Filter by minimum common items
            valid_mask = cooc_values >= min_common_items
            valid_indices = nonzero_cols[valid_mask]
            valid_similarities = sim_values[valid_mask]
            valid_common_items = cooc_values[valid_mask]

            if len(valid_indices) == 0:
                self.user_similarity[user_a] = []
                continue

            # Select top-K candidates
            if len(valid_indices) <= top_k:
                top_local_indices = np.arange(len(valid_indices))
            else:
                top_local_indices = np.argpartition(valid_similarities, -top_k)[-top_k:]

            # Sort top-K by similarity score
            top_local_indices = top_local_indices[np.argsort(valid_similarities[top_local_indices])[::-1]]

            # Build similarity list
            similarities = [
                (user_id_reverse[valid_indices[i]], float(valid_similarities[i]), int(valid_common_items[i]))
                for i in top_local_indices
            ]

            self.user_similarity[user_a] = similarities

            if (user_idx + 1) % 1000 == 0:
                print(f"Processed {user_idx + 1}/{len(user_id_map)} users")

    def save_to_db(self):
        """
        Save computed user similarity to database.
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    # Clear existing similarity data
                    cursor.execute("TRUNCATE TABLE user_similarity")

                    # Prepare batch insert data
                    data = []
                    for user_a, similar_users in self.user_similarity.items():
                        for user_b, similarity, common_items in similar_users:
                            data.append((user_a, user_b, similarity, common_items))

                    # Batch insert similarity records
                    execute_values(cursor, """
                    INSERT INTO user_similarity ("ClientID_A", "ClientID_B", "Similarity", "CommonProducts")
                    VALUES %s
                    """, data)

                    conn.commit()
                    print("User similarity saved successfully!")
        except Exception as e:
            print(f"Error saving user similarity: {e}")

    def load_from_db(self):
        """
        Load user similarity from database.
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "ClientID_A", "ClientID_B", "Similarity", "CommonProducts"
                    FROM user_similarity
                    ORDER BY "ClientID_A", "Similarity" DESC
                    """)

                    rows = cursor.fetchall()

                    for user_a, user_b, similarity, common_products in rows:
                        if user_a not in self.user_similarity:
                            self.user_similarity[user_a] = []
                        self.user_similarity[user_a].append((user_b, float(similarity), int(common_products)))

                    print(f"User similarity loaded successfully! Total users: {len(self.user_similarity)}")
        except Exception as e:
            print(f"Error loading user similarity: {e}")

    def recall(self, user_id: int, user_purchased_items: List[int], top_k: int = 200) -> List[int]:
        """
        Recall candidate items based on similar users' purchases.
        Args:
            user_id: Target user ID
            user_purchased_items: List of item IDs target user has purchased
            top_k: Number of candidate items to recall
        Returns:
            List of recommended item IDs ranked by aggregated similarity score
        """

        if user_id not in self.user_similarity:
            return []

        # Load similar users' purchases from database
        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    similar_user_ids = [user_b for user_b, _, _ in self.user_similarity[user_id]]

                    cursor.execute("""
                    SELECT "ClientID", "ProductID", "PurchaseCount"
                    FROM user_item_interactions
                    WHERE "ClientID" = ANY(%s)
                    """, (similar_user_ids,))

                    interactions = cursor.fetchall()
        except Exception as e:
            print(f"Error loading similar users' purchases: {e}")
            return []

        # Aggregate similarity scores from similar users
        candidate_scores = {}
        user_sim_dict = {user_b: sim for user_b, sim, _ in self.user_similarity[user_id]}
        purchased = set(user_purchased_items)  # O(1) membership checks

        for similar_user, item_id, _count in interactions:
            # Skip items user already purchased
            if item_id in purchased:
                continue

            # Accumulate similarity scores (purchase count is not used as a weight)
            candidate_scores[item_id] = candidate_scores.get(item_id, 0.0) + user_sim_dict[similar_user]

        # Sort candidates by score
        candidates = sorted(candidate_scores.items(), key=lambda x: x[1], reverse=True)
        return [item_id for item_id, score in candidates[:top_k]]

In [37]:
# UserCF Computation Process
# 1. Initialize
db_config = {
    'host': os.getenv('DB_HOST'),
    'port': os.getenv('DB_PORT'),
    'dbname': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'sslmode': 'require'
}

user_cf = UserCF(db_config)

# 2. Compute similarity
user_cf.compute_similarity(min_common_items=3, top_k=50)

# 3. Save to database
user_cf.save_to_db()

Matrix shape: (304929, 29730)
Sparsity: 0.9999
Processed 1000/304929 users
Processed 2000/304929 users
Processed 5000/304929 users
Processed 6000/304929 users
Processed 7000/304929 users
Processed 10000/304929 users
Processed 11000/304929 users
Processed 12000/304929 users
Processed 14000/304929 users
Processed 19000/304929 users
Processed 21000/304929 users
Processed 23000/304929 users
Processed 24000/304929 users
Processed 25000/304929 users
Processed 26000/304929 users
Processed 27000/304929 users
Processed 31000/304929 users
Processed 33000/304929 users
Processed 35000/304929 users
Processed 36000/304929 users
Processed 38000/304929 users
Processed 40000/304929 users
Processed 48000/304929 users
Processed 49000/304929 users
Processed 54000/304929 users
Processed 55000/304929 users
Processed 58000/304929 users
Processed 59000/304929 users
Processed 60000/304929 users
Processed 76000/304929 users
Processed 81000/304929 users
Processed 85000/304929 users
Processed 92000/304929 users
...

In [57]:
# 4. Recall (example)
# Example: user_id=354804904215332, purchased items [7309427166711388171, 5102145079475080421, 8277585465807360411]
user_id = 354804904215332
user_purchased = [7309427166711388171, 5102145079475080421, 8277585465807360411]
recommendations = user_cf.recall(user_id, user_purchased, top_k=200)
print(f"Recalled {len(recommendations)} candidates from UserCF")
print(f"Top 10 candidates: {recommendations[:10]}")

Recalled 87 candidates from UserCF
Top 10 candidates: [1297447046109733993, 7860705371944056504, 1756872849309157178, 5607669002671406521, 5271488841287897246, 1635127223391354351, 901948250157397415, 882983173168811119, 8105826504302370754, 1604895678997680431]
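
The score aggregation inside `recall` can be traced on toy data without a database. The following is a minimal sketch under stated assumptions: `aggregate_scores` and the sample IDs are hypothetical and not part of the notebook. It mirrors the logic above, where each candidate item receives the sum of the similarities of the neighbours who bought it.

```python
from typing import Dict, List, Tuple


def aggregate_scores(
    similar_users: List[Tuple[int, float]],
    purchases: List[Tuple[int, int]],
    already_purchased: List[int],
    top_k: int = 200,
) -> List[int]:
    """Sum neighbour similarities per item, skipping items the user already owns."""
    sim_by_user = dict(similar_users)
    owned = set(already_purchased)
    scores: Dict[int, float] = {}
    for neighbour_id, item_id in purchases:
        if item_id in owned:
            continue
        scores[item_id] = scores.get(item_id, 0.0) + sim_by_user[neighbour_id]
    ranked = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
    return [item_id for item_id, _ in ranked[:top_k]]


# Hypothetical toy data: two neighbours with similarities 0.9 and 0.4.
neighbours = [(101, 0.9), (102, 0.4)]
neighbour_purchases = [(101, 1), (101, 2), (102, 2), (102, 3)]  # (neighbour_id, item_id)
result = aggregate_scores(neighbours, neighbour_purchases, already_purchased=[1])
print(result)
```

Item 2 is bought by both neighbours and accumulates 0.9 + 0.4, so it ranks ahead of item 3, which only the weaker neighbour bought. Item 1 is dropped because the target user already owns it.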


### 3) Popularity-based Recall

- #### Global Popularity
  Global popularity ranks products by recent platform-wide engagement (purchases, GMV, or conversion rate). It provides a non-personalized baseline that captures mainstream demand and serves as a fallback for cold-start users who have no purchase history.

  aggregate platform-wide engagement (purchases/clicks/GMV) → rank items by popularity metrics → apply time decay (recent trends weighted higher) → retrieve top-N trending items → baseline recommendations.
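
The notebook reads precomputed `PopularityScore` and `Rank` values from the `global_popular` table, and how they were computed is not shown. As an illustration of the time-decay step in the pipeline above, here is a minimal sketch that assumes exponential decay with a configurable half-life. The function name, the half-life value, and the event format are all hypothetical.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple


def decayed_popularity(
    purchases: List[Tuple[int, float]],
    half_life_days: float = 7.0,
) -> List[Tuple[int, float]]:
    """Rank items by exponentially time-decayed purchase counts.

    purchases: (item_id, age_in_days) pairs, one per purchase event.
    """
    decay_rate = math.log(2) / half_life_days
    scores: Dict[int, float] = defaultdict(float)
    for item_id, age_days in purchases:
        # A purchase that is one half-life old counts half as much as one made today.
        scores[item_id] += math.exp(-decay_rate * age_days)
    return sorted(scores.items(), key=lambda pair: pair[1], reverse=True)


# Hypothetical events: item 2 sold three times, but two weeks ago.
events = [(1, 0.0), (2, 14.0), (2, 14.0), (2, 14.0), (3, 7.0)]
ranking = decayed_popularity(events, half_life_days=7.0)
top_items = [item_id for item_id, _ in ranking]
print(top_items)
```

With a 7-day half-life, one purchase today (weight 1.0) outranks three purchases from two weeks ago (3 × 0.25), which is the "recent trends weighted higher" behaviour described above.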

In [18]:
from typing import List, Dict
import psycopg2

class GlobalPopular:

    def __init__(self, db_config: Dict[str, str]):
        """
        Initialize global popularity recall.
        Args:
            db_config: Database connection configuration
        """

        self.db_config = db_config
        self.popular_items = []

    def load_from_db(self, top_k: int = 1000):
        """
        Load popular items from database.
        Args:
            top_k: Number of popular items to load
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "ProductID", "PopularityScore"
                    FROM global_popular
                    ORDER BY "Rank" ASC
                    LIMIT %s
                    """, (top_k,))

                    self.popular_items = [(product_id, score) for product_id, score in cursor.fetchall()]
                    print(f"Loaded {len(self.popular_items)} popular items!")
        except Exception as e:
            print(f"Error loading popular items: {e}")

    def recall(self, top_k: int = 200) -> List[int]:
        """
        Recall top-K popular items.
        Args:
            top_k: Number of items to recall
        Returns:
            List of popular item IDs
        """

        return [item_id for item_id, score in self.popular_items[:top_k]]

In [58]:
# GlobalPopular Example
# 1. Initialize
db_config = {
    'host': os.getenv('DB_HOST'),
    'port': os.getenv('DB_PORT'),
    'dbname': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'sslmode': 'require'
}

global_popular = GlobalPopular(db_config)

# 2. Load from database
global_popular.load_from_db(top_k=1000)

# 3. Recall (example)
# Example: recall top 200 global popular items
recommendations = global_popular.recall(top_k=200)
print(f"Recalled {len(recommendations)} candidates from GlobalPopular")
print(f"Top 10 candidates: {recommendations[:10]}")

Loaded 1000 popular items!
Recalled 200 candidates from GlobalPopular
Top 10 candidates: [7409975748690869382, 7762597382304525219, 1421634154572742367, 1053601088228117848, 5626151450577313519, 1108527350021568362, 8761826855035940162, 8360937214612273557, 8193836411099087368, 6804252697652906339]


- #### Interest-based Category Recall
  Interest-based recall builds a preference profile from a user's purchase history (category distribution, brand affinity, price sensitivity) and retrieves the top-performing items within each preferred category. This personalizes the candidates while keeping them spread across the user's interests. The implementation below uses the category dimension only.

  user purchase history → extract category/brand/price preferences → weight by recency & frequency → identify top interest dimensions → retrieve high-quality items per category → diversified personalized candidates.
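
`CategoryPopular.recall` expects the user's preferred categories as input, but the notebook does not show how they are derived. One possible way is sketched below, assuming each purchase is weighted by recency so that frequency and freshness both contribute. `top_categories`, the 30-day half-life, and the history format are hypothetical.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple


def top_categories(
    history: List[Tuple[str, float]],
    n: int = 3,
    half_life_days: float = 30.0,
) -> List[str]:
    """Return the n categories with the highest recency-weighted purchase counts.

    history: (category, days_since_purchase) pairs for a single user.
    """
    decay_rate = math.log(2) / half_life_days
    weights: Dict[str, float] = defaultdict(float)
    for category, age_days in history:
        weights[category] += math.exp(-decay_rate * age_days)
    ranked = sorted(weights.items(), key=lambda pair: pair[1], reverse=True)
    return [category for category, _ in ranked[:n]]


# Hypothetical history: many old Football purchases, few recent ones elsewhere.
history = [
    ("Softball", 2.0), ("Softball", 5.0),
    ("Basketball", 1.0),
    ("Football", 90.0), ("Football", 120.0), ("Football", 150.0),
]
user_categories = top_categories(history, n=2)
print(user_categories)
```

The resulting list can be passed directly as `user_categories` to `CategoryPopular.recall`.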

In [20]:
from typing import List, Dict
import psycopg2

class CategoryPopular:

    def __init__(self, db_config: Dict[str, str]):
        """
        Initialize category popularity recall.
        Args:
            db_config: Database connection configuration
        """

        self.db_config = db_config
        self.category_items = {}

    def load_from_db(self, top_k_per_category: int = 100):
        """
        Load popular items per category from database.
        Args:
            top_k_per_category: Number of popular items to load per category
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "Category", "ProductID", "PopularityScore"
                    FROM category_popular
                    WHERE "Rank" <= %s
                    ORDER BY "Category", "Rank" ASC
                    """, (top_k_per_category,))

                    for category, product_id, score in cursor.fetchall():
                        if category not in self.category_items:
                            self.category_items[category] = []
                        self.category_items[category].append((product_id, score))

                    print(f"Loaded popular items for {len(self.category_items)} categories!")
        except Exception as e:
            print(f"Error loading category popular items: {e}")

    def recall(self, user_categories: List[str], top_k: int = 200) -> List[int]:
        """
        Recall popular items from user's preferred categories.
        Args:
            user_categories: List of user's preferred categories (e.g., top 3)
            top_k: Number of items to recall
        Returns:
            List of popular item IDs across user's categories
        """

        candidates = []
        items_per_category = top_k // len(user_categories) if user_categories else 0

        for category in user_categories:
            if category not in self.category_items:
                continue

            category_items = self.category_items[category][:items_per_category]
            candidates.extend([item_id for item_id, score in category_items])

        return candidates[:top_k]

In [59]:
# CategoryPopular Example
# 1. Initialize
db_config = {
    'host': os.getenv('DB_HOST'),
    'port': os.getenv('DB_PORT'),
    'dbname': os.getenv('DB_NAME'),
    'user': os.getenv('DB_USER'),
    'password': os.getenv('DB_PASSWORD'),
    'sslmode': 'require'
}

category_popular = CategoryPopular(db_config)

# 2. Load from database
category_popular.load_from_db(top_k_per_category=100)

# 3. Recall (example)
# Example: user's top categories are ['Softball', 'Basketball', 'Football']
user_categories = ['Softball', 'Basketball', 'Football']
recommendations = category_popular.recall(user_categories, top_k=200)
print(f"Recalled {len(recommendations)} candidates from CategoryPopular")
print(f"Top 10 candidates: {recommendations[:10]}")

Loaded popular items for 16 categories!
Recalled 198 candidates from CategoryPopular
Top 10 candidates: [7144655757625465514, 7547771225294777844, 1892025359185548203, 4421504418588540577, 3499264361068470157, 6860993275576814774, 4014156734236123534, 5699178692738563273, 663096478818697557, 8486138029609193439]


## Stage 2: Coarse Ranking

A lightweight model quickly scores all recalled candidates using core features and compressed representations, filtering the pool from thousands to hundreds while maintaining low latency, acting as an efficient buffer between broad retrieval and expensive precise ranking.
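
Coarse ranking scores all recalled candidates, so the outputs of the recall channels first have to be combined into a single pool. The notebook does not show that step. The sketch below is one possible approach, assuming a round-robin interleave with de-duplication so that no single channel dominates the pool. `merge_candidates` and the channel names are hypothetical.

```python
from typing import Dict, List


def merge_candidates(channels: Dict[str, List[int]], limit: int = 1000) -> List[int]:
    """Interleave channel outputs round-robin and drop duplicate item IDs."""
    merged: List[int] = []
    seen = set()
    longest = max((len(items) for items in channels.values()), default=0)
    for position in range(longest):
        for items in channels.values():
            if position < len(items) and items[position] not in seen:
                seen.add(items[position])
                merged.append(items[position])
                if len(merged) >= limit:
                    return merged
    return merged


# Hypothetical channel outputs, each already sorted best-first.
channels = {
    "user_cf": [11, 12, 13],
    "global_popular": [12, 14],
    "category_popular": [15, 11, 16],
}
pool = merge_candidates(channels)
print(pool)
```

The merged list keeps each channel's best items near the front and can be used as `candidate_ids` for the coarse ranker once features have been built for every ID.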

In [25]:
import torch
import torch.nn as nn
from typing import List, Dict, Tuple
import numpy as np

class CoarseRanking:

    def __init__(self, feature_dim: int, hidden_dim: int = 64):
        """
        Initialize coarse ranking model.
        Args:
            feature_dim: Input feature dimension
            hidden_dim: Hidden layer dimension
        """

        self.model = nn.Sequential(
            nn.Linear(feature_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)

    def train_step(self, features: torch.Tensor, labels: torch.Tensor) -> float:
        """
        Single training step.
        Args:
            features: Input features [batch_size, feature_dim]
            labels: Binary labels [batch_size]
        Returns:
            Loss value
        """

        self.model.train()
        self.optimizer.zero_grad()

        predictions = self.model(features).squeeze(-1)  # drop only the last dim so a batch of size 1 keeps shape [1]
        loss = nn.BCELoss()(predictions, labels.float())

        loss.backward()
        self.optimizer.step()

        return loss.item()

    def predict(self, features: torch.Tensor) -> np.ndarray:
        """
        Predict scores for candidates.
        Args:
            features: Input features [num_candidates, feature_dim]
        Returns:
            Predicted scores [num_candidates]
        """

        self.model.eval()
        with torch.no_grad():
            # squeeze(-1) keeps shape [num_candidates], even for a single candidate
            scores = self.model(features).squeeze(-1)
        return scores.cpu().numpy()

    def rank(self, candidate_ids: List[int], features: torch.Tensor, top_k: int = 500) -> List[int]:
        """
        Rank candidates and return top-K.
        Args:
            candidate_ids: List of candidate item IDs
            features: Features for all candidates [num_candidates, feature_dim]
            top_k: Number of items to keep
        Returns:
            Top-K candidate IDs after coarse ranking
        """
        scores = self.predict(features)

        # Sort by score descending
        ranked_indices = np.argsort(scores)[::-1][:top_k]

        return [candidate_ids[i] for i in ranked_indices]

In [26]:
from torch.utils.data import TensorDataset, DataLoader

# 0. Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# 1. Load training data from database
try:
    with psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        sslmode='require'
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute("""
            SELECT "ClientID", "ProductID"
            FROM user_item_interactions
            WHERE "PurchaseCount" > 0
            """)
            positive_samples = cursor.fetchall()

            cursor.execute("""
            SELECT "ClientID", "TotalPurchases", "TotalSpendEuro", "AvgOrderValue",
                   "DaysSinceLastPurchase", "PurchaseFrequency", "UniqueProductsBought",
                   "TotalQuantity", "Age"
            FROM clients
            """)
            user_features_data = cursor.fetchall()

            cursor.execute("""
            SELECT "ProductID", "TotalSales", "TotalQuantitySold", "Sales7d",
                   "Sales30d", "AvgPrice", "TotalRevenue", "UniqueBuyers",
                   "AvgQuantityPerOrder", "TotalStockQuantity", "StockCountries"
            FROM products
            """)
            item_features_data = cursor.fetchall()
            print("Training data loaded successfully!")
except Exception as e:
    print(f"Error loading training data: {e}")
    raise

# 2. Prepare features
user_feature_dict = {uid: [0 if x is None else float(x) for x in features]
                     for uid, *features in user_features_data}
item_feature_dict = {iid: [0 if x is None else float(x) for x in features]
                     for iid, *features in item_features_data}

user_feature_dim = len(user_features_data[0]) - 1
item_feature_dim = len(item_features_data[0]) - 1

print(f"Total positive samples: {len(positive_samples)}")

# 3. Generate negative samples
print("Generating negative samples...")
all_item_ids = np.array(list(item_feature_dict.keys()))
positive_set = set(positive_samples)
training_samples = []

# Pre-generate 10 candidate negative samples for each positive sample
neg_candidates = np.random.choice(all_item_ids, size=(len(positive_samples), 10), replace=True)

for idx, (user_id, item_id) in enumerate(positive_samples):
    if user_id not in user_feature_dict or item_id not in item_feature_dict:
        continue

    user_feat = user_feature_dict[user_id]
    item_feat = item_feature_dict[item_id]
    combined_feat = user_feat + item_feat
    training_samples.append((combined_feat, 1))

    # Find first candidate not in positive_set
    for neg_item in neg_candidates[idx]:
        if (user_id, neg_item) not in positive_set and neg_item in item_feature_dict:
            neg_item_feat = item_feature_dict[neg_item]
            combined_feat = user_feat + neg_item_feat
            training_samples.append((combined_feat, 0))
            break

print(f"Total training samples: {len(training_samples)}")

# 4. Prepare tensors
features_tensor_all = torch.FloatTensor([feat for feat, _ in training_samples])
labels_tensor_all = torch.FloatTensor([label for _, label in training_samples])

dataset = TensorDataset(features_tensor_all, labels_tensor_all)
loader = DataLoader(
    dataset,
    batch_size=256,
    shuffle=True
)

# 5. Initialize model
feature_dim = user_feature_dim + item_feature_dim
model = CoarseRanking(feature_dim, hidden_dim=64)
model.model = model.model.to(device)

# 6. Training loop
epochs = 50
patience = 5
best_loss = float('inf')
patience_counter = 0

for epoch in range(epochs):
    epoch_loss = 0
    num_batches = 0

    for batch_idx, (features_tensor, labels_tensor) in enumerate(loader):
        features_tensor = features_tensor.to(device)
        labels_tensor = labels_tensor.to(device)

        loss = model.train_step(features_tensor, labels_tensor)
        epoch_loss += loss
        num_batches += 1

        if batch_idx % 100 == 0:
            print(f"Epoch {epoch + 1}/{epochs}, Batch {batch_idx}/{len(loader)}")

    avg_loss = epoch_loss / num_batches
    print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")

    # Early stopping (monitors the training loss; this cell uses no validation split)
    if avg_loss < best_loss:
        best_loss = avg_loss
        patience_counter = 0
        # Save best model
        torch.save({
            'model': model.model.state_dict(),
            'feature_dim': feature_dim
        }, 'coarse_ranking_model_best.pth')
        print(f"Best model saved with loss: {best_loss:.4f}")
    else:
        patience_counter += 1
        print(f"No improvement for {patience_counter} epoch(s)")

        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs")
            break

# 7. Load best model
checkpoint = torch.load('coarse_ranking_model_best.pth', map_location=device)
model.model.load_state_dict(checkpoint['model'])
print("Best model loaded!")

# 8. Save final model
torch.save({
    'model': model.model.state_dict(),
    'feature_dim': feature_dim
}, 'coarse_ranking_model.pth')
print("Model saved successfully!")

Using device: cuda
Training data loaded successfully!
Total positive samples: 976274
Generating negative samples...
Total training samples: 1952548
Epoch 1/50, Batch 0/7628
Epoch 1/50, Batch 100/7628
Epoch 1/50, Batch 200/7628
Epoch 1/50, Batch 300/7628
Epoch 1/50, Batch 400/7628
Epoch 1/50, Batch 500/7628
Epoch 1/50, Batch 600/7628
Epoch 1/50, Batch 700/7628
Epoch 1/50, Batch 800/7628
Epoch 1/50, Batch 900/7628
Epoch 1/50, Batch 1000/7628
Epoch 1/50, Batch 1100/7628
Epoch 1/50, Batch 1200/7628
Epoch 1/50, Batch 1300/7628
Epoch 1/50, Batch 1400/7628
Epoch 1/50, Batch 1500/7628
Epoch 1/50, Batch 1600/7628
Epoch 1/50, Batch 1700/7628
Epoch 1/50, Batch 1800/7628
Epoch 1/50, Batch 1900/7628
Epoch 1/50, Batch 2000/7628
Epoch 1/50, Batch 2100/7628
Epoch 1/50, Batch 2200/7628
Epoch 1/50, Batch 2300/7628
Epoch 1/50, Batch 2400/7628
Epoch 1/50, Batch 2500/7628
Epoch 1/50, Batch 2600/7628
Epoch 1/50, Batch 2700/7628
Epoch 1/50, Batch 2800/7628
Epoch 1/50, Batch 2900/7628
...

## Stage 3: Fine Ranking

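A multi-task model (here MMOE, Multi-gate Mixture-of-Experts) scores each remaining candidate on several objectives at once, using rich features and a deeper network than the coarse ranker. Typical objectives are click probability, purchase likelihood, and expected GMV. The implementation below predicts purchase probability and GMV, then blends the two into a single relevance score per candidate.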
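
For reference, the gating computed by the `MMOE` class below can be written compactly. The notation is introduced here for illustration and does not appear in the notebook: $e_i$ is the $i$-th of $n$ shared experts, $W^{k}$ and $b^{k}$ are the weights and bias of the gate for task $k$, and $h^{k}$ is the tower for task $k$.

$$
\begin{aligned}
g^{k}(x) &= \operatorname{softmax}\left(W^{k} x + b^{k}\right) \in \mathbb{R}^{n} \\
f^{k}(x) &= \sum_{i=1}^{n} g^{k}_{i}(x)\, e_{i}(x) \\
\hat{y}^{k} &= h^{k}\left(f^{k}(x)\right), \qquad k \in \{\text{purchase}, \text{gmv}\}
\end{aligned}
$$

All tasks share the same experts, while each task has its own gate and therefore its own mixture of expert outputs. The purchase tower ends in a sigmoid and the GMV tower is a plain regression head.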

In [38]:
import torch
import torch.nn as nn
from typing import List, Tuple
import numpy as np

class Expert(nn.Module):

    def __init__(self, input_dim: int, hidden_dim: int = 128):
        """
        Initialize expert network.

        Args:
            input_dim: Input feature dimension
            hidden_dim: Hidden layer dimension
        """

        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


class Gate(nn.Module):

    def __init__(self, input_dim: int, num_experts: int):
        """
        Initialize gate network.
        Args:
            input_dim: Input feature dimension
            num_experts: Number of experts
        """

        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, num_experts),
            nn.Softmax(dim=1)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


class Tower(nn.Module):

    def __init__(self, input_dim: int, task_type: str = 'binary'):
        """
        Initialize tower network.
        Args:
            input_dim: Input dimension from experts
            task_type: Task type ('binary' or 'regression')
        """

        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid() if task_type == 'binary' else nn.Identity()
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

In [39]:
class MMOE(nn.Module):

    def __init__(self, input_dim: int, num_experts: int = 4, hidden_dim: int = 128):
        """
        Initialize MMOE model.
        Args:
            input_dim: Input feature dimension
            num_experts: Number of expert networks
            hidden_dim: Hidden dimension for experts
        """

        super().__init__()

        self.experts = nn.ModuleList([Expert(input_dim, hidden_dim) for _ in range(num_experts)])
        self.gate_purchase = Gate(input_dim, num_experts)
        self.gate_gmv = Gate(input_dim, num_experts)
        self.tower_purchase = Tower(hidden_dim, 'binary')
        self.tower_gmv = Tower(hidden_dim, 'regression')

    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass through MMOE.
        Args:
            x: Input features [batch_size, input_dim]
        Returns:
            Tuple of (purchase_prob, gmv_pred) [batch_size, 1] each
        """

        expert_outputs = torch.stack([expert(x) for expert in self.experts], dim=1)

        gate_weights_purchase = self.gate_purchase(x).unsqueeze(2)
        purchase_input = (expert_outputs * gate_weights_purchase).sum(dim=1)
        purchase_prob = self.tower_purchase(purchase_input)

        gate_weights_gmv = self.gate_gmv(x).unsqueeze(2)
        gmv_input = (expert_outputs * gate_weights_gmv).sum(dim=1)
        gmv_pred = self.tower_gmv(gmv_input)

        return purchase_prob, gmv_pred

    def predict(self, features: torch.Tensor) -> Tuple[np.ndarray, np.ndarray]:
        """
        Predict scores for candidates.
        Args:
            features: Input features [num_candidates, input_dim]
        Returns:
            Tuple of (purchase_probs, gmv_preds) [num_candidates] each
        """

        self.eval()
        with torch.no_grad():
            purchase_prob, gmv_pred = self.forward(features)
        # squeeze(-1) keeps 1-D outputs even when there is a single candidate
        return purchase_prob.squeeze(-1).cpu().numpy(), gmv_pred.squeeze(-1).cpu().numpy()

    def rank(self, candidate_ids: List[int], features: torch.Tensor,
             purchase_weight: float = 0.5, gmv_weight: float = 0.5,
             top_k: int = 20) -> List[int]:
        """
        Rank candidates and return top-K.
        Args:
            candidate_ids: List of candidate item IDs
            features: Features for all candidates [num_candidates, input_dim]
            purchase_weight: Weight for purchase probability
            gmv_weight: Weight for GMV prediction
            top_k: Number of items to return
        Returns:
            Top-K candidate IDs after ranking
        """

        purchase_probs, gmv_preds = self.predict(features)

        gmv_normalized = (gmv_preds - gmv_preds.min()) / (gmv_preds.max() - gmv_preds.min() + 1e-8)
        scores = purchase_weight * purchase_probs + gmv_weight * gmv_normalized
        ranked_indices = np.argsort(scores)[::-1][:top_k]

        return [candidate_ids[i] for i in ranked_indices]


def train_mmoe(model: MMOE, train_loader, val_loader, num_epochs: int = 10,
               purchase_weight: float = 0.5, gmv_weight: float = 0.5,
               device: str = 'cuda'):
    """
    Train MMOE model.
    Args:
        model: MMOE model instance
        train_loader: Training data loader
        val_loader: Validation data loader
        num_epochs: Number of training epochs
        purchase_weight: Weight for purchase task loss
        gmv_weight: Weight for GMV task loss
        device: Device to train on
    """

    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    bce_loss = nn.BCELoss()
    mse_loss = nn.MSELoss()

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        train_purchase_loss = 0.0
        train_gmv_loss = 0.0

        for features, purchase_labels, gmv_labels in train_loader:
            features = features.to(device)
            purchase_labels = purchase_labels.to(device)
            gmv_labels = gmv_labels.to(device)

            purchase_prob, gmv_pred = model(features)

            # squeeze(-1) keeps shape [batch_size] so a final batch of size 1 still matches the labels
            loss_purchase = bce_loss(purchase_prob.squeeze(-1), purchase_labels.float())
            loss_gmv = mse_loss(gmv_pred.squeeze(-1), gmv_labels.float())
            loss = purchase_weight * loss_purchase + gmv_weight * loss_gmv

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_purchase_loss += loss_purchase.item()
            train_gmv_loss += loss_gmv.item()

        model.eval()
        val_loss = 0.0
        val_purchase_loss = 0.0
        val_gmv_loss = 0.0

        with torch.no_grad():
            for features, purchase_labels, gmv_labels in val_loader:
                features = features.to(device)
                purchase_labels = purchase_labels.to(device)
                gmv_labels = gmv_labels.to(device)

                purchase_prob, gmv_pred = model(features)

                loss_purchase = bce_loss(purchase_prob.squeeze(-1), purchase_labels.float())
                loss_gmv = mse_loss(gmv_pred.squeeze(-1), gmv_labels.float())
                loss = purchase_weight * loss_purchase + gmv_weight * loss_gmv

                val_loss += loss.item()
                val_purchase_loss += loss_purchase.item()
                val_gmv_loss += loss_gmv.item()

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train - Total: {train_loss/len(train_loader):.4f}, Purchase: {train_purchase_loss/len(train_loader):.4f}, GMV: {train_gmv_loss/len(train_loader):.4f}")
        print(f"Val   - Total: {val_loss/len(val_loader):.4f}, Purchase: {val_purchase_loss/len(val_loader):.4f}, GMV: {val_gmv_loss/len(val_loader):.4f}")
        print("-" * 80)
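
The blended score that `MMOE.rank` sorts by can be traced by hand. The following is a minimal, framework-free sketch of the same arithmetic (min-max normalization of the GMV predictions followed by a weighted sum). `blend_scores` and the sample values are hypothetical.

```python
from typing import List


def blend_scores(
    purchase_probs: List[float],
    gmv_preds: List[float],
    purchase_weight: float = 0.5,
    gmv_weight: float = 0.5,
) -> List[float]:
    """Min-max normalise GMV predictions, then take a weighted sum with purchase probability."""
    low, high = min(gmv_preds), max(gmv_preds)
    span = high - low + 1e-8  # same epsilon guard as in MMOE.rank
    gmv_normalized = [(g - low) / span for g in gmv_preds]
    return [
        purchase_weight * p + gmv_weight * g
        for p, g in zip(purchase_probs, gmv_normalized)
    ]


# Hypothetical predictions for three candidates.
purchase_probs = [0.9, 0.2, 0.6]
gmv_preds = [10.0, 110.0, 60.0]
scores = blend_scores(purchase_probs, gmv_preds)
order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
print(order)
```

In this toy case the candidate with the lowest purchase probability ranks first because it has the highest predicted GMV. This shows how strongly equal weights can pull the ranking toward high-value items, and why `purchase_weight` and `gmv_weight` are worth tuning.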

In [41]:
from torch.utils.data import TensorDataset, DataLoader
import torch
import torch.nn as nn
import numpy as np
import psycopg2
import os

# 0. Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# 1. Load training data from database
try:
    with psycopg2.connect(
        host=os.getenv('DB_HOST'),
        port=os.getenv('DB_PORT'),
        dbname=os.getenv('DB_NAME'),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        sslmode='require'
    ) as conn:
        with conn.cursor() as cursor:
            cursor.execute("""
            SELECT "ClientID", "ProductID", "PurchaseCount", "TotalSpent"
            FROM user_item_interactions
            """)
            interaction_data = cursor.fetchall()

            cursor.execute("""
            SELECT "ClientID", "TotalPurchases", "TotalSpendEuro", "AvgOrderValue",
                   "DaysSinceLastPurchase", "PurchaseFrequency", "UniqueProductsBought",
                   "TotalQuantity", "Age"
            FROM clients
            """)
            user_features_data = cursor.fetchall()

            cursor.execute("""
            SELECT "ProductID", "TotalSales", "TotalQuantitySold", "Sales7d",
                   "Sales30d", "AvgPrice", "TotalRevenue", "UniqueBuyers",
                   "AvgQuantityPerOrder", "TotalStockQuantity", "StockCountries"
            FROM products
            """)
            item_features_data = cursor.fetchall()

            print("Training data loaded successfully!")
except Exception as e:
    print(f"Error loading training data: {e}")
    raise

# 2. Prepare features
user_feature_dict = {uid: [0 if x is None else float(x) for x in features]
                     for uid, *features in user_features_data}
item_feature_dict = {iid: [0 if x is None else float(x) for x in features]
                     for iid, *features in item_features_data}

user_feature_dim = len(user_features_data[0]) - 1
item_feature_dim = len(item_features_data[0]) - 1
feature_dim = user_feature_dim + item_feature_dim

print(f"User feature dimension: {user_feature_dim}")
print(f"Item feature dimension: {item_feature_dim}")
print(f"Total feature dimension: {feature_dim}")

# 3. Prepare training samples
print("Preparing training samples...")
training_samples = []

for user_id, item_id, purchase_count, total_spent in interaction_data:
    if user_id not in user_feature_dict or item_id not in item_feature_dict:
        continue

    user_feat = user_feature_dict[user_id]
    item_feat = item_feature_dict[item_id]
    combined_feat = user_feat + item_feat

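    # Note: no negative pairs are sampled in this cell, so if every interaction row
    # is a purchase, this label will be 1 for all samples.
    purchase_label = 1 if purchase_count > 0 else 0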
    gmv_value = 0 if total_spent is None else float(total_spent)

    training_samples.append((combined_feat, purchase_label, gmv_value))

print(f"Total training samples: {len(training_samples)}")

# 4. Split train and validation sets
np.random.shuffle(training_samples)
split_idx = int(0.8 * len(training_samples))
train_samples = training_samples[:split_idx]
val_samples = training_samples[split_idx:]

print(f"Train samples: {len(train_samples)}")
print(f"Validation samples: {len(val_samples)}")

# 5. Prepare tensors
features_train = torch.FloatTensor([feat for feat, _, _ in train_samples])
purchase_labels_train = torch.FloatTensor([label for _, label, _ in train_samples])
gmv_labels_train = torch.FloatTensor([gmv for _, _, gmv in train_samples])

train_dataset = TensorDataset(features_train, purchase_labels_train, gmv_labels_train)
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)

features_val = torch.FloatTensor([feat for feat, _, _ in val_samples])
purchase_labels_val = torch.FloatTensor([label for _, label, _ in val_samples])
gmv_labels_val = torch.FloatTensor([gmv for _, _, gmv in val_samples])

val_dataset = TensorDataset(features_val, purchase_labels_val, gmv_labels_val)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False)

# 6. Initialize model
model = MMOE(input_dim=feature_dim, num_experts=4, hidden_dim=128)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
bce_loss = nn.BCELoss()
mse_loss = nn.MSELoss()

# 7. Training loop
epochs = 50
patience = 5
best_loss = float('inf')
patience_counter = 0
purchase_weight = 0.5
gmv_weight = 0.5

for epoch in range(epochs):
    model.train()
    train_loss = 0.0
    train_purchase_loss = 0.0
    train_gmv_loss = 0.0

    for features, purchase_labels, gmv_labels in train_loader:
        features = features.to(device)
        purchase_labels = purchase_labels.to(device)
        gmv_labels = gmv_labels.to(device)

        purchase_prob, gmv_pred = model(features)

        # squeeze(-1) drops only the output dimension, so a final batch of size 1 keeps its shape
        loss_purchase = bce_loss(purchase_prob.squeeze(-1), purchase_labels.float())
        loss_gmv = mse_loss(gmv_pred.squeeze(-1), gmv_labels.float())
        loss = purchase_weight * loss_purchase + gmv_weight * loss_gmv

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        train_purchase_loss += loss_purchase.item()
        train_gmv_loss += loss_gmv.item()

    model.eval()
    val_loss = 0.0
    val_purchase_loss = 0.0
    val_gmv_loss = 0.0

    with torch.no_grad():
        for features, purchase_labels, gmv_labels in val_loader:
            features = features.to(device)
            purchase_labels = purchase_labels.to(device)
            gmv_labels = gmv_labels.to(device)

            purchase_prob, gmv_pred = model(features)

            # squeeze(-1) drops only the output dimension, so a final batch of size 1 keeps its shape
            loss_purchase = bce_loss(purchase_prob.squeeze(-1), purchase_labels.float())
            loss_gmv = mse_loss(gmv_pred.squeeze(-1), gmv_labels.float())
            loss = purchase_weight * loss_purchase + gmv_weight * loss_gmv

            val_loss += loss.item()
            val_purchase_loss += loss_purchase.item()
            val_gmv_loss += loss_gmv.item()

    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch {epoch+1}/{epochs}")
    print(f"Train - Total: {train_loss/len(train_loader):.4f}, Purchase: {train_purchase_loss/len(train_loader):.4f}, GMV: {train_gmv_loss/len(train_loader):.4f}")
    print(f"Val   - Total: {avg_val_loss:.4f}, Purchase: {val_purchase_loss/len(val_loader):.4f}, GMV: {val_gmv_loss/len(val_loader):.4f}")

    # Early stopping
    if avg_val_loss < best_loss:
        best_loss = avg_val_loss
        patience_counter = 0
        torch.save({
            'model_state_dict': model.state_dict(),
            'feature_dim': feature_dim,
            'num_experts': 4,
            'hidden_dim': 128
        }, 'mmoe_model_best.pth')
        print(f"Best model saved with loss: {best_loss:.4f}")
    else:
        patience_counter += 1
        print(f"No improvement for {patience_counter} epoch(s)")

        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs")
            break

    print("-" * 80)

# 8. Load best model
checkpoint = torch.load('mmoe_model_best.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
print("Best model loaded!")

# 9. Save final model
torch.save({
    'model_state_dict': model.state_dict(),
    'feature_dim': feature_dim,
    'num_experts': 4,
    'hidden_dim': 128
}, 'mmoe_model.pth')
print("Model saved successfully!")

Using device: cuda
Training data loaded successfully!
User feature dimension: 8
Item feature dimension: 10
Total feature dimension: 18
Preparing training samples...
Total training samples: 976274
Train samples: 781019
Validation samples: 195255
Epoch 1/50
Train - Total: 65809.1628, Purchase: 0.0000, GMV: 131618.3256
Val   - Total: 10299.8049, Purchase: 0.0000, GMV: 20599.6099
Best model saved with loss: 10299.8049
--------------------------------------------------------------------------------
Epoch 2/50
Train - Total: 5840.9278, Purchase: 0.0000, GMV: 11681.8557
Val   - Total: 3910.7425, Purchase: 0.0000, GMV: 7821.4850
Best model saved with loss: 3910.7425
--------------------------------------------------------------------------------
Epoch 3/50
Train - Total: 6602.3654, Purchase: 0.0000, GMV: 13204.7308
Val   - Total: 4636.6761, Purchase: 0.0000, GMV: 9273.3522
No improvement for 1 epoch(s)
--------------------------------------------------------------------------------
Epoch 4/50
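
The log above reports a purchase loss of 0.0000 next to a GMV loss in the thousands. This suggests that every training sample carries the label 1 and that the unscaled GMV target dominates the combined loss. The following is a minimal sketch of two possible remedies, not the approach used in this notebook: drawing unpurchased items as negatives, and training the GMV head on `log1p(total_spent)`. The helpers `sample_negatives` and `log_gmv`, and the toy `purchased` and `all_items` data, are hypothetical.

```python
import math
import random
from typing import Dict, List, Set, Tuple


def sample_negatives(purchased: Dict[int, Set[int]], all_items: List[int],
                     num_neg: int = 1, seed: int = 0) -> List[Tuple[int, int]]:
    """Draw up to `num_neg` unpurchased items per purchased pair (hypothetical helper)."""
    rng = random.Random(seed)
    negatives = []
    for user_id, items in purchased.items():
        # Candidate negatives: items this user never bought
        pool = [i for i in all_items if i not in items]
        k = min(num_neg * len(items), len(pool))
        negatives.extend((user_id, item_id) for item_id in rng.sample(pool, k))
    return negatives


def log_gmv(total_spent) -> float:
    """Compress the GMV target with log1p so its MSE sits closer to the BCE scale."""
    return math.log1p(max(float(total_spent or 0.0), 0.0))


# Toy data (assumed for illustration only)
purchased = {1: {10, 11}, 2: {12}}
all_items = [10, 11, 12, 13, 14]
negatives = sample_negatives(purchased, all_items, num_neg=1)
```

Negative pairs would get `purchase_label = 0` and a GMV target of 0. Predictions trained on `log_gmv` need `math.expm1` applied before they are read as currency amounts.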


## Stage 4: Re-ranking

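Re-ranking post-processes the fine-ranked list. In general, this stage applies diversity constraints, business rules, and fairness policies, for example with MMR (Maximal Marginal Relevance) or DPP (Determinantal Point Process), so that the final ordering balances relevance against user experience and business objectives. The `Reranking` class below implements one such business rule: it adds a score boost for items with high total stock and for items stocked in the user's country.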
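
MMR, mentioned above, is not part of the class in this notebook. As a rough illustration of how such a diversity step could look, the sketch below greedily picks the item with the best trade-off between relevance and similarity to items already selected. The function `mmr_rerank`, its `lambda_` parameter, and the toy inputs are assumptions made for this example.

```python
import numpy as np


def mmr_rerank(relevance: np.ndarray, embeddings: np.ndarray,
               top_k: int, lambda_: float = 0.7) -> list:
    """Greedy Maximal Marginal Relevance; returns selected candidate indices."""
    # Cosine similarity between all candidate pairs
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    unit = embeddings / np.clip(norms, 1e-12, None)
    sim = unit @ unit.T

    selected = []
    remaining = list(range(len(relevance)))
    while remaining and len(selected) < top_k:
        if selected:
            # Highest similarity of each remaining item to anything already picked
            redundancy = sim[np.ix_(remaining, selected)].max(axis=1)
        else:
            redundancy = np.zeros(len(remaining))
        mmr = lambda_ * relevance[remaining] - (1.0 - lambda_) * redundancy
        best = remaining[int(np.argmax(mmr))]
        selected.append(best)
        remaining.remove(best)
    return selected


# Items 0 and 1 are near-duplicates; item 2 is less relevant but different
relevance = np.array([0.9, 0.85, 0.3])
embeddings = np.array([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]])
order = mmr_rerank(relevance, embeddings, top_k=2, lambda_=0.5)
```

With `lambda_=0.5` the duplicate of the top item is skipped in favour of the dissimilar one; with `lambda_=1.0` the ordering falls back to pure relevance.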

In [60]:
from typing import List, Dict
import psycopg2

class Reranking:

    def __init__(self, db_config: Dict[str, str]):
        """
        Initialize reranking module.

        Args:
            db_config: Database connection configuration
        """

        self.db_config = db_config
        self.product_stocks = {}
        self.country_stocks = {}

    def load_stock_data(self):
        """
        Load stock information from database.
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    # Load total stock per product
                    cursor.execute("""
                    SELECT "ProductID", "TotalStockQuantity"
                    FROM products
                    WHERE "TotalStockQuantity" > 0
                    """)
                    self.product_stocks = {product_id: stock for product_id, stock in cursor.fetchall()}

                    # Load country-specific stock
                    cursor.execute("""
                    SELECT "ProductID", "StoreCountry", "Quantity"
                    FROM stocks
                    WHERE "Quantity" > 0
                    """)
                    for product_id, country, quantity in cursor.fetchall():
                        if product_id not in self.country_stocks:
                            self.country_stocks[product_id] = {}
                        self.country_stocks[product_id][country] = quantity

                    print("Stock data loaded successfully!")
        except Exception as e:
            print(f"Error loading stock data: {e}")

    def rerank(self, candidate_ids: List[int], scores: List[float],
               user_country: str, stock_weight: float = 0.3,
               country_weight: float = 0.4) -> List[int]:
        """
        Rerank candidates based on stock availability and country preference.
        Args:
            candidate_ids: List of candidate item IDs
            scores: Original ranking scores for each candidate
            user_country: User's country code
            stock_weight: Weight for total stock quantity
            country_weight: Weight for country-specific stock
        Returns:
            Reranked list of item IDs
        """

        reranked_items = []

        for idx, item_id in enumerate(candidate_ids):
            original_score = scores[idx]

            # Stock boost
            stock_boost = 0.0
            if item_id in self.product_stocks:
                total_stock = self.product_stocks[item_id]
                stock_boost = min(total_stock / 100.0, 1.0) * stock_weight

            # Country stock boost
            country_boost = 0.0
            if item_id in self.country_stocks and user_country in self.country_stocks[item_id]:
                country_stock = self.country_stocks[item_id][user_country]
                country_boost = min(country_stock / 50.0, 1.0) * country_weight

            # Combined score
            final_score = original_score + stock_boost + country_boost

            reranked_items.append((item_id, final_score))

        # Sort by final score descending
        reranked_items.sort(key=lambda x: x[1], reverse=True)

        return [item_id for item_id, score in reranked_items]
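
To make the effect of the default weights concrete, the sketch below repeats the boost arithmetic of `rerank` for a single item. The standalone function `boosted_score` and the example numbers are hypothetical; the constants (100, 50, 0.3, 0.4) are the ones used in the class above.

```python
def boosted_score(score: float, total_stock: int, country_stock: int,
                  stock_weight: float = 0.3, country_weight: float = 0.4) -> float:
    """Same arithmetic as Reranking.rerank, applied to one item."""
    stock_boost = min(total_stock / 100.0, 1.0) * stock_weight
    country_boost = min(country_stock / 50.0, 1.0) * country_weight
    return score + stock_boost + country_boost


# A strongly ranked item with no stock information
unstocked = boosted_score(0.80, total_stock=0, country_stock=0)
# A weakly ranked item that is well stocked in the user's country
stocked = boosted_score(0.20, total_stock=200, country_stock=50)
```

Because the two boosts can add up to 0.7, a well-stocked item with a fine-ranking score of 0.20 overtakes an unstocked item scored 0.80. If relevance should stay the dominant signal, smaller weights may be worth trying.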

## Stage 5: End-to-End Pipeline

In [65]:
class RecommendationPipeline:

    def __init__(self, db_config: Dict[str, str]):
        """
        Initialize recommendation pipeline.
        Args:
            db_config: Database connection configuration dictionary
        """

        self.db_config = db_config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self._load_models()
        self.item_embeddings, self.item_id_to_idx = self._load_item_embeddings()

    def _load_models(self):
        """Load all trained models."""

        print("Loading models...")

        # Load two-tower model
        two_tower_checkpoint = torch.load('two_tower_model.pth', map_location=self.device)
        self.two_tower = TwoTowerRecall(
            user_feature_dim=8,
            item_feature_dim=10,
            embedding_dim=64
        )
        self.two_tower.user_tower.load_state_dict(two_tower_checkpoint['user_tower'])
        self.two_tower.item_tower.load_state_dict(two_tower_checkpoint['item_tower'])
        self.two_tower.user_tower = self.two_tower.user_tower.to(self.device)
        self.two_tower.item_tower = self.two_tower.item_tower.to(self.device)

        # Load ItemCF model
        self.item_cf = ItemCF(self.db_config)
        self.item_cf.load_from_db()

        # Load UserCF model
        self.user_cf = UserCF(self.db_config)
        self.user_cf.load_from_db()

        # Load global popular model
        self.global_popular = GlobalPopular(self.db_config)
        self.global_popular.load_from_db()

        # Load category popular model
        self.category_popular = CategoryPopular(self.db_config)
        self.category_popular.load_from_db()

        # Load coarse ranking model
        coarse_checkpoint = torch.load('coarse_ranking_model.pth', map_location=self.device)
        self.coarse_ranker = CoarseRanking(
            feature_dim=coarse_checkpoint['feature_dim'],
            hidden_dim=64
        )
        self.coarse_ranker.model.load_state_dict(coarse_checkpoint['model'])
        self.coarse_ranker.model = self.coarse_ranker.model.to(self.device)

        # Load MMOE model
        mmoe_checkpoint = torch.load('mmoe_model.pth', map_location=self.device)
        self.fine_ranker = MMOE(
            input_dim=mmoe_checkpoint['feature_dim'],
            num_experts=4,
            hidden_dim=128
        )
        self.fine_ranker.load_state_dict(mmoe_checkpoint['model_state_dict'])
        self.fine_ranker = self.fine_ranker.to(self.device)

        # Load reranking model
        self.reranker = Reranking(self.db_config)
        self.reranker.load_stock_data()

        print("All models loaded successfully!")

    def _load_item_embeddings(self):
        """Load item embeddings from database."""

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute('SELECT "ProductID", "Embedding" FROM item_embeddings')
                    item_data = cursor.fetchall()

                    item_id_to_idx = {pid: i for i, (pid, _) in enumerate(item_data)}
                    item_embeddings = np.array([emb for _, emb in item_data])

                    print("Item embeddings loaded successfully!")
                    return item_embeddings, item_id_to_idx
        except Exception as e:
            print(f"Error loading item embeddings: {e}")
            return np.array([]), {}

    def _get_user_features(self, user_id: int) -> List[float]:
        """
        Get user features from database.
        Args:
            user_id: User ID
        Returns:
            List of user feature values
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "TotalPurchases", "TotalSpendEuro", "AvgOrderValue",
                           "DaysSinceLastPurchase", "PurchaseFrequency",
                           "UniqueProductsBought", "TotalQuantity", "Age"
                    FROM clients WHERE "ClientID" = %s
                    """, (user_id,))

                    result = cursor.fetchone()
                    return [0 if x is None else float(x) for x in result] if result else [0] * 8
        except Exception as e:
            print(f"Error loading user features: {e}")
            return [0] * 8

    def _get_item_features(self, item_ids: List[int]) -> torch.Tensor:
        """
        Get item features from database.
        Args:
            item_ids: List of item IDs
        Returns:
            Tensor of item features [num_items, item_feature_dim]
        """

        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
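                    cursor.execute("""
                    SELECT "ProductID", "TotalSales", "TotalQuantitySold", "Sales7d", "Sales30d",
                           "AvgPrice", "TotalRevenue", "UniqueBuyers",
                           "AvgQuantityPerOrder", "TotalStockQuantity", "StockCountries"
                    FROM products WHERE "ProductID" = ANY(%s)
                    """, (item_ids,))

                    # The query returns rows in arbitrary order and skips unknown IDs,
                    # so key the rows by ProductID and rebuild them in input order.
                    rows = {pid: [0 if x is None else float(x) for x in feats]
                            for pid, *feats in cursor.fetchall()}
                    features = [rows.get(pid, [0.0] * 10) for pid in item_ids]
                    return torch.tensor(features, dtype=torch.float32).reshape(-1, 10)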
        except Exception as e:
            print(f"Error loading item features: {e}")
            return torch.zeros(len(item_ids), 10)

    def _get_combined_features(self, user_id: int, item_ids: List[int]) -> torch.Tensor:
        """
        Get combined user-item features for ranking.
        Args:
            user_id: User ID
            item_ids: List of item IDs
        Returns:
            Combined feature tensor [num_items, user_feature_dim + item_feature_dim]
        """

        user_features = self._get_user_features(user_id)
        item_features = self._get_item_features(item_ids)

        user_tensor = torch.tensor([user_features] * len(item_ids), dtype=torch.float32)
        combined = torch.cat([user_tensor, item_features], dim=1)

        return combined.to(self.device)

    def recommend(self, user_id: int, top_k: int = 20) -> List[int]:
        """
        Generate recommendations for a user.
        Args:
            user_id: User ID
            top_k: Number of final recommendations
        Returns:
            List of recommended item IDs
        """

        print(f"\n{'='*50}")
        print(f"Generating recommendations for User {user_id}")
        print(f"{'='*50}\n")

        # Load user data from database
        try:
            with psycopg2.connect(**self.db_config) as conn:
                with conn.cursor() as cursor:
                    cursor.execute("""
                    SELECT "ClientCountry", "TopCategory1", "TopCategory2", "TopCategory3"
                    FROM clients WHERE "ClientID" = %s
                    """, (user_id,))
                    result = cursor.fetchone()

                    if not result:
                        print("User not found!")
                        return []

                    user_country = result[0]
                    user_categories = [cat for cat in result[1:] if cat]

                    cursor.execute("""
                    SELECT "ProductID" FROM user_item_interactions WHERE "ClientID" = %s
                    """, (user_id,))
                    user_purchased = [row[0] for row in cursor.fetchall()]
        except Exception as e:
            print(f"Error loading user data: {e}")
            return []

        # Get user features for two-tower
        user_features = self._get_user_features(user_id)

        # Stage 1: Multi-channel recall
        print("Stage 1: Multi-channel Recall")
        print("-" * 50)

        candidates = set()

        # Recall from two-tower model
        user_tensor = torch.FloatTensor([user_features]).to(self.device)
        user_embedding = self.two_tower.encode_users(user_tensor)[0]
        two_tower_idx = self.two_tower.recall(user_embedding, self.item_embeddings, top_k=500)
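        # Build the index -> ProductID lookup once instead of once per recalled index
        idx_to_item_id = list(self.item_id_to_idx.keys())
        two_tower_items = [idx_to_item_id[idx] for idx in two_tower_idx]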
        candidates.update(two_tower_items)
        print(f"TwoTower recalled: {len(two_tower_items)} candidates")

        # Recall from ItemCF model
        itemcf_items = self.item_cf.recall(user_purchased, top_k=300)
        candidates.update(itemcf_items)
        print(f"ItemCF recalled: {len(itemcf_items)} candidates")

        # Recall from UserCF model
        usercf_items = self.user_cf.recall(user_id, user_purchased, top_k=200)
        candidates.update(usercf_items)
        print(f"UserCF recalled: {len(usercf_items)} candidates")

        # Recall from global popular
        global_items = self.global_popular.recall(top_k=200)
        candidates.update(global_items)
        print(f"GlobalPopular recalled: {len(global_items)} candidates")

        # Recall from category popular
        category_items = self.category_popular.recall(user_categories, top_k=200)
        candidates.update(category_items)
        print(f"CategoryPopular recalled: {len(category_items)} candidates")

        # Remove purchased items
        candidates = list(candidates - set(user_purchased))
        print(f"Total unique candidates: {len(candidates)}")

        # Stage 2: Coarse ranking
        print("\nStage 2: Coarse Ranking")
        print("-" * 50)

        candidate_features = self._get_combined_features(user_id, candidates)
        coarse_candidates = self.coarse_ranker.rank(candidates, candidate_features, top_k=500)
        print(f"Coarse ranking kept: {len(coarse_candidates)} candidates")

        # Stage 3: Fine ranking
        print("\nStage 3: Fine Ranking")
        print("-" * 50)

        fine_features = self._get_combined_features(user_id, coarse_candidates)
        purchase_probs, gmv_preds = self.fine_ranker.predict(fine_features)

        gmv_normalized = (gmv_preds - gmv_preds.min()) / (gmv_preds.max() - gmv_preds.min() + 1e-8)
        scores = 0.5 * purchase_probs + 0.5 * gmv_normalized

        ranked_indices = np.argsort(scores)[::-1][:100]
        fine_candidates = [coarse_candidates[i] for i in ranked_indices]
        fine_scores = [scores[i] for i in ranked_indices]
        print(f"Fine ranking kept: {len(fine_candidates)} candidates")

        # Stage 4: Reranking
        print("\nStage 4: Reranking")
        print("-" * 50)

        final_items = self.reranker.rerank(fine_candidates, fine_scores, user_country)[:top_k]
        print(f"Reranking kept: {len(final_items)} candidates")

        print(f"\n{'='*50}")
        print(f"Final recommendations: {final_items}")
        print(f"{'='*50}\n")

        return final_items
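
The fine-ranking score in `recommend` blends the purchase probability with a min-max normalized GMV prediction. The sketch below isolates that step on toy numbers, assuming both model outputs are array-like; the function name `blend_scores` and the sample values are hypothetical.

```python
import numpy as np


def blend_scores(purchase_probs, gmv_preds, w_purchase=0.5, w_gmv=0.5, eps=1e-8):
    """Min-max normalize GMV within the candidate set, then mix with purchase probability."""
    gmv = np.asarray(gmv_preds, dtype=float)
    gmv_norm = (gmv - gmv.min()) / (gmv.max() - gmv.min() + eps)
    return w_purchase * np.asarray(purchase_probs, dtype=float) + w_gmv * gmv_norm


# Three candidates: likely but cheap, moderately likely and expensive, unlikely
scores = blend_scores([0.9, 0.5, 0.1], [10.0, 200.0, 50.0])
top2 = np.argsort(scores)[::-1][:2]
```

Note that the normalization is relative to the current candidate set: the highest predicted GMV always maps to roughly 1, however small it is in absolute terms. When all GMV predictions are equal, the `eps` term keeps the division defined and the ranking reduces to purchase probability alone.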

In [66]:
# Example usage
if __name__ == "__main__":
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': os.getenv('DB_PORT'),
        'dbname': os.getenv('DB_NAME'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD'),
        'sslmode': 'require'
    }

    # Initialize pipeline (loads all models automatically)
    pipeline = RecommendationPipeline(db_config)

    # Generate recommendations
    user_id = 2820558652430377474
    recommendations = pipeline.recommend(user_id, top_k=20)

Loading models...
Item similarity loaded successfully! Total items: 6873
User similarity loaded successfully! Total users: 43766
Loaded 1000 popular items!
Loaded popular items for 16 categories!
Stock data loaded successfully!
All models loaded successfully!
Item embeddings loaded successfully!

Generating recommendations for User 2820558652430377474

Stage 1: Multi-channel Recall
--------------------------------------------------
TwoTower recalled: 500 candidates
ItemCF recalled: 0 candidates
UserCF recalled: 0 candidates
GlobalPopular recalled: 200 candidates
CategoryPopular recalled: 200 candidates
Total unique candidates: 880

Stage 2: Coarse Ranking
--------------------------------------------------
Coarse ranking kept: 500 candidates

Stage 3: Fine Ranking
--------------------------------------------------
Fine ranking kept: 100 candidates

Stage 4: Reranking
--------------------------------------------------
Reranking kept: 20 candidates

Final recommendations: [257445159775393