<a href="https://colab.research.google.com/github/KAMAL0657/KAMAL-HUSSAIN/blob/main/reco_monitoring_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Recommendation Monitoring & Maintenance Demo (Colab-ready)
This Colab notebook simulates a recommendation service and demonstrates a monitoring and maintenance loop:
- Train a small PyTorch classifier to predict click probability; its output serves as the recommender score.
- Start a FastAPI server that returns predictions and appends an audit record for every request.
- Simulate production traffic, including **feature drift** and a **new user segment** appearing over time.
- Continuously compute a health metric (called CTR in the code, although it is the request success rate), measure prediction drift with JS divergence over score histograms, and raise alerts. Feature drift is read from the drift level the simulator logs rather than tested with a KS statistic.
- When alerts fire, automatically **retrain** the model on recent data (synthesized in this demo), run a **shadow / canary** evaluation, and decide whether to promote or roll back.
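
The drift-detection step in the list above can be made concrete for a single feature. The following is a minimal, dependency-free sketch of the two-sample Kolmogorov-Smirnov statistic, assuming raw feature values are available for both a reference window and a recent window. `ks_statistic` is a hypothetical helper, not part of the notebook; in practice `scipy.stats.ks_2samp` returns the same statistic together with a p-value.

```python
import bisect
import random

def ks_statistic(reference, current):
    """Two-sample KS statistic: largest gap between the two empirical CDFs."""
    ref = sorted(reference)
    cur = sorted(current)
    d = 0.0
    # Both empirical CDFs are step functions, so the largest gap
    # is reached at one of the observed sample points.
    for x in ref + cur:
        cdf_ref = bisect.bisect_right(ref, x) / len(ref)
        cdf_cur = bisect.bisect_right(cur, x) / len(cur)
        d = max(d, abs(cdf_ref - cdf_cur))
    return d

rng = random.Random(0)
baseline = [rng.gauss(0.0, 1.0) for _ in range(2000)]  # training-time values of the drifted feature
same = [rng.gauss(0.0, 1.0) for _ in range(2000)]      # recent window, no drift
shifted = [rng.gauss(2.0, 1.0) for _ in range(2000)]   # recent window, drift_level = 2.0

d_same = ks_statistic(baseline, same)
d_shifted = ks_statistic(baseline, shifted)
print(f"no drift: D={d_same:.3f}   drift 2.0: D={d_shifted:.3f}")
```

A statistic near 0 means the two windows look alike; a mean shift of two standard deviations pushes it well above 0.5. Using a test like this in the monitor would require logging raw feature values, which the audit log in this notebook does not do.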


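Run all cells in order. Everything runs on CPU, and the FastAPI server runs locally inside the Colab runtime on `127.0.0.1:8000`. Re-running the server cell in the same runtime fails with an `address already in use` error because the earlier server thread still holds the port; restart the runtime before running the notebook again.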


In [12]:
# Install required packages
!pip install -q torch torchvision --extra-index-url https://download.pytorch.org/whl/cpu
!pip install -q fastapi uvicorn[standard] httpx nest-asyncio scikit-learn scipy numpy pandas matplotlib



=== ALERTS TRIGGERED === [('FEATURE_DRIFT_HIGH', 2.0)] ctr=1.000 js=0.000 drift_mean=2.000

=== ALERTS TRIGGERED === [('FEATURE_DRIFT_HIGH', 2.0)] ctr=1.000 js=0.000 drift_mean=2.000


In [13]:
# Create synthetic training data and train a small PyTorch MLP (4 -> 16 -> 1, sigmoid output)
import torch, torch.nn as nn, numpy as np, random, os, json
from sklearn.model_selection import train_test_split
import math

SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

def generate_users(n, new_segment=False, drift_level=0.0):
    # features: [age, recency, activity_score, feature_drifted]
    ages = np.random.normal(35 + (5 if new_segment else 0), 10, size=n)  # new segment older
    recency = np.random.exponential(scale=10.0, size=n)  # days since last
    activity = np.random.beta(2,5,size=n) * 10
    # drifted feature: baseline centered at 0, but drift shifts mean
    drifted = np.random.normal(loc=drift_level, scale=1.0, size=n)
    X = np.vstack([ages, recency, activity, drifted]).T.astype(np.float32)
    # click probability simulated
    logits = -0.05*(ages-30) - 0.1*np.log1p(recency) + 0.4*activity + 1.5*drifted
    probs = 1/(1+np.exp(-logits/10.0))  # scale down to reasonable probs
    y = (np.random.rand(n) < probs).astype(np.float32)
    return X, y, probs

# initial training data (no new segment, no drift)
X0, y0, p0 = generate_users(20000, new_segment=False, drift_level=0.0)
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit(X0)
X0s = scaler.transform(X0)

# simple PyTorch model (small MLP)
class SimpleReco(nn.Module):
    def __init__(self, in_dim=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
            nn.Sigmoid()
        )
    def forward(self, x):
        return self.net(x).squeeze(-1)

model = SimpleReco(4)
criterion = nn.BCELoss()
opt = torch.optim.Adam(model.parameters(), lr=0.01)

# train
Xtrain, Xval, ytrain, yval = train_test_split(X0s, y0, test_size=0.2, random_state=SEED)
Xtrain_t = torch.tensor(Xtrain, dtype=torch.float32)
ytrain_t = torch.tensor(ytrain, dtype=torch.float32)
Xval_t = torch.tensor(Xval, dtype=torch.float32)
yval_t = torch.tensor(yval, dtype=torch.float32)

for epoch in range(10):
    model.train()
    preds = model(Xtrain_t)
    loss = criterion(preds, ytrain_t)
    opt.zero_grad(); loss.backward(); opt.step()
    model.eval()
    with torch.no_grad():
        v = model(Xval_t).numpy()
        val_loss = ((v - yval_t.numpy())**2).mean()
    if epoch%2==0:
        print(f'epoch {epoch} train_loss={loss.item():.4f} val_mse={val_loss:.5f}')

# save model and scaler
os.makedirs('/content/reco_model', exist_ok=True)
torch.jit.save(torch.jit.trace(model, torch.randn(1,4)), '/content/reco_model/model.ts')
import pickle
with open('/content/reco_model/scaler.pkl','wb') as f:
    pickle.dump(scaler, f)

print('Initial model and scaler saved to /content/reco_model')

epoch 0 train_loss=0.6971 val_mse=0.24952
epoch 2 train_loss=0.6938 val_mse=0.24857
epoch 4 train_loss=0.6919 val_mse=0.24815
epoch 6 train_loss=0.6909 val_mse=0.24805
epoch 8 train_loss=0.6905 val_mse=0.24813
Initial model and scaler saved to /content/reco_model
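
The printed losses are easier to read next to their chance-level values. As a small worked example (not part of the original notebook), the binary cross-entropy and squared error of a model that always predicts 0.5 are:

```python
import math

def bce(p, y):
    """Binary cross-entropy of one prediction p for a label y in {0, 1}."""
    return -(y * math.log(p) + (1 - y) * math.log(1 - p))

# A constant prediction of 0.5 costs ln 2 per example, whatever the label is.
chance_bce = bce(0.5, 1)
# Its squared error is (0.5 - y)^2 = 0.25 for both y = 0 and y = 1.
chance_mse = (0.5 - 1) ** 2
print(f"chance BCE = {chance_bce:.4f}, chance MSE = {chance_mse:.2f}")
```

The run above ends at `train_loss=0.6905` and `val_mse=0.24813`, only slightly below ln 2 ≈ 0.6931 and 0.25. This suggests the model has learned little beyond the base rate, which is plausible because `generate_users` divides the logits by 10 and so keeps the simulated click probabilities close to 0.5.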


In [14]:
# Start a FastAPI server that serves /predict and logs audit events to a local file.
import threading, nest_asyncio, uvicorn, json, time, uuid, hashlib, pickle
from fastapi import FastAPI
from pydantic import BaseModel
import torch, numpy as np, os, logging
nest_asyncio.apply()

MODEL_PATH = '/content/reco_model/model.ts'
SCALER_PATH = '/content/reco_model/scaler.pkl'
AUDIT_LOG = '/content/reco_audit.log'

model = torch.jit.load(MODEL_PATH, map_location='cpu')
model.eval()
with open(SCALER_PATH,'rb') as f:
    scaler = pickle.load(f)

app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('reco-api')

class Req(BaseModel):
    features: list
    user_segment: str = 'existing'  # 'existing' or 'new'

def hash_input(x):
    return hashlib.sha256(json.dumps(x).encode()).hexdigest()[:16]

@app.post('/predict')
async def predict(req: Req):
    req_id = str(uuid.uuid4())
    t0 = time.perf_counter()  # monotonic clock for latency measurement
    x = np.array(req.features, dtype=float).reshape(1,-1)
    xs = scaler.transform(x)
    xt = torch.tensor(xs, dtype=torch.float32)
    with torch.no_grad():
        score = float(model(xt).item())
    latency_ms = int((time.perf_counter()-t0)*1000)
    # write audit (append-only)
    audit = {'request_id': req_id, 'ts': time.time(), 'features_hash': hash_input(req.features),
             'user_segment': req.user_segment, 'score': score, 'served_model': 'baseline'}
    with open(AUDIT_LOG,'a') as f:
        f.write(json.dumps(audit)+'\n')
    return {'score':score, 'request_id': req_id, 'latency_ms': latency_ms}

def run_server():
    uvicorn.run(app, host='127.0.0.1', port=8000, log_level='info')

threading.Thread(target=run_server, daemon=True).start()
print('FastAPI server running at http://127.0.0.1:8000')

FastAPI server running at http://127.0.0.1:8000


INFO:     Started server process [157]
INFO:     Waiting for application startup.
INFO:     Application startup complete.


In [15]:
# Simulate traffic in a background task. We'll progressively introduce drift and a new user segment.
import asyncio, httpx, random, time, threading, json, math, os
SIM_LOG = '/content/sim_metrics.jsonl'
if os.path.exists(SIM_LOG):
    os.remove(SIM_LOG)

async def hit_once(client, features, segment='existing'):
    try:
        r = await client.post('http://127.0.0.1:8000/predict', json={'features':features.tolist(), 'user_segment': segment}, timeout=5.0)
        if r.status_code==200:
            return True, r.json()['score']
        return False, None
    except Exception:
        # any network, timeout, or parsing error counts as a failed request
        return False, None

def stream_traffic(duration_seconds=60, qps=50, drift_schedule=None):
    """Simulate traffic for duration_seconds at approx qps.
    drift_schedule: list of tuples (time_offset_seconds, drift_level, pct_new_segment)
    """
    import asyncio, time, httpx, numpy as np
    start = time.time()
    loop = asyncio.new_event_loop(); asyncio.set_event_loop(loop)
    async def runner():
        async with httpx.AsyncClient() as client:
            next_drift_idx = 0
            drift_level = 0.0
            pct_new = 0.0
            while time.time() - start < duration_seconds:
                t = time.time() - start
                # update drift/segment according to schedule
                if drift_schedule and next_drift_idx < len(drift_schedule) and t >= drift_schedule[next_drift_idx][0]:
                    _, drift_level, pct_new = drift_schedule[next_drift_idx]
                    next_drift_idx += 1
                tasks = []
                for i in range(qps):
                    # decide if this request comes from the new user segment
                    seg = random.random() < pct_new
                    # generate features reflecting drift and segment
                    ages = np.random.normal(35 + (5 if seg else 0), 10)
                    recency = np.random.exponential(scale=10.0)
                    activity = np.random.beta(2,5)*10
                    drifted = np.random.normal(loc=drift_level, scale=1.0)
                    features = np.array([ages, recency, activity, drifted], dtype=float)
                    tasks.append(asyncio.create_task(hit_once(client, features, 'new' if seg else 'existing')))
                results = await asyncio.gather(*tasks)
                # log metrics of this second
                succ = sum(1 for s,_ in results if s)
                avg_score = sum((sc for s,sc in results if s)) / max(1, succ)
                with open(SIM_LOG,'a') as f:
                    f.write(json.dumps({'t': time.time(), 'succ': succ, 'qps': qps, 'avg_score': avg_score, 'drift': drift_level, 'pct_new': pct_new})+'\n')
                await asyncio.sleep(1.0)
    loop.run_until_complete(runner())

# Run simulator in background thread with scheduled drift events
schedule = [
    (0, 0.0, 0.0),        # start: no drift, no new segment
    (15, 0.5, 0.0),       # after 15s, small drift begins
    (30, 1.2, 0.05),      # after 30s, stronger drift + 5% new segment
    (45, 1.5, 0.15),      # after 45s, more drift + 15% new users
    (60, 2.0, 0.30)       # after 60s, heavy drift + 30% new users
]

threading.Thread(target=stream_traffic, args=(90, 30, schedule), daemon=True).start()
print('Traffic simulator started (runs ~90s). Check /content/sim_metrics.jsonl and /content/reco_audit.log')

ERROR:    [Errno 98] error while attempting to bind on address ('127.0.0.1', 8000): address already in use


Traffic simulator started (runs ~90s). Check /content/sim_metrics.jsonl and /content/reco_audit.log


INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.


In [16]:
# Monitoring loop: periodically read simulator and audit logs, compute CTR and drift metrics, and trigger actions.
import time, json, threading, os, math
from scipy import stats
from collections import deque, defaultdict
import numpy as np

ALERTS = []
ACTION_LOG = []

def read_sim_metrics():
    path = '/content/sim_metrics.jsonl'
    if not os.path.exists(path): return []
    with open(path) as f:
        lines = [json.loads(l) for l in f if l.strip()]
    return lines

def read_audit_logs():
    path = '/content/reco_audit.log'
    if not os.path.exists(path): return []
    with open(path) as f:
        lines = [json.loads(l) for l in f if l.strip()]
    return lines

# utility: compute JS divergence between two histograms
def js_divergence(p, q):
    # p, q are arrays (prob masses) summing to 1
    p = np.array(p); q = np.array(q)
    m = 0.5*(p+q)
    def kl(a,b):
        a = np.where(a==0, 1e-12, a)
        b = np.where(b==0, 1e-12, b)
        return np.sum(a * np.log(a/b))
    return 0.5*(kl(p,m) + kl(q,m))

# baseline score distribution: score a fresh 1,000-row sample drawn from the
# training distribution (no drift, no new segment), not the training set itself
train_scores = []
with open('/content/reco_audit.log','w') as f: f.write('')  # clear audit at start
import pickle, torch, numpy as np
with open('/content/reco_model/scaler.pkl','rb') as f:
    scaler = pickle.load(f)
model = torch.jit.load('/content/reco_model/model.ts', map_location='cpu')
model.eval()
X_train = np.vstack([np.random.normal(35,10,1000), np.random.exponential(10,1000),
                     np.random.beta(2,5,1000)*10, np.random.normal(0,1,1000)]).T
X_train_s = scaler.transform(X_train)
with torch.no_grad():
    s = model(torch.tensor(X_train_s, dtype=torch.float32)).numpy()
train_scores = s.tolist()
hist_bins = np.linspace(0,1,21)
train_hist, _ = np.histogram(train_scores, bins=hist_bins)
train_hist = train_hist / train_hist.sum()

# monitoring thread
def monitor_loop(duration=95, check_interval=5):
    start = time.time()
    triggered_retrain = False
    while time.time() - start < duration:
        time.sleep(check_interval)
        sim = read_sim_metrics()
        audit = read_audit_logs()
        # NOTE: this "ctr" is the request success rate (succ/qps) over the last 10 ticks;
        # the simulator logs no click labels, so it stays near 1.0 while the server is healthy
        if sim:
            last10 = sim[-10:]
            total_requests = sum(s['qps'] for s in last10)
            total_succ = sum(s['succ'] for s in last10)
            ctr = total_succ / max(1, total_requests)
        else:
            ctr = 0.0
        # compute current score histogram from audit logs (last N)
        scores = [e['score'] for e in audit[-200:]] if audit else []
        if scores:
            cur_hist, _ = np.histogram(scores, bins=hist_bins)
            if cur_hist.sum()>0:
                cur_hist = cur_hist / cur_hist.sum()
                js = js_divergence(train_hist, cur_hist)
            else:
                js = 0.0
        else:
            js = 0.0
        # feature-drift proxy: mean of the drift level the simulator logged over its last 20 ticks
        # (raw features are not logged, so no statistical test is run on them)
        drift_levels = [s['drift'] for s in sim[-20:]] if sim else []
        drift_mean = np.mean(drift_levels) if drift_levels else 0.0
        # simple alert conditions
        alerts_now = []
        if ctr < 0.20:  # example threshold
            alerts_now.append(('CTR_DROP', ctr))
        if js > 0.15:
            alerts_now.append(('PRED_DIST_SHIFT_JS', js))
        if drift_mean > 1.0:
            alerts_now.append(('FEATURE_DRIFT_HIGH', float(drift_mean)))
        if alerts_now:
            ALERTS.append({'t': time.time(), 'alerts': alerts_now, 'ctr': ctr, 'js': js, 'drift_mean': drift_mean})
            print('\n=== ALERTS TRIGGERED ===', alerts_now, f'ctr={ctr:.3f}', f'js={js:.3f}', f'drift_mean={drift_mean:.3f}')
        else:
            print(f'no alerts. ctr={ctr:.3f}, js={js:.3f}, drift_mean={drift_mean:.3f}')
        # On the first alert (there is no persistence check), trigger a single retrain
        if alerts_now and not triggered_retrain:
            print('Triggering automated retrain based on alerts...')
            ACTION_LOG.append({'t':time.time(),'action':'trigger_retrain','reason':alerts_now})
            triggered_retrain = True
            # perform retrain in background
            threading.Thread(target=retrain_and_canary, daemon=True).start()
    print('\nMonitoring finished. Alerts collected:', len(ALERTS))

# We'll define retrain_and_canary next (but monitor will use it)
def retrain_and_canary():
    print('\n[Retrain] collecting recent labeled data and retraining model...')
    # For demo, we'll synthesize new training data emphasizing recent drift and new segment
    import numpy as np, torch, pickle, random
    # generate data with drift and new segment presence
    X_new, y_new, _ = [], [], []
    for i in range(10000):
        if random.random() < 0.25:
            # new segment
            X,y,_p = generate_users(1, new_segment=True, drift_level=1.5)
        else:
            X,y,_p = generate_users(1, new_segment=False, drift_level=1.0)
        X_new.append(X[0]); y_new.append(y[0])
    X_new = np.vstack(X_new).astype(np.float32)
    y_new = np.array(y_new).astype(np.float32)
    # combine with a fraction of old data for stability
    X_comb = np.vstack([X0[:5000], X_new])
    y_comb = np.concatenate([y0[:5000], y_new])
    # fit a new scaler on the combined data (the original scaler is left untouched)
    from sklearn.preprocessing import StandardScaler
    scaler2 = StandardScaler().fit(X_comb)
    Xs = scaler2.transform(X_comb)
    # train new PyTorch model
    import torch.nn as nn, torch.optim as optim
    model_new = SimpleReco(4)
    opt = optim.Adam(model_new.parameters(), lr=0.01)
    Xt = torch.tensor(Xs, dtype=torch.float32)
    yt = torch.tensor(y_comb, dtype=torch.float32)
    for epoch in range(8):
        model_new.train()
        preds = model_new(Xt)
        loss = nn.BCELoss()(preds, yt)
        opt.zero_grad(); loss.backward(); opt.step()
    # save new model and scaler to /content/reco_model_new
    import os, pickle, torch as _t
    os.makedirs('/content/reco_model_new', exist_ok=True)
    _t.jit.save(_t.jit.trace(model_new, _t.randn(1,4)), '/content/reco_model_new/model.ts')
    with open('/content/reco_model_new/scaler.pkl','wb') as f:
        pickle.dump(scaler2, f)
    print('[Retrain] new model saved. Running shadow evaluation...')
    # shadow: evaluate new model vs baseline on a recent sample of requests (simulate using current drift levels)
    # load baseline and new model
    baseline = torch.jit.load('/content/reco_model/model.ts', map_location='cpu')
    baseline.eval()
    newm = _t.jit.load('/content/reco_model_new/model.ts', map_location='cpu')
    newm.eval()
    # sample 2000 recent users with drift and new segment presence
    import numpy as np, random
    # generate_users returns (X, y, probs); unpack all three
    Xsample, ysample, _ = generate_users(2000, new_segment=False, drift_level=1.2)
    # 20% new segment inserted
    n = Xsample.shape[0]
    idx_new = np.random.choice(n, size=int(0.2*n), replace=False)
    for i in idx_new:
        # take the single generated row (shape (4,)) from the returned (1, 4) array
        Xsample[i] = generate_users(1, new_segment=True, drift_level=1.5)[0][0]
    # scale via scaler2 to run both through comparable features for new model, but baseline expects original scaler;
    # to be fair, transform with original scaler for baseline and scaler2 for new model.
    s_baseline = pickle.load(open('/content/reco_model/scaler.pkl','rb'))
    Xb = s_baseline.transform(Xsample)
    Xn = scaler2.transform(Xsample)
    import torch as _t
    with torch.no_grad():
        sb = baseline(_t.tensor(Xb, dtype=_t.float32)).numpy()
        sn = newm(_t.tensor(Xn, dtype=_t.float32)).numpy()
    ctr_baseline = (sb>0.5).mean()  # proxy: fraction of positives
    ctr_new = (sn>0.5).mean()
    print(f'[Shadow Eval] baseline positive-rate(proxy)={ctr_baseline:.3f} new={ctr_new:.3f}')
    ACTION_LOG.append({'t':time.time(),'action':'shadow_eval','baseline_pos':float(ctr_baseline),'new_pos':float(ctr_new)})
    # Simple shadow gate: proceed to canary if the new model's positive rate is at least
    # the baseline's (a proxy only; no click labels are used in this comparison)
    promote = ctr_new >= ctr_baseline
    if promote:
        print('[Canary] New model looks better in shadow — promoting to canary traffic (simulated)...')
        ACTION_LOG.append({'t':time.time(),'action':'promote_canary'})
        # simulate canary: route 10% traffic to new model for 30s and compare observed CTR (proxy)
        baseline_ctr_obs, new_ctr_obs = simulate_canary(duration=30, pct_new=0.1, model_new='/content/reco_model_new/model.ts', scaler_new='/content/reco_model_new/scaler.pkl')
        print(f'[Canary] observed ctr baseline={baseline_ctr_obs:.3f}, new={new_ctr_obs:.3f}')
        ACTION_LOG.append({'t':time.time(),'action':'canary_result','base':baseline_ctr_obs,'new':new_ctr_obs})
        # decision rule
        if new_ctr_obs >= baseline_ctr_obs:
            print('[Canary] Accepting new model — swap production model (for demo: replace baseline files)')
            import shutil, os
            shutil.rmtree('/content/reco_model')
            os.rename('/content/reco_model_new','/content/reco_model')
            ACTION_LOG.append({'t':time.time(),'action':'promote_final'})
        else:
            print('[Canary] Canary underperformed — rollback and keep baseline.')
            ACTION_LOG.append({'t':time.time(),'action':'rollback'})

    else:
        print('[Shadow Eval] New model did not improve — aborting promotion.')
        ACTION_LOG.append({'t':time.time(),'action':'abort_promote'})

# simulate_canary function used above
def simulate_canary(duration=30, pct_new=0.1, model_new=None, scaler_new=None):
    # run a short traffic sim where pct_new of traffic goes to new model; compare observed positive rates (proxy CTR)
    import time, random, numpy as np, pickle, torch
    start = time.time()
    t_end = start + duration
    # load models
    baseline = torch.jit.load('/content/reco_model/model.ts', map_location='cpu'); baseline.eval()
    if model_new:
        newm = torch.jit.load(model_new, map_location='cpu'); newm.eval()
        s_new = pickle.load(open(scaler_new,'rb'))
    s_baseline = pickle.load(open('/content/reco_model/scaler.pkl','rb'))
    counts = {'base_pos':0,'base_tot':0,'new_pos':0,'new_tot':0}
    while time.time() < t_end:
        # generate a small batch per second
        for _ in range(50):
            seg = random.random() < 0.25
            if seg:
                X,_,_ = generate_users(1, new_segment=True, drift_level=1.5)
            else:
                X,_,_ = generate_users(1, new_segment=False, drift_level=1.2)
            # decide routing
            if random.random() < pct_new:
                # to new model
                Xn = s_new.transform(X)
                with torch.no_grad():
                    s = float(newm(torch.tensor(Xn,dtype=torch.float32)).item())
                counts['new_tot'] += 1
                counts['new_pos'] += int(s>0.5)
            else:
                Xb = s_baseline.transform(X)
                with torch.no_grad():
                    s = float(baseline(torch.tensor(Xb,dtype=torch.float32)).item())
                counts['base_tot'] += 1
                counts['base_pos'] += int(s>0.5)
        time.sleep(1)
    base_ctr = counts['base_pos']/max(1,counts['base_tot'])
    new_ctr = counts['new_pos']/max(1,counts['new_tot'])
    return base_ctr, new_ctr

# start monitor thread
threading.Thread(target=monitor_loop, kwargs={'duration':100,'check_interval':5}, daemon=True).start()
print('Monitoring started (will run ~100s).')

Monitoring started (will run ~100s).
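
The `js > 0.15` alert threshold is easier to interpret once the range of the Jensen-Shannon divergence is known. Below is a minimal standalone sketch using natural logarithms, as the monitoring cell does. It differs slightly from the notebook's `js_divergence`: empty bins are skipped rather than clipped to `1e-12`, which changes the result only negligibly.

```python
import math

def js_divergence(p, q):
    """Jensen-Shannon divergence (natural log) between two probability vectors."""
    m = [0.5 * (a + b) for a, b in zip(p, q)]
    def kl(a, b):
        # Bins with zero mass in `a` contribute nothing to KL(a || b), so skip them.
        # Wherever a bin of `a` is positive, the mixture `b` is positive too.
        return sum(x * math.log(x / y) for x, y in zip(a, b) if x > 0)
    return 0.5 * (kl(p, m) + kl(q, m))

same = js_divergence([0.5, 0.5, 0.0], [0.5, 0.5, 0.0])  # identical histograms
disjoint = js_divergence([1.0, 0.0], [0.0, 1.0])        # no overlapping bins
print(same, disjoint, math.log(2))
```

Identical histograms give 0 and histograms with no shared bins give ln 2 ≈ 0.693, the maximum in natural-log units. A threshold of 0.15 therefore sits at roughly a fifth of the possible range.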


## After running the notebook
- Watch stdout to see alerts, retrain, shadow evaluation, and canary results.
- Files produced:
  - `/content/reco_audit.log` : per-request audit entries (JSONL)
  - `/content/sim_metrics.jsonl` : traffic simulator per-second metrics
  - `/content/reco_model/` : current production model files (TorchScript + scaler)
  - `/content/reco_model_new/` : retrained candidate (if produced; renamed to `/content/reco_model/` after a successful canary)
  - `ACTION_LOG` and `ALERTS` are in-memory Python lists; print them from a new cell to inspect them.


### Helpful commands
- Tail logs: `!tail -n 50 /content/reco_audit.log`
- See simulator metrics: `!tail -n 50 /content/sim_metrics.jsonl`
- List models: `!ls -la /content/reco_model*`
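
Beyond tailing the files, the simulator metrics can be summarized in Python. The sketch below assumes the JSONL row format written by the simulator cell; `summarize_sim_metrics` is a hypothetical helper, and the demo rows are made up so that the snippet runs outside Colab as well.

```python
import json
import os
import tempfile

def summarize_sim_metrics(path, last_n=10):
    """Success rate and mean drift level over the last `last_n` simulator ticks."""
    with open(path) as f:
        rows = [json.loads(line) for line in f if line.strip()]
    rows = rows[-last_n:]
    if not rows:
        return None
    sent = sum(r["qps"] for r in rows)
    ok = sum(r["succ"] for r in rows)
    return {
        "ticks": len(rows),
        "success_rate": ok / max(1, sent),
        "mean_drift": sum(r["drift"] for r in rows) / len(rows),
    }

# Demo on a throwaway file with made-up rows in the simulator's format.
demo_rows = [
    {"t": 0.0, "succ": 30, "qps": 30, "avg_score": 0.5, "drift": 1.5, "pct_new": 0.15},
    {"t": 1.0, "succ": 27, "qps": 30, "avg_score": 0.5, "drift": 2.0, "pct_new": 0.30},
]
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as tmp:
    tmp.write("\n".join(json.dumps(r) for r in demo_rows) + "\n")
summary = summarize_sim_metrics(tmp.name)
os.remove(tmp.name)
print(summary)
```

In the notebook, the same function can be pointed at the real file with `summarize_sim_metrics('/content/sim_metrics.jsonl')`.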

This demo is simplified (synthetic labels, proxy metrics, and a model swap that only replaces files on disk), but it illustrates the core monitoring → detect → retrain → canary → promote/rollback loop used in production systems.
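
The canary step in this loop compares two observed rates directly, so with only about 150 requests on the canary arm, sampling noise alone can flip the decision. One possible refinement, sketched here as an assumption rather than as the notebook's method, is a pooled two-proportion z-test that rolls back only when the canary is significantly worse. `two_proportion_z`, `canary_decision`, and the counts are hypothetical.

```python
import math

def two_proportion_z(pos_a, n_a, pos_b, n_b):
    """z-score for H0: both arms share one underlying rate (pooled standard error)."""
    p_a, p_b = pos_a / n_a, pos_b / n_b
    pooled = (pos_a + pos_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        return 0.0  # both arms all-positive or all-negative: no evidence either way
    return (p_b - p_a) / se

def canary_decision(base_pos, base_tot, new_pos, new_tot, z_crit=1.645):
    """Hypothetical rule: roll back only if the canary is significantly worse (one-sided 5%)."""
    z = two_proportion_z(base_pos, base_tot, new_pos, new_tot)
    return "rollback" if z < -z_crit else "promote"

# Illustrative counts only: roughly a 90/10 split of 1,500 requests.
print(canary_decision(base_pos=700, base_tot=1350, new_pos=75, new_tot=150))
print(canary_decision(base_pos=700, base_tot=1350, new_pos=50, new_tot=150))
```

In the first call the canary rate is slightly lower than the baseline but well within noise, so the rule promotes; in the second the gap is several standard errors wide and the rule rolls back. The choice of a one-sided 5% level is a design decision that trades slower rollbacks against fewer false alarms.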
