In [1]:
import torch
import torch.nn as nn
import numpy as np
import pickle
from typing import Dict, Tuple, Optional

In [2]:
class FeatureCheckInMatrix(nn.Module):
    """Feature-based user-POI score model for a single level: S^l = P^l (Q^l)^T.

    User rows are P^l = [X_A || X_T || A_u], where ``A_u`` is the only
    trainable per-user block. POI rows are Q^l = [Y_A || Y_T || A_lp] and are
    entirely precomputed. When the two concatenated widths differ, both sides
    go through their own ``nn.Linear`` projection into a shared ``latent_dim``
    before the dot product is taken.
    """

    def __init__(self,
                 level: int,
                 X_A: np.ndarray,           # U_u: (n_users, d_Uu)
                 X_T: np.ndarray,           # H^l_u: (n_users, d_Hu)
                 Y_A: np.ndarray,           # U^l_p: (n_pois, d_Up)
                 Y_T: np.ndarray,           # H^l_p: (n_pois, d_Hp)
                 A_lp: np.ndarray,          # A^l_p: (n_pois, r_l) - inter-level features
                 latent_dim: Optional[int] = None,  # Projection dim if user/poi dims differ
                 init_std: float = 0.01,
                 dropout_p: float = 0.1):
        """Build the model from precomputed feature matrices.

        Args:
            level: Level identifier; stored and used in the log output.
            X_A, X_T: Per-user feature matrices (one row per user).
            Y_A, Y_T: Per-POI feature matrices (one row per POI).
            A_lp: Per-POI inter-level features; its column count ``r_l`` also
                fixes the width of the trainable per-user block ``A_u``.
            latent_dim: Shared projection width. Only consulted when the user
                and POI concatenated widths differ; ``None`` then selects the
                larger of the two widths.
            init_std: Standard deviation of the normal init applied to ``A_u``.
            dropout_p: Dropout probability applied to both sides in ``forward``.
        """
        super().__init__()

        self.level = level
        self.n_users = X_A.shape[0]
        self.n_pois = Y_A.shape[0]

        # Register precomputed features as buffers (non-trainable)
        self.register_buffer('X_A', torch.tensor(X_A, dtype=torch.float32))
        self.register_buffer('X_T', torch.tensor(X_T, dtype=torch.float32))
        self.register_buffer('Y_A', torch.tensor(Y_A, dtype=torch.float32))
        self.register_buffer('Y_T', torch.tensor(Y_T, dtype=torch.float32))
        self.register_buffer('A_lp', torch.tensor(A_lp, dtype=torch.float32))

        # Dimension of inter-level features (r_l)
        self.r_l = A_lp.shape[1]

        # Trainable parameter: A^l (implicit user features to match A^l_p space)
        # Shape: (n_users, r_l) - same second dimension as A^l_p
        self.A_u = nn.Parameter(torch.randn(self.n_users, self.r_l) * init_std)

        # Calculate total dimensions
        self.user_total_dim = X_A.shape[1] + X_T.shape[1] + self.r_l
        self.poi_total_dim = Y_A.shape[1] + Y_T.shape[1] + self.r_l

        print(f"[Level {level}] Initializing FeatureCheckInMatrix:")
        print(f"  P^l components: U_u({X_A.shape[1]}) || H^l_u({X_T.shape[1]}) || A^l({self.r_l}) = {self.user_total_dim}")
        print(f"  Q^l components: U_p({Y_A.shape[1]}) || H^l_p({Y_T.shape[1]}) || A^l_p({self.r_l}) = {self.poi_total_dim}")

        # Handle dimension mismatch via projection layers
        if self.user_total_dim != self.poi_total_dim:
            if latent_dim is None:
                # Auto-select latent dim as max of the two, or you can set manually
                latent_dim = max(self.user_total_dim, self.poi_total_dim)
                print(f"  ⚠️ Dimension mismatch detected. Projecting both to dim {latent_dim}")
            else:
                print(f"  Projecting to latent dim {latent_dim}")

            self.W_user = nn.Linear(self.user_total_dim, latent_dim)
            self.W_poi = nn.Linear(self.poi_total_dim, latent_dim)
            self.latent_dim = latent_dim
            self.use_projection = True
        else:
            # Widths already agree: latent_dim argument is ignored on this path.
            self.latent_dim = self.user_total_dim
            self.use_projection = False
            print(f"  ✓ Dimensions match: {self.user_total_dim}")

        self.dropout = nn.Dropout(dropout_p)

    def get_P_l(self, user_indices: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Return user rows [X_A || X_T || A_u]; all users when ``user_indices`` is None."""
        if user_indices is None:
            X_A_batch = self.X_A
            X_T_batch = self.X_T
            A_u_batch = self.A_u
        else:
            X_A_batch = self.X_A[user_indices]
            X_T_batch = self.X_T[user_indices]
            A_u_batch = self.A_u[user_indices]

        # Concatenate: [Explicit || Implicit || Trainable]
        P_l = torch.cat([X_A_batch, X_T_batch, A_u_batch], dim=-1)
        return P_l

    def get_Q_l(self, poi_indices: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Return POI rows [Y_A || Y_T || A_lp]; all POIs when ``poi_indices`` is None."""
        if poi_indices is None:
            Y_A_batch = self.Y_A
            Y_T_batch = self.Y_T
            A_lp_batch = self.A_lp
        else:
            Y_A_batch = self.Y_A[poi_indices]
            Y_T_batch = self.Y_T[poi_indices]
            A_lp_batch = self.A_lp[poi_indices]

        # Concatenate: [Explicit || Implicit || Inter-level]
        Q_l = torch.cat([Y_A_batch, Y_T_batch, A_lp_batch], dim=-1)
        return Q_l

    def project_to_latent(self, P_l: torch.Tensor, Q_l: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """Apply projection layers if dimensions differ"""
        if self.use_projection:
            P_proj = self.W_user(P_l)
            Q_proj = self.W_poi(Q_l)
            return P_proj, Q_proj
        return P_l, Q_l

    def forward(self, user_indices: torch.Tensor, poi_indices: torch.Tensor) -> torch.Tensor:
        """
        Compute scores for specific user-POI pairs (batch mode)

        Args:
            user_indices: (batch_size,) tensor of user indices
            poi_indices: (batch_size,) tensor of poi indices

        Returns:
            scores: (batch_size,) similarity scores
        """
        # Get representations
        P_l = self.get_P_l(user_indices)    # (batch, user_total_dim)
        Q_l = self.get_Q_l(poi_indices)     # (batch, poi_total_dim)

        # Project if needed
        P_proj, Q_proj = self.project_to_latent(P_l, Q_l)

        # Apply dropout
        P_proj = self.dropout(P_proj)
        Q_proj = self.dropout(Q_proj)

        # Compute dot product for each pair: sum over latent dim
        scores = torch.sum(P_proj * Q_proj, dim=1)  # (batch,)

        return scores

    def compute_matrix(self, batch_size: int = 512) -> torch.Tensor:
        """Compute the full score matrix S^l = P^l (Q^l)^T without gradients.

        Users are processed ``batch_size`` rows at a time; each chunk is moved
        to the CPU before the chunks are concatenated. Dropout is not applied.
        The module's training/eval mode is restored before returning.

        Args:
            batch_size: Number of user rows scored per step; must be positive.

        Returns:
            CPU tensor of shape (n_users, n_pois).

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        # Index tensors must live on the same device as the buffers they index.
        device = self.A_u.device
        was_training = self.training
        self.eval()
        S_l_chunks = []

        try:
            with torch.no_grad():
                # Q^l does not depend on the user batch, so project it once.
                Q_l = self.get_Q_l()  # (n_pois, poi_total_dim)
                Q_proj = self.W_poi(Q_l) if self.use_projection else Q_l
                Q_proj_t = Q_proj.t()  # (latent, n_pois)

                # Process users in batches to avoid memory issues
                for start in range(0, self.n_users, batch_size):
                    stop = min(start + batch_size, self.n_users)
                    user_idx = torch.arange(start, stop, device=device)
                    P_l = self.get_P_l(user_idx)  # (batch, user_total_dim)
                    P_proj = self.W_user(P_l) if self.use_projection else P_l

                    # (batch, latent) @ (latent, n_pois) -> (batch, n_pois)
                    S_l_chunks.append(torch.matmul(P_proj, Q_proj_t).cpu())
        finally:
            # Do not leave a training model stuck in eval mode.
            self.train(was_training)

        if not S_l_chunks:
            # No users: torch.cat would reject an empty list.
            return torch.empty((0, self.n_pois), dtype=self.A_u.dtype)
        return torch.cat(S_l_chunks, dim=0)

    def get_top_k(self, user_idx: int, k: int = 10) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Get top-k POI recommendations for a specific user

        Scores are computed with ``forward`` in eval mode (no dropout) and
        without gradients. ``k`` is capped at the number of POIs, so fewer than
        ``k`` results are returned when the level has fewer POIs. The module's
        training/eval mode is restored before returning.

        Args:
            user_idx: Row index of the user.
            k: Maximum number of POIs to return.

        Returns:
            (top_scores, top_indices), both of length min(k, n_pois), ordered
            by descending score.
        """
        device = self.A_u.device
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                user_tensor = torch.tensor([user_idx], device=device)

                # Batch compute to avoid memory issues if n_pois is large
                scores_list = []
                batch_size = 1024
                for start in range(0, self.n_pois, batch_size):
                    stop = min(start + batch_size, self.n_pois)
                    poi_batch = torch.arange(start, stop, device=device)
                    user_batch = user_tensor.repeat(stop - start)
                    scores_list.append(self.forward(user_batch, poi_batch))

                if not scores_list:
                    # No POIs at this level: nothing to rank.
                    empty_scores = torch.empty(0, dtype=self.A_u.dtype, device=device)
                    empty_indices = torch.empty(0, dtype=torch.long, device=device)
                    return empty_scores, empty_indices

                scores = torch.cat(scores_list)
                # torch.topk rejects k larger than the number of candidates.
                top_scores, top_indices = torch.topk(scores, min(k, scores.numel()))
        finally:
            self.train(was_training)

        return top_scores, top_indices


def load_and_initialize_level(level: int, 
                              user_emb_path: str,
                              poi_emb_path: str, 
                              interlevel_path: str,
                              latent_dim: Optional[int] = None) -> FeatureCheckInMatrix:
    """Read the three feature pickles and build the model for one level.

    Args:
        level: Level number; POI-side entries are looked up under
            ``f'level_{level}'``.
        user_emb_path: Pickle holding a dict with keys ``'X_A'`` and ``'X_T'``.
        poi_emb_path: Pickle holding ``['poi_embeddings'][level_key]`` with
            keys ``'Y_A'`` and ``'Y_T'``.
        interlevel_path: Pickle holding ``['A_lp'][level_key]``.
        latent_dim: Forwarded unchanged to ``FeatureCheckInMatrix``.

    Returns:
        A freshly constructed ``FeatureCheckInMatrix`` for ``level``.

    Note:
        ``pickle.load`` can execute arbitrary code; only point this at files
        from a trusted source.
    """
    key = f'level_{level}'

    # User side: U_u and H^l_u
    with open(user_emb_path, 'rb') as fh:
        user_blob = pickle.load(fh)
    user_explicit = user_blob['X_A']
    user_implicit = user_blob['X_T']

    # POI side for this level: U^l_p and H^l_p
    with open(poi_emb_path, 'rb') as fh:
        poi_blob = pickle.load(fh)
    poi_explicit = poi_blob['poi_embeddings'][key]['Y_A']
    poi_implicit = poi_blob['poi_embeddings'][key]['Y_T']

    # Inter-level POI features A^l_p
    with open(interlevel_path, 'rb') as fh:
        inter_blob = pickle.load(fh)
    poi_interlevel = inter_blob['A_lp'][key]

    return FeatureCheckInMatrix(
        level=level,
        X_A=user_explicit,
        X_T=user_implicit,
        Y_A=poi_explicit,
        Y_T=poi_implicit,
        A_lp=poi_interlevel,
        latent_dim=latent_dim,
    )

In [6]:
if __name__ == "__main__":
    # Input pickles: user embeddings, per-level POI embeddings, and the
    # per-level inter-level POI features, in that order.
    user_emb_path, poi_emb_path, interlevel_path = (
        "../../Sources/Embeddings v3/user_embeddings.pkl",
        "../../Sources/Embeddings v3/poi_embeddings.pkl",
        "../../Sources/Embeddings v3/poi_interlevel_features.pkl",
    )

In [9]:
def initialize_all_levels(user_emb_path, poi_emb_path, interlevel_path, latent_dim=256,
                          levels=(0, 1, 2, 3),
                          weights_dir="../../Sources/Weights",
                          s_matrices_path="../../Sources/Embeddings v3/S_matrices_feature.pkl"):
    """Initialize a FeatureCheckInMatrix per level and compute its S^l matrix.

    For every requested level the model is built from the pickled features,
    its score matrix is computed right away (with the still-untrained ``A_u``),
    summary statistics are printed, and the initial ``state_dict`` is saved.
    After all levels are processed, the score matrices are pickled together
    with a small metadata record.

    Args:
        user_emb_path: Path to the user-embedding pickle.
        poi_emb_path: Path to the POI-embedding pickle.
        interlevel_path: Path to the inter-level POI feature pickle.
        latent_dim: Projection width forwarded to each model.
        levels: Iterable of level numbers to process. The same sequence is
            written into the saved metadata, so the two cannot drift apart.
        weights_dir: Directory receiving ``feature_model_level{level}_init.pt``.
            It must already exist.
        s_matrices_path: Output pickle for the combined score matrices.

    Returns:
        ``(models, S_matrices)``: two dicts keyed by ``f'level_{level}'``.
    """
    levels = list(levels)  # materialize once: iterated here and stored in metadata
    models = {}
    S_matrices = {}

    for level in levels:
        print(f"\n{'='*60}")
        print(f"Initializing Level {level}...")

        model = load_and_initialize_level(
            level=level,
            user_emb_path=user_emb_path,
            poi_emb_path=poi_emb_path,
            interlevel_path=interlevel_path,
            latent_dim=latent_dim
        )

        # Compute S^l immediately
        print(f"Computing S^{level}...")
        S_l = model.compute_matrix(batch_size=256)

        print(f"S^{level} shape: {S_l.shape}")
        print(f"S^{level} stats: μ={S_l.mean():.3f}, σ={S_l.std():.3f}, range=[{S_l.min():.3f}, {S_l.max():.3f}]")

        level_key = f'level_{level}'
        models[level_key] = model
        S_matrices[level_key] = S_l

        # Save individual level
        torch.save(model.state_dict(), f"{weights_dir}/feature_model_level{level}_init.pt")

    # Save all S matrices together
    with open(s_matrices_path, 'wb') as f:
        pickle.dump({
            'S_matrices': S_matrices,
            'metadata': {
                'formula': 'S^l = P^l(Q^l)^T',
                'levels': levels,
                'latent_dim': latent_dim,
                'note': 'A^l parameters randomly initialized, not yet trained'
            }
        }, f)

    return models, S_matrices

# The path variables are only bound under the `if __name__ == "__main__":`
# guard of the path-definition cell, so this call carries the same guard;
# otherwise importing the exported script would fail with a NameError here.
if __name__ == "__main__":
    models, S_matrices = initialize_all_levels(
        user_emb_path=user_emb_path,
        poi_emb_path=poi_emb_path,
        interlevel_path=interlevel_path,
        latent_dim=256
    )


Initializing Level 0...
[Level 0] Initializing FeatureCheckInMatrix:
  P^l components: U_u(39) || H^l_u(32) || A^l(221) = 292
  Q^l components: U_p(149) || H^l_p(72) || A^l_p(221) = 442
  Projecting to latent dim 256
Computing S^0...
S^0 shape: torch.Size([21, 4696])
S^0 stats: μ=-0.040, σ=0.873, range=[-16.763, 26.042]

Initializing Level 1...
[Level 1] Initializing FeatureCheckInMatrix:
  P^l components: U_u(39) || H^l_u(32) || A^l(171) = 242
  Q^l components: U_p(99) || H^l_p(72) || A^l_p(171) = 342
  Projecting to latent dim 256
Computing S^1...
S^1 shape: torch.Size([21, 1355])
S^1 stats: μ=0.032, σ=1.082, range=[-14.222, 20.153]

Initializing Level 2...
[Level 2] Initializing FeatureCheckInMatrix:
  P^l components: U_u(39) || H^l_u(32) || A^l(125) = 196
  Q^l components: U_p(53) || H^l_p(72) || A^l_p(125) = 250
  Projecting to latent dim 256
Computing S^2...
S^2 shape: torch.Size([21, 44])
S^2 stats: μ=-0.072, σ=1.503, range=[-7.142, 6.339]

Initializing Level 3...
[Level 3] Ini