In [1]:
import os
# Pin every BLAS/OpenMP thread pool to one thread. This cell runs before numpy/faiss/torch
# are imported in the next cell; these variables are generally only honoured at library load.
# (Multi-threaded BLAS sometimes crashes the macOS Jupyter kernel otherwise.)
for _thread_var in (
    "OMP_NUM_THREADS",
    "OPENBLAS_NUM_THREADS",
    "MKL_NUM_THREADS",
    "VECLIB_MAXIMUM_THREADS",
    "NUMEXPR_NUM_THREADS",
):
    os.environ[_thread_var] = "1"
del _thread_var


#### Step 8: Backend API (FastAPI)

In [2]:
# main.py
import os
import json
import pickle
from typing import List, Optional

import numpy as np
import faiss
faiss.omp_set_num_threads(1)

import torch
import torch.nn as nn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..'))) # to make the project root importable when notebook lives in here in api/
# with the project root on sys.path, project packages can be imported:
from models.ncf_model import NCF



In [3]:
# ---------------- Config / paths ----------------
from pathlib import Path

# Project root (MovieLens_DL/). By default this is the parent of the working directory, which
# assumes the notebook runs from a sub-folder such as api/. Set MOVIELENS_ROOT to point
# elsewhere (e.g. when the server is launched from a different directory).
ROOT = Path(os.environ.get("MOVIELENS_ROOT", Path.cwd().resolve().parent)).resolve()
ART_DIR = ROOT / "features_artifacts"
CKPT_DIR = ROOT / "checkpoints"
MAPPINGS_DIR = ROOT / "mappings"

# Movie-embedding archive holding emb (N, D) and tmdb_id (N,)
# (the loader also accepts the key names movie_embeddings / tmdb_ids).
# Base embeddings:
EMB_NPZ = ART_DIR / "movie_embeddings.npz"
# ...or the fine-tuned embeddings instead:
# EMB_NPZ = ART_DIR / "movie_embeddings_finetuned.npz"

# Saved FAISS index (a flat index, despite the variable name); rebuilt in memory when unusable.
FAISS_IVF = ART_DIR / "movie_faiss_index_flat.index"
NCF_CKPT = CKPT_DIR / "ncf_finetuned_final.pt"
U2I_JSON = MAPPINGS_DIR / "u2i.json"                # {userId: u_idx}
SEEN_PKL = MAPPINGS_DIR / "user_seen_items.pkl"     # dict[u_idx] -> set(m_idx)
ID2TITLE_JSON = MAPPINGS_DIR / "id_to_title.json"   # {tmdb_id: title}

# Serving defaults (mirror the settings chosen in step 7D)
LOW_HISTORY_MAX_SEEN = 20   # NOTE(review): not referenced in this notebook section
C_FOR_LOW_USERS = 3.0       # pseudo-count C in alpha = C / (C + n_seen)
TOPK_DEFAULT = 20           # default number of results per request
SHORTLIST_M = 200           # default FAISS shortlist size

