# LightFM Game Recommendation System

## 1. Load dependencies

In [1]:
# -----------------=[ Load Dependencies ]=----------------

import numpy as np
import pandas as pd
import scann
from lightfm import LightFM
from lightfm.data import Dataset
from tqdm import tqdm
from scipy.sparse import load_npz
import pickle
from sklearn.preprocessing import MultiLabelBinarizer
from lightfm.evaluation import precision_at_k, recall_at_k
from lightfm.cross_validation import random_train_test_split

2025-01-26 19:03:37.380356: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-01-26 19:03:37.451233: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-26 19:03:37.634976: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2025-01-26 19:03:37.635613: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## 2. Load preprocessed data
We have original data:
1. games.csv
2. users.csv
3. recommendations.csv
4. games_metadata.json

...

Then we preprocessed the original data by getting rid of games with less than 15 reviews, in order to decrease the sparisty and maintain only valuable information.

Preprocessed data:
1. rating_matrix_sparse.npz
2. test_matrix.npz
3. train_matrix.npz
4. train_and_test.npz


Now let's jump into the code...

In [2]:
# -----------------=[ Data reading ]=----------------

users = pd.read_csv('./data/users.csv')
games = pd.read_csv('./data/games.csv')
recommendations = pd.read_csv('./data/recommendations.csv')
gamesMetadata = pd.read_json('./data/games_metadata.json', lines=True)

interactions = load_npz('./data/train_and_test.npz').tocsr()

# Test users with 40% of history (This is used for testing)
rest_test = load_npz('./data/rest_test.npz').tocsr()

# Test users with 100% history (Used for getting user indicies)
test_matrix = load_npz('./data/test_matrix.npz').tocsr()

## 3. Mappers
Then we create mappers:   
index -> app_id / user_id  
app_id / user_id -> index

This is mostly used for creating sparse matrices due to the fact that app_id or user_id can be very high and we want to keep everything in order. So we just use their indexes as cooridinates in sparse matrices.

In [3]:
# -------------------=[ Mappers ]=-------------------

userIds = users['user_id'].unique()
gameIds = games['app_id'].unique()

mapUserId = {user_id: idx for idx, user_id in enumerate(userIds)}
mapGameId = {game_id: idx for idx, game_id in enumerate(gameIds)}
mapUserIndex = {idx: user_id for user_id, idx in mapUserId.items()}
mapGameIndex = {idx: game_id for game_id, idx in mapGameId.items()}

mapToTitle = lambda game_id: games[games['app_id'] == game_id]['title'].values[0]

## 4. Model training and fine tuning
With these functions we train our LightFM model. We also can tweak a bunch of parameters in order to squeeze out better metrics. More about them later on.

In [6]:
# -------------------=[ Model training ]=-------------------
def fit(model, name, epochs=100):
  for epoch in range(1, epochs + 1):
    model.fit_partial(interactions, epochs=5, num_threads=20)

    val_recall = recall_at_k(
      model,
      rest_test,
      k=20,
      num_threads=20
    ).mean()

    print(f"Epoch {epoch}: [TEST]Recall@20 = {val_recall:.4f}")

    with open(f'./data/model/lightfm_{name}.pkl', 'wb') as f:
      pickle.dump(model, f)


def loadModel(name) -> LightFM:
  with open(f'./data/model/lightfm_{name}.pkl', 'rb') as f:
    model = pickle.load(f)
    return model

In [7]:
model = LightFM(no_components=64, loss='warp', k=10, learning_rate=0.01, random_state=42)

fit(model, '64ft', epochs=100)

Epoch 1: [TEST]Recall@20 = 0.1061
Epoch 2: [TEST]Recall@20 = 0.1191
Epoch 3: [TEST]Recall@20 = 0.1238
Epoch 4: [TEST]Recall@20 = 0.1269
Epoch 5: [TEST]Recall@20 = 0.1284
Epoch 6: [TEST]Recall@20 = 0.1295
Epoch 7: [TEST]Recall@20 = 0.1323
Epoch 8: [TEST]Recall@20 = 0.1360
Epoch 9: [TEST]Recall@20 = 0.1363
Epoch 10: [TEST]Recall@20 = 0.1378
Epoch 11: [TEST]Recall@20 = 0.1396
Epoch 12: [TEST]Recall@20 = 0.1405
Epoch 13: [TEST]Recall@20 = 0.1416
Epoch 14: [TEST]Recall@20 = 0.1418
Epoch 15: [TEST]Recall@20 = 0.1422
Epoch 16: [TEST]Recall@20 = 0.1436
Epoch 17: [TEST]Recall@20 = 0.1448
Epoch 18: [TEST]Recall@20 = 0.1464
Epoch 19: [TEST]Recall@20 = 0.1448
Epoch 20: [TEST]Recall@20 = 0.1445
Epoch 21: [TEST]Recall@20 = 0.1437
Epoch 22: [TEST]Recall@20 = 0.1435
Epoch 23: [TEST]Recall@20 = 0.1431


KeyboardInterrupt: 

## 6. Model prediction / recommendation

In [8]:
# Calculate popularity as interaction counts (train set only)
train_popularity = np.array(interactions.sum(axis=0)).flatten()

# Apply Laplace smoothing to avoid zero-division errors
train_popularity += 1

# Normalize to [0,1] using log scaling (handles long-tail distribution)
log_popularity = np.log(train_popularity)
popularity_weights = (log_popularity - log_popularity.min()) / (log_popularity.max() - log_popularity.min())

In [10]:
def recommendD(user_id, k, alpha=0.8):
  # Get items already interacted with IN TRAINING DATA
  _, known_items = interactions[user_id].nonzero()
  
  # Get all possible candidate items
  all_items = np.arange(interactions.shape[1])
  candidate_items = np.setdiff1d(all_items, known_items)
  
  # Score only unseen items
  scores = model.predict(
      user_ids=np.full(len(candidate_items), user_id),
      item_ids=candidate_items,
      num_threads=20
  )
  
  # Get popularity scores for candidates
  pop_scores = popularity_weights[candidate_items]
  
  # Blend scores
  combined_scores = alpha * scores + (1 - alpha) * pop_scores
  
  # Get top-k items
  top_k_indices = np.argsort(-scores)[:k]
  return candidate_items[top_k_indices]

## 7. Various metrics testing

In [11]:
import sys
sys.path.append("../")  # Replace with your actual path

from metrics import *

class modelL:
  def __init__(self, model):
    self.model = model
  
  def recommend(self, user_id, k):
    return recommendD(user_id, k, 0.8)

print(test_metrics(modelL(model), 20))

100%|██████████| 1932/1932 [00:21<00:00, 90.77it/s]
100%|██████████| 1932/1932 [00:21<00:00, 90.07it/s]

{'recall': 0.15654989648033124, 'hitrate': 0.5693581780538303, 'precision': 0.057996894409937896, 'ndcg': 0.12026146107433483, 'coverage': 0.025573989621009593, 'mrr': 0.21341642190517496}





In [5]:
from hyperopt import hp
from hyperopt import STATUS_OK

space = {
    'no_components': hp.choice('no_components', [64, 100]),
    'loss': hp.choice('loss', ['warp', 'warp-kos', 'bpr']),
    'learning_rate': hp.loguniform('learning_rate', np.log(1e-4), np.log(0.1)),
    'k': hp.choice('k', [10, 20])
}


In [6]:
def objective(params):
    # Initialize model with sampled hyperparameters
    model = LightFM(
        no_components=params['no_components'],
        loss=params['loss'],
        k=params['k'],
        learning_rate=params['learning_rate'],
        random_state=42
    )
    
    for _ in tqdm(range(30)):
        model.fit_partial(interactions, num_threads=20)
    
    val_recall = recall_at_k(model, rest_test, k=20, num_threads=20).mean()
    
    return {
        'loss': -val_recall,
        'status': STATUS_OK,
        'params': params,
    }

In [7]:
from hyperopt import fmin, tpe, Trials

trials = Trials()  # Track results
best_params = fmin(
    fn=objective,      # Objective function
    space=space,       # Search space
    algo=tpe.suggest,  # Optimization algorithm (Tree-structured Parzen Estimator)
    max_evals=50,      # Number of trials (increase for better results)
    trials=trials,     # Store results
    verbose=True,      # Show progress
)

print("Best hyperparameters:", best_params)

  0%|          | 0/50 [00:00<?, ?trial/s, best loss=?]

  0%|          | 0/30 [00:00<?, ?it/s]
  0%|          | 0/30 [00:09<?, ?it/s]


  0%|          | 0/50 [00:09<?, ?trial/s, best loss=?]


KeyboardInterrupt: 

In [None]:
final_model = LightFM(
    no_components=best_params['no_components'],
    loss=best_params['loss'],
    k=best_params['k'],
    user_alpha=best_params['user_alpha'],
    item_alpha=best_params['item_alpha'],
    learning_rate=best_params['learning_rate'],
    random_state=42
)

# Train longer (e.g., 100 epochs)
fit(final_model, name='tuned_model', epochs=100)