# ---------------- Utilities ----------------
def _safe_l2_normalize_rows(mat: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    mat = np.asarray(mat, dtype=np.float32, order="C")
    np.nan_to_num(mat, copy=False, nan=0.0, posinf=0.0, neginf=0.0)
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms = np.maximum(norms, eps)
    return mat / norms

def _safe_l2_normalize_vec(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    v = np.asarray(v, dtype=np.float32).reshape(1, -1)
    np.nan_to_num(v, copy=False, nan=0.0, posinf=0.0, neginf=0.0)
    n = np.linalg.norm(v, axis=1, keepdims=True)
    n = np.maximum(n, eps)
    return (v / n).astype(np.float32)

# ---------------- Load artifacts ----------------
# The context manager closes the .npz file handle once the arrays are read into memory.
with np.load(str(EMB_NPZ), allow_pickle=False) as npz:
    # accept either set of keys
    if "emb" in npz and "tmdb_id" in npz:
        emb = npz["emb"]
        ids = npz["tmdb_id"]
    elif "movie_embeddings" in npz and "tmdb_ids" in npz:
        emb = npz["movie_embeddings"]
        ids = npz["tmdb_ids"]
    else:
        raise ValueError(f"Unrecognized keys in {EMB_NPZ}: {list(npz.keys())}")

emb = np.asarray(emb)
ids = np.asarray(ids).reshape(-1)
# Row i of the embeddings must describe tmdb id ids[i]; fail early if the arrays disagree.
if emb.ndim != 2 or emb.shape[0] != ids.shape[0]:
    raise ValueError(
        f"{EMB_NPZ}: expected emb of shape (N, D) and N ids, got emb {emb.shape} and ids {ids.shape}"
    )

movie_embeddings = _safe_l2_normalize_rows(emb)
tmdb_ids = ids.astype(np.int64)

# tmdb_id -> embedding row (also the row id FAISS returns)
id_to_row = {int(t): int(i) for i, t in enumerate(tmdb_ids)}
if len(id_to_row) != len(tmdb_ids):
    print(f"[WARN] {len(tmdb_ids) - len(id_to_row)} duplicate tmdb_id(s) in {EMB_NPZ}; the last row wins.")
num_items, emb_dim = movie_embeddings.shape

# Optional tmdb_id -> title lookup. JSON object keys are strings, so they are coerced back to int.
id_to_title = {}
if ID2TITLE_JSON.exists():
    with ID2TITLE_JSON.open("r", encoding="utf-8") as fh:
        id_to_title = {int(key): title for key, title in json.load(fh).items()}

# ---------------- FAISS ----------------
# Build-only in the notebook: nothing is written back to disk (writes were crashing).
def _build_flat_ip_index(vectors: np.ndarray):
    """Build an exact inner-product index over ``vectors`` (cosine similarity for unit-norm rows)."""
    index = faiss.IndexFlatIP(vectors.shape[1])
    index.add(np.ascontiguousarray(vectors, dtype=np.float32))
    return index

d = movie_embeddings.shape[1]
faiss_index = None
if FAISS_IVF.exists():
    try:
        _loaded_index = faiss.read_index(str(FAISS_IVF))
    except Exception as e:
        print(f"[WARN] Failed to read saved FAISS index: {e}. Rebuilding (no write).")
    else:
        # FAISS returns row positions, which are used directly as movie_embeddings / tmdb_ids
        # rows, so the saved index must match in both dimension and row count.
        if _loaded_index.d != d:
            print(f"[WARN] FAISS index dim {_loaded_index.d} != emb dim {d}. Rebuilding (no write).")
        elif _loaded_index.ntotal != movie_embeddings.shape[0]:
            print(f"[WARN] FAISS index has {_loaded_index.ntotal} vectors != "
                  f"{movie_embeddings.shape[0]} embeddings. Rebuilding (no write).")
        else:
            # NOTE(review): a matching shape does not prove the index was built from EMB_NPZ
            # (base vs fine-tuned embeddings) — confirm the saved file is in sync.
            faiss_index = _loaded_index
if faiss_index is None:
    faiss_index = _build_flat_ip_index(movie_embeddings)

print("FAISS ready:", faiss_index.ntotal, "vectors; dim:", d)

# User mappings: raw userId -> u_idx. JSON object keys are strings, so both sides are coerced to int.
u2i = {}
if U2I_JSON.exists():
    with U2I_JSON.open("r", encoding="utf-8") as fh:
        u2i = {int(user_id): int(u_idx) for user_id, u_idx in json.load(fh).items()}

# dict[u_idx] -> set(m_idx), normalized so every key and every member is a plain int.
# NOTE(security): pickle.load can execute arbitrary code; only load files this project produced.
user_seen_items = {}
if SEEN_PKL.exists():
    with SEEN_PKL.open("rb") as fh:
        raw_seen = pickle.load(fh)
        user_seen_items = {int(u): {int(m) for m in items} for u, items in raw_seen.items()}

FAISS ready: 44383 vectors; dim: 128


In [4]:
# ---- Safe checkpoint load (CPU first, strict shapes, then optional GPU move) ----
import os, gc, torch
torch.set_grad_enabled(False)  # inference only: autograd is switched off globally for this kernel

# 0) Pick the serving device, but always deserialize on CPU first to avoid CUDA init crashes
device = "cuda" if torch.cuda.is_available() else "cpu"
ckpt_device = "cpu"   # always load on CPU first

# 1) Load the state dict.
# NOTE(security): torch.load unpickles the file, which can run arbitrary code. Only load
# checkpoints this project produced. weights_only=True is safer if the file holds only
# tensors/containers — TODO confirm against the checkpoint format.
state = torch.load(str(NCF_CKPT), map_location=ckpt_device)
# Some checkpoints wrap the weights, e.g. {"state_dict": ...} or {"model_state_dict": ...}
if isinstance(state, dict):
    for _wrapper_key in ("state_dict", "model_state_dict"):
        if _wrapper_key in state:
            state = state[_wrapper_key]
            break
if not isinstance(state, dict):
    raise TypeError(f"Expected a state dict in {NCF_CKPT}, got {type(state).__name__}")

# 2) Infer table sizes from the checkpoint itself (do NOT touch the GPU yet)
_absent_keys = [key for key in ("user_emb.weight", "item_emb.weight") if key not in state]
if _absent_keys:
    raise KeyError(f"{NCF_CKPT} has no {_absent_keys}; first keys present: {list(state)[:10]}")
n_users_ckpt = state["user_emb.weight"].shape[0]
n_items_ckpt = state["item_emb.weight"].shape[0]
emb_dim_ckpt = state["item_emb.weight"].shape[1]

# 3) Build the model with the checkpoint's exact table sizes, on CPU, with no external init
model = NCF(
    num_users=n_users_ckpt,
    num_items=n_items_ckpt,
    emb_dim=emb_dim_ckpt,
    init_item_vectors=None,
    freeze_items=False
).cpu()

# 4) Load the weights. strict=False tolerates extra checkpoint entries, which are only
#    reported. A *missing* parameter would keep its random initialization and be served
#    silently, so that case is fatal. Shape mismatches raise inside load_state_dict
#    regardless of `strict`.
missing, unexpected = model.load_state_dict(state, strict=False)
if missing:
    raise RuntimeError(
        f"Checkpoint {NCF_CKPT} lacks parameters {missing}; "
        "refusing to serve a partially initialized model."
    )
if unexpected:
    print("[INFO] load_state_dict ignored unexpected keys:", unexpected)

model.eval()

# 5) Optional override: if the current content embeddings match the checkpoint's item table
#    in BOTH row count and dimension, copy them into the item table.
# NOTE(review): this replaces the checkpoint's item vectors (fine-tuned, per the file name)
#    with whatever EMB_NPZ holds, currently the base embeddings. Confirm this is intended.
n_items_cur, emb_dim_cur = movie_embeddings.shape
if (n_items_cur, emb_dim_cur) == (n_items_ckpt, emb_dim_ckpt):
    with torch.no_grad():
        model.item_emb.weight.data.copy_(torch.from_numpy(movie_embeddings))

# 6) Move to the GPU only now, after everything loaded cleanly on CPU
if device == "cuda":
    model = model.to(device)

# 7) Guards: the id mappings must fit inside the checkpoint's embedding tables.
max_u_idx = max(u2i.values()) if u2i else -1
if max_u_idx >= n_users_ckpt:
    raise RuntimeError(
        f"u2i max index {max_u_idx} >= checkpoint user_emb size {n_users_ckpt}. "
        "Use the same split mapping you trained with or retrain."
    )
# FAISS shortlists are rows of movie_embeddings, and those rows are passed straight to
# model.item_emb. Rows beyond the checkpoint's item table would fail at request time.
if num_items > n_items_ckpt:
    print(
        f"[WARN] {num_items} content embeddings but checkpoint item_emb has only "
        f"{n_items_ckpt} rows; NCF scoring fails for shortlisted rows >= {n_items_ckpt}."
    )

gc.collect()


40

In [5]:
# ---------------- Core helpers ----------------
def build_user_profile_mean(u_idx: int) -> Optional[np.ndarray]:
    """Unit-length mean of the embeddings of everything user ``u_idx`` has seen.

    Returns None when the user has no seen items or the mean vector has zero norm.
    The seen indices are used directly as movie_embeddings rows.
    """
    seen_rows = user_seen_items.get(u_idx)
    if not seen_rows:
        return None
    centroid = movie_embeddings[list(seen_rows)].mean(axis=0)
    length = np.linalg.norm(centroid)
    return None if length == 0 else (centroid / length).astype(np.float32)

def shortlist_by_content_seed(seed_tmdb_id: int, M: int = SHORTLIST_M) -> np.ndarray:
    """Nearest-neighbour embedding rows of a seed movie, by content similarity.

    Args:
        seed_tmdb_id: TMDB id of the seed movie.
        M: maximum number of neighbours to return. The seed itself is excluded.

    Returns:
        int32 array of up to M row indices into movie_embeddings, best first.
        Empty when M <= 0.

    Raises:
        KeyError: if the TMDB id is not in the embedding table.
    """
    pos = id_to_row.get(int(seed_tmdb_id))
    if pos is None:
        raise KeyError(f"TMDB id {seed_tmdb_id} not found.")
    # Request one extra hit, because the seed is normally its own nearest neighbour and is
    # dropped below. Cap at ntotal so a huge M cannot allocate an oversized result buffer.
    k = min(int(M) + 1, int(faiss_index.ntotal))
    if M <= 0 or k <= 0:
        return np.array([], dtype=np.int32)
    q = _safe_l2_normalize_vec(movie_embeddings[pos])
    _, idxs = faiss_index.search(q, k)
    idxs = idxs[0]
    # FAISS pads with -1 when it finds fewer than k hits; -1 would silently map to the last movie.
    idxs = idxs[(idxs >= 0) & (idxs != pos)]
    return idxs[:M].astype(np.int32, copy=False)

def shortlist_by_content_for_user(u_idx: int, M: int = SHORTLIST_M) -> np.ndarray:
    """Candidate embedding rows for a user, excluding items they have already seen.

    If the user has a content profile, returns FAISS's top-M rows nearest that profile,
    minus seen rows (so possibly fewer than M). If not (no history), falls back to the
    first M unseen rows in index order.

    Returns:
        int32 array of row indices into movie_embeddings. Empty when M <= 0 or no
        candidate is left.
    """
    if M <= 0:
        return np.array([], dtype=np.int32)
    seen = user_seen_items.get(u_idx, set())
    prof = build_user_profile_mean(u_idx)
    if prof is None:
        # simple fallback: first unseen M rows; stop scanning once M are collected
        picked = []
        for i in range(num_items):
            if i not in seen:
                picked.append(i)
                if len(picked) >= M:
                    break
        return np.array(picked, dtype=np.int32)
    q = _safe_l2_normalize_vec(prof)
    k = min(int(M), int(faiss_index.ntotal))
    if k <= 0:
        return np.array([], dtype=np.int32)
    _, idxs = faiss_index.search(q, k)
    # FAISS pads missing results with -1, which would otherwise index the last movie.
    keep = [int(i) for i in idxs[0] if i >= 0 and int(i) not in seen]
    return np.array(keep, dtype=np.int32)

@torch.no_grad()
def ncf_score_user_array(u_idx: int, item_idxs: np.ndarray, batch: int = 4096) -> np.ndarray:
    """Score ``item_idxs`` for one user with the NCF model, in mini-batches.

    Args:
        u_idx: user row index in the model's user table.
        item_idxs: item row indices to score (any 1-D int array-like).
        batch: maximum number of items per forward pass.

    Returns:
        1-D float32 array of raw model outputs aligned with ``item_idxs``. Empty when
        ``item_idxs`` is empty.

    Raises:
        ValueError: if ``batch`` is not positive.
    """
    item_idxs = np.asarray(item_idxs, dtype=np.int64).reshape(-1)
    if item_idxs.size == 0:
        # np.concatenate([]) would raise; an empty candidate list simply has no scores.
        return np.empty(0, dtype=np.float32)
    if batch <= 0:
        raise ValueError(f"batch must be positive, got {batch}")
    out = []
    u_t = torch.tensor([u_idx], dtype=torch.long, device=device)
    for start in range(0, item_idxs.size, batch):
        chunk = item_idxs[start:start + batch]
        uu = u_t.repeat(len(chunk))
        mm = torch.as_tensor(chunk, dtype=torch.long, device=device)
        logits = model(uu, mm).detach().cpu().numpy().astype(np.float32)
        # Flatten so a (B, 1) head still yields one score per item. Whether NCF returns
        # (B,) or (B, 1) is not visible here — TODO confirm.
        out.append(logits.reshape(-1))
    return np.concatenate(out, axis=0)

def hybrid_for_user(u_idx: int, item_idxs: np.ndarray, C: float = C_FOR_LOW_USERS):
    """Score ``item_idxs`` for a user by blending content and NCF signals.

    Both score vectors are z-scored over the candidate set and mixed as
    ``alpha * content + (1 - alpha) * ncf``, with ``alpha = C / (C + n_seen)``.
    Users with little history therefore lean on content similarity.

    Args:
        u_idx: user row index (a value of u2i).
        item_idxs: candidate embedding-row indices.
        C: pseudo-count controlling how fast alpha decays with history size.
            Negative values are treated as 0.

    Returns:
        (scores, alpha): float32 scores aligned with ``item_idxs``, and the content
        weight that was used.
    """
    # z-score normalize per user (your original variant)
    prof = build_user_profile_mean(u_idx)
    if prof is None:
        # no profile -> no content signal; zeros z-score to zeros below
        c_scores = np.zeros(len(item_idxs), dtype=np.float32)
    else:
        c_scores = (movie_embeddings[item_idxs] @ prof).astype(np.float32)
    # Flatten in case the model emits (N, 1); otherwise the blend would broadcast to (N, N).
    cf_scores = np.asarray(ncf_score_user_array(u_idx, item_idxs), dtype=np.float32).reshape(-1)

    def z(x):
        m, s = x.mean(), x.std()
        return (x - m) / (s + 1e-9) if s > 0 else x * 0.0

    c2 = z(c_scores)
    cf2 = z(cf_scores)

    n_seen = len(user_seen_items.get(u_idx, set()))
    # alpha_by_num_ratings. Clamp C so alpha stays in [0, 1], and handle C + n_seen == 0
    # (C=0 for a user without history), which previously raised ZeroDivisionError.
    # 1.0 is the limit of C / (C + 0).
    c_eff = max(0.0, float(C))
    denom = c_eff + n_seen
    alpha = c_eff / denom if denom > 0 else 1.0
    scores = alpha * c2 + (1.0 - alpha) * cf2
    return scores.astype(np.float32), alpha

def pack_items(item_idxs: np.ndarray, scores: Optional[np.ndarray] = None):
    """Turn embedding-row indices into JSON-ready dicts for RecResponse.items.

    Each dict has ``tmdb_id`` and ``title`` (None when the title is unknown). When
    ``scores`` is given, ``score`` is added from the same position.
    """
    packed = []
    for pos, row in enumerate(item_idxs.tolist()):
        tmdb = int(tmdb_ids[row])
        entry = {"tmdb_id": tmdb, "title": id_to_title.get(tmdb, None)}
        if scores is not None:
            entry["score"] = float(scores[pos])
        packed.append(entry)
    return packed



In [6]:
# ---------------- FastAPI ----------------
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Hybrid Recommender API", version="1.0.0")

# Allowed browser origins, comma-separated in CORS_ALLOW_ORIGINS (e.g. "http://localhost:3000").
# Defaults to "*" (any origin) for development; tighten before deploying.
# NOTE(review): no endpoint here reads cookies or auth headers, so allow_credentials=True is
# probably unnecessary. Combined with "*" it may let any site send credentialed requests.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Response schema shared by all /recommend_* endpoints.
class RecResponse(BaseModel):
    # one dict per recommended movie, as built by pack_items: tmdb_id, title, optional score
    items: List[dict]
    # content weight used for ranking: 1.0 on the content-only/seed paths, the blended weight
    # on /recommend_hybrid, None where it does not apply (e.g. /recommend_for_user)
    alpha: Optional[float] = None
    # shortlist size M requested from FAISS (echoed back from the request)
    used_shortlist_M: Optional[int] = None
    # number of results requested; the items list may be shorter
    k: int

@app.get("/health")
def health():
    # Liveness probe, plus a quick summary of what the server has loaded.
    n_rows, n_dim = movie_embeddings.shape
    # First five user ids, without materializing the full key list.
    sample_ids = [uid for uid, _ in zip(u2i, range(5))]
    return {
        "status": "ok",
        "users": len(u2i),
        "items": int(n_rows),
        "dim": int(n_dim),
        "sample_user_ids": sample_ids,
    }

@app.get("/recommend_by_movie", response_model=RecResponse)
def recommend_by_movie(tmdb_id: int, k: int = Query(TOPK_DEFAULT, ge=1, le=100),
                       M: int = Query(SHORTLIST_M, ge=1)):
    """Content-only neighbours of a seed movie.

    tmdb_id must exist in the embedding table (404 otherwise). k is the number of
    results (1..100). M is the FAISS shortlist size and must be >= 1; it is validated
    here so bad values get a 422 instead of reaching FAISS.
    """
    try:
        idxs = shortlist_by_content_seed(tmdb_id, M=M)[:k]
    except KeyError as e:
        raise HTTPException(status_code=404, detail=str(e))
    return RecResponse(items=pack_items(idxs), alpha=1.0, used_shortlist_M=M, k=k)

@app.get("/recommend_for_user", response_model=RecResponse)
def recommend_for_user(user_id: int, k: int = Query(TOPK_DEFAULT, ge=1, le=100),
                       M: int = Query(SHORTLIST_M, ge=1)):
    """Collaborative-only recommendations for a known user.

    A content shortlist of size M (seen items removed) is re-ranked purely by NCF
    outputs. Returns 404 for an unknown user_id and 422 for M < 1.
    """
    if user_id not in u2i:
        raise HTTPException(status_code=404, detail=f"user_id {user_id} not found")
    u_idx = u2i[user_id]
    shortlist = shortlist_by_content_for_user(u_idx, M=M)
    if shortlist.size == 0:
        return RecResponse(items=[], alpha=None, used_shortlist_M=M, k=k)
    # **Collaborative-only**: rank shortlist by NCF logits
    scores = ncf_score_user_array(u_idx, shortlist)
    order = np.argsort(-scores)[:k]
    topk = shortlist[order]
    return RecResponse(items=pack_items(topk, scores[order]), alpha=None, used_shortlist_M=M, k=k)

@app.get("/recommend_hybrid", response_model=RecResponse)
def recommend_hybrid(user_id: int, tmdb_id: Optional[int] = None,
                     k: int = Query(TOPK_DEFAULT, ge=1, le=100),
                     M: int = Query(SHORTLIST_M, ge=1),
                     C: float = Query(C_FOR_LOW_USERS, ge=0.0)):
    """Hybrid recommendations.

    If tmdb_id is given, returns content-only neighbours of that seed (404 if the
    seed is unknown). Otherwise it blends content and NCF scores for user_id
    (404 if the user is unknown), with the content weight alpha = C / (C + n_seen).
    M must be >= 1 and C must be >= 0 (422 otherwise).
    """
    if tmdb_id is not None:
        # seed-based content path
        try:
            idxs = shortlist_by_content_seed(tmdb_id, M=M)[:k]
        except KeyError as e:
            # An unknown seed is a client error, as in /recommend_by_movie (was an unhandled 500).
            raise HTTPException(status_code=404, detail=str(e))
        return RecResponse(items=pack_items(idxs), alpha=1.0, used_shortlist_M=M, k=k)

    if user_id not in u2i:
        raise HTTPException(status_code=404, detail=f"user_id {user_id} not found")
    u_idx = u2i[user_id]
    shortlist = shortlist_by_content_for_user(u_idx, M=M)
    if shortlist.size == 0:
        return RecResponse(items=[], alpha=None, used_shortlist_M=M, k=k)
    scores, alpha = hybrid_for_user(u_idx, shortlist, C=C)
    order = np.argsort(-scores)[:k]
    topk = shortlist[order]
    return RecResponse(items=pack_items(topk, scores[order]), alpha=float(alpha), used_shortlist_M=M, k=k)


In [None]:
import threading, uvicorn, nest_asyncio
nest_asyncio.apply()

# Bind address and port; override via environment variables (defaults match the original setup).
# NOTE(review): 0.0.0.0 exposes the API on every network interface; 127.0.0.1 suffices locally.
API_HOST = os.environ.get("API_HOST", "0.0.0.0")
API_PORT = int(os.environ.get("API_PORT", "8000"))

def _run():
    """Serve `app` with uvicorn. This blocks, so it runs on a background daemon thread."""
    uvicorn.run(app, host=API_HOST, port=API_PORT, reload=False, log_level="info")

# Re-running this cell must not start a second server: it would fail to bind the port
# while the first thread keeps serving. The running server keeps the `app` object it was
# started with; restart the kernel to serve a redefined app.
_prev_thread = globals().get("t")
if isinstance(_prev_thread, threading.Thread) and _prev_thread.is_alive():
    print("[INFO] API server thread already running; not starting another.")
else:
    t = threading.Thread(target=_run, daemon=True)
    t.start()



INFO:     Started server process [69119]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:51147 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:51147 - "GET /recommend_by_movie?tmdb_id=862&k=12 HTTP/1.1" 200 OK
INFO:     127.0.0.1:51147 - "GET /recommend_for_user?user_id=1&k=12 HTTP/1.1" 200 OK
INFO:     127.0.0.1:51147 - "GET /recommend_hybrid?user_id=1&k=12&M=200&C=3.0 HTTP/1.1" 200 OK
INFO:     127.0.0.1:52386 - "GET /recommend_by_movie?tmdb_id=862&k=12 HTTP/1.1" 200 OK
INFO:     127.0.0.1:52386 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:52386 - "GET /recommend_for_user?user_id=1&k=12 HTTP/1.1" 200 OK
INFO:     127.0.0.1:52386 - "GET /recommend_hybrid?user_id=1&k=12&M=200&C=3.0 HTTP/1.1" 200 OK
INFO:     127.0.0.1:52406 - "GET /health HTTP/1.1" 200 OK
INFO:     127.0.0.1:52406 - "GET /recommend_by_movie?tmdb_id=862&k=12 HTTP/1.1" 200 OK
INFO:     127.0.0.1:52406 - "GET /recommend_for_user?user_id=1&k=12 HTTP/1.1" 200 OK
INFO:     127.0.0.1:52406 - "GET /recommend_hybrid?user_id=1&k=12&M=200&C=3.0 HTTP/1.1" 200 OK
INFO:     127.0.0.1:54123