# Kaggle InsightFace Remote Embedding Service
This notebook sets up a FastAPI server providing remote embedding & similarity operations using a GPU (2x T4) environment. You can tunnel the service to your local machine and keep raw video data local.

Steps:
1. Install dependencies (skip if already present).
2. (Optional) Enable Cloudflare tunnel for external access.
3. Launch FastAPI server (non-blocking).
4. Test endpoints from within notebook.
5. Copy tunnel URL & call from local client scripts.

Security:
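Every request must send an `Authorization: Bearer <token>` header. The expected token is read from the `SERVICE_TOKEN` environment variable and defaults to `changeme`; set your own value before exposing the service through a tunnel.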


In [None]:
# Install / upgrade dependencies (Kaggle often has some preinstalled)
%pip install -q --upgrade pip
%pip install -q fastapi uvicorn insightface opencv-python-headless pillow numpy cloudpickle

import os, subprocess, sys, json, math, time
print('Python version:', sys.version)


In [None]:
# Environment configuration
import os
# Bearer token expected by the API; an empty value disables authentication.
SERVICE_TOKEN = os.environ.get('SERVICE_TOKEN', 'changeme')  # CHANGE THIS
# Name of the InsightFace model pack passed to FaceAnalysis.
MODEL_PACK = os.environ.get('MODEL_PACK', 'buffalo_l')
# Detector input size, given as two comma-separated integers.
DET_SIZE = tuple(int(x) for x in os.environ.get('DET_SIZE', '640,640').split(','))
if len(DET_SIZE) != 2:
    raise ValueError(f"DET_SIZE must be two comma-separated integers, got {DET_SIZE!r}")

# Never echo the token itself: cell output is saved with the notebook and
# would leak the secret to anyone the notebook is shared with.
if not SERVICE_TOKEN:
    _token_state = '<empty: auth disabled>'
elif SERVICE_TOKEN == 'changeme':
    _token_state = '<default: change me!>'
else:
    _token_state = '<set>'
print({'SERVICE_TOKEN': _token_state, 'MODEL_PACK': MODEL_PACK, 'DET_SIZE': DET_SIZE})


In [None]:
# Load model (warm-up) to verify GPU access
import insightface, time
from pprint import pprint

# Preferred execution providers, in priority order (GPU first, CPU fallback).
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
print('Attempting to load FaceAnalysis with providers:', providers)
face_app = insightface.app.FaceAnalysis(name=MODEL_PACK, providers=providers)
face_app.prepare(ctx_id=0, det_size=DET_SIZE)


def _active_providers(analysis):
    """Return the execution providers in use by the loaded models.

    Each model's ONNX Runtime session is asked via ``get_providers()``. If no
    session can be inspected, a ``providers`` attribute on ``analysis`` is used
    when present; otherwise an empty list is returned.

    NOTE(review): released insightface versions do not appear to define
    ``FaceAnalysis.providers``, so reading it directly is avoided here.
    """
    found = []
    for model in (getattr(analysis, 'models', None) or {}).values():
        session = getattr(model, 'session', None)
        get_providers = getattr(session, 'get_providers', None)
        if not callable(get_providers):
            continue
        for provider_name in get_providers():
            if provider_name not in found:
                found.append(provider_name)
    if not found:
        found = list(getattr(analysis, 'providers', None) or [])
    return found


ACTIVE_PROVIDERS = _active_providers(face_app)
print('Model loaded. Providers actually used:', ACTIVE_PROVIDERS or 'unknown')
if ACTIVE_PROVIDERS and 'CUDAExecutionProvider' not in ACTIVE_PROVIDERS:
    # Make a silent CPU fallback visible; it defeats the purpose of a GPU kernel.
    print('WARNING: CUDAExecutionProvider is not active; inference will run on CPU.')


In [None]:
# FastAPI server definition (in-notebook)
import base64, numpy as np
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from typing import List, Optional
from PIL import Image
from io import BytesIO
import uvicorn, threading

# ASGI application object; the route decorators below register handlers on it.
app = FastAPI(title='InsightFace Remote Embedding Service', version='0.1.0')


class EmbedRequest(BaseModel):
    """Request body for POST /embed."""
    # One base64-encoded image file per entry.
    images: List[str]


class EmbedResponse(BaseModel):
    """Response body for POST /embed."""
    # One embedding per input image, in request order; an empty list means
    # no face was detected in that image.
    embeddings: List[List[float]]
    # Number of entries in `embeddings`.
    count: int


class SimilarityRequest(BaseModel):
    """Request body for POST /similarity: the two embedding vectors to compare."""
    emb_a: List[float]
    emb_b: List[float]


class SimilarityResponse(BaseModel):
    """Response body for POST /similarity."""
    # Cosine similarity of the two submitted vectors.
    similarity: float


class PingResponse(BaseModel):
    """Response body for POST /ping: the service's current configuration."""
    model_pack: str
    det_size: List[int]
    provider_list: List[str]


# Self-assignment: has no effect when SERVICE_TOKEN is already defined by the
# configuration cell, and raises NameError here if that cell was not run.
SERVICE_TOKEN = SERVICE_TOKEN

def auth_check(authorization: Optional[str] = Header(None)):
    """FastAPI dependency enforcing bearer-token authentication.

    Args:
        authorization: Value of the request's ``Authorization`` header, or
            ``None`` when the header is absent.

    Raises:
        HTTPException: 401 when the header is missing or does not use the
            Bearer scheme; 403 when the presented token does not match
            ``SERVICE_TOKEN``.

    Authentication is skipped entirely when ``SERVICE_TOKEN`` is empty.
    """
    import secrets

    if not SERVICE_TOKEN:
        return
    scheme, sep, presented = (authorization or '').partition(' ')
    # The auth scheme name is case-insensitive, so accept "bearer" / "BEARER" too.
    if not sep or scheme.lower() != 'bearer':
        raise HTTPException(
            status_code=401,
            detail='Missing bearer token',
            headers={'WWW-Authenticate': 'Bearer'},
        )
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed token matched. Bytes are compared because compare_digest
    # rejects non-ASCII str arguments.
    if not secrets.compare_digest(presented.encode('utf-8'), SERVICE_TOKEN.encode('utf-8')):
        raise HTTPException(status_code=403, detail='Invalid token')

def decode_image(b64_str: str):
    """Decode a base64-encoded image file into a BGR pixel array.

    Args:
        b64_str: Base64 text of an image file in any format Pillow can open.

    Returns:
        A C-contiguous ``uint8`` array of shape (height, width, 3) with the
        channels in BGR order.

    Raises:
        Whatever ``base64.b64decode`` or ``PIL.Image.open`` raise on malformed
        input; callers are expected to translate that into a client error.
    """
    import numpy as _np
    raw = base64.b64decode(b64_str)
    im = Image.open(BytesIO(raw)).convert('RGB')
    # Reversing the channel axis yields a negative-stride view; materialise it
    # as a contiguous array so downstream native code gets an ordinary buffer.
    return _np.ascontiguousarray(_np.asarray(im)[:, :, ::-1])

@app.post('/ping', response_model=PingResponse)
async def ping(_: None = Depends(auth_check)):
    """Report the loaded model pack, detector size and active execution providers.

    Providers are read from each model's ONNX Runtime session. A ``providers``
    attribute on ``face_app`` is used only as a fallback.
    NOTE(review): released insightface versions do not appear to define
    ``FaceAnalysis.providers``, so reading it unconditionally is avoided.
    """
    active = []
    for model in (getattr(face_app, 'models', None) or {}).values():
        session = getattr(model, 'session', None)
        get_providers = getattr(session, 'get_providers', None)
        if not callable(get_providers):
            continue
        for provider_name in get_providers():
            if provider_name not in active:
                active.append(provider_name)
    if not active:
        active = list(getattr(face_app, 'providers', None) or [])
    return PingResponse(model_pack=MODEL_PACK, det_size=list(DET_SIZE), provider_list=active)

@app.post('/embed', response_model=EmbedResponse)
async def embed(req: EmbedRequest, _: None = Depends(auth_check)):
    """Compute one face embedding per submitted image.

    For each image the first face returned by ``face_app.get`` is used. An
    image with no detected face yields an empty list, so ``embeddings`` stays
    aligned with ``req.images``.

    Raises:
        HTTPException: 400 when an entry cannot be decoded as an image. The
            zero-based index of the offending entry is included in the detail.
    """
    out = []
    for index, img_b64 in enumerate(req.images):
        try:
            img = decode_image(img_b64)
        except Exception as exc:
            # Malformed input is the client's error, not a server fault.
            raise HTTPException(
                status_code=400,
                detail=f'images[{index}] is not a valid base64-encoded image',
            ) from exc
        faces = face_app.get(img)
        out.append(faces[0].normed_embedding.tolist() if faces else [])
    return EmbedResponse(embeddings=out, count=len(out))

@app.post('/similarity', response_model=SimilarityResponse)
async def similarity(req: SimilarityRequest, _: None = Depends(auth_check)):
    """Return the cosine similarity of two embedding vectors.

    Raises:
        HTTPException: 400 when the vectors are empty, differ in length, have
            zero norm, or otherwise produce a non-finite result. ``/embed``
            returns an empty vector for images without a face, so these cases
            are reachable in normal use.
    """
    import numpy as _np
    a = _np.asarray(req.emb_a, dtype='float32')
    b = _np.asarray(req.emb_b, dtype='float32')
    if a.size == 0 or a.shape != b.shape:
        raise HTTPException(
            status_code=400,
            detail='emb_a and emb_b must be non-empty and of equal length',
        )
    denom = float(_np.linalg.norm(a) * _np.linalg.norm(b))
    if denom == 0.0 or not _np.isfinite(denom):
        raise HTTPException(
            status_code=400,
            detail='emb_a and emb_b must have finite, non-zero norm',
        )
    sim = float(_np.dot(a, b)) / denom
    # NaN/inf cannot be serialised as JSON; reject rather than fail with a 500.
    if not _np.isfinite(sim):
        raise HTTPException(
            status_code=400,
            detail='similarity is undefined for the given embeddings',
        )
    return SimilarityResponse(similarity=sim)

# Launch server in background thread

def run_server():
    """Serve `app` with uvicorn on 0.0.0.0:8000; blocks until the server stops."""
    uvicorn.run(app, host='0.0.0.0', port=8000, log_level='warning')

# Re-running this cell must not start a second server: port 8000 is still held
# by the first one, so a new instance could not bind it, yet the cell would
# report success anyway.
_previous_thread = globals().get('thread')
if isinstance(_previous_thread, threading.Thread) and _previous_thread.is_alive():
    print('Server thread already running on port 8000; restart the kernel to serve a redefined app.')
else:
    # Daemon thread so the server does not keep the kernel process alive on shutdown.
    thread = threading.Thread(target=run_server, daemon=True)
    thread.start()
    print('Server thread started on port 8000.')


In [None]:
# (Optional) Cloudflare tunnel
# Kaggle usually allows outbound apt operations; if blocked skip this.
import subprocess, textwrap, shutil, os, json, time
# Install the cloudflared binary only when it is not already on PATH.
if shutil.which('cloudflared') is None:
    print('Installing cloudflared...')
    # Download the latest linux-amd64 release build into the working directory.
    # wget runs with -q, so it prints nothing itself, including on failure.
    # NOTE(review): the binary is neither version-pinned nor checksum-verified;
    # consider pinning a release and checking its hash.
    !wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 -O cloudflared
    !chmod +x cloudflared
    # Move onto PATH so the subprocess call below can find it by name.
    # NOTE(review): presumably needs write access to /usr/local/bin - confirm on the target image.
    !mv cloudflared /usr/local/bin/
else:
    print('cloudflared already installed')

# Public URL of the quick tunnel; stays None until cloudflared announces it.
TUNNEL_URL = None
try:
    import re
    import threading

    # Stop a tunnel left running by an earlier execution of this cell so that
    # re-running does not accumulate cloudflared processes.
    _stale_proc = globals().get('proc')
    if isinstance(_stale_proc, subprocess.Popen) and _stale_proc.poll() is None:
        _stale_proc.terminate()

    proc = subprocess.Popen(['cloudflared','tunnel','--url','http://localhost:8000','--no-autoupdate'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    print('Starting tunnel (wait ~5-10s)...')

    # Match only the tunnel hostname itself, so trailing punctuation or box
    # characters on the log line never end up in the captured URL.
    _TUNNEL_URL_RE = re.compile(r'https://[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)*\.trycloudflare\.com')

    def reader():
        """Echo cloudflared's log output and record the announced tunnel URL."""
        global TUNNEL_URL
        for line in proc.stdout:
            found = _TUNNEL_URL_RE.search(line)
            # Error lines can mention the provisioning endpoint
            # (api.trycloudflare.com); that is not the tunnel address.
            if found and not found.group(0).startswith('https://api.'):
                TUNNEL_URL = found.group(0)
            print(line, end='')

    t = threading.Thread(target=reader, daemon=True)
    t.start()
    # Wait up to 30 s for the URL, but stop early if cloudflared has exited.
    for _ in range(30):
        if TUNNEL_URL or proc.poll() is not None:
            break
        time.sleep(1)
    if proc.poll() is not None:
        # Give the reader a moment to drain any remaining output.
        t.join(timeout=1)
    if TUNNEL_URL:
        print('Tunnel URL:', TUNNEL_URL)
    elif proc.poll() is not None:
        print('cloudflared exited early with code', proc.returncode, '- check logs above.')
    else:
        print('Tunnel URL not captured yet; check logs above.')
except Exception as e:
    print('Tunnel setup failed:', e)


In [None]:
# Quick local test of /ping endpoint
import requests, json, os
# Address of the in-notebook server started by the server cell.
BASE = 'http://localhost:8000'
# Attach the bearer header only when a token is configured.
if SERVICE_TOKEN:
    headers = {'Authorization': f'Bearer {SERVICE_TOKEN}'}
else:
    headers = {}
resp = requests.post(f'{BASE}/ping', headers=headers)
print(resp.status_code, resp.text)


In [None]:
# Sample embed test using a tiny placeholder image (generate solid color)
import numpy as np, base64, requests
from PIL import Image
from io import BytesIO
# Build a 112x112 mid-grey RGB image as a placeholder input.
arr = np.full((112, 112, 3), 127, dtype='uint8')
im = Image.fromarray(arr)
# Encode it as JPEG in memory, then as base64 text for the JSON payload.
buf = BytesIO()
im.save(buf, format='JPEG')
b64 = base64.b64encode(buf.getvalue()).decode()
# Attach the bearer header only when a token is configured.
if SERVICE_TOKEN:
    headers = {'Authorization': f'Bearer {SERVICE_TOKEN}'}
else:
    headers = {}
resp = requests.post(
    'http://localhost:8000/embed',
    json={'images': [b64]},
    headers=headers,
)
print(resp.status_code, resp.json())


## Usage From Local Machine

1. Run all cells above until tunnel URL appears (if using Cloudflare). Suppose it is `https://xyz.trycloudflare.com`.
2. On local machine export token:
   ```bash
   export SERVICE_TOKEN=yourtoken
   ```
3. Test client:
   ```bash
   python kaggle_pipeline/client/remote_similarity_cli.py --url https://xyz.trycloudflare.com \
       --ref assets/raw_face.webp --query assets/person1.webp
   ```
4. Integrate by modifying local detection loop to call remote embedding for each detected face instead of local model inference (optional optimization if GPU needed).


In [None]:
# Patch: add GET / route (if not already) and keep-alive pinger
try:
    # Unused import, kept so a missing FastAPI install is reported by the
    # except branch below.
    from fastapi import APIRouter

    # Register GET / only once; re-running this cell would otherwise append a
    # duplicate route to the application on every execution.
    _root_registered = any(
        getattr(route, 'path', None) == '/'
        and 'GET' in (getattr(route, 'methods', None) or ())
        for route in app.routes
    )
    if _root_registered:
        print('Root route already present.')
    else:
        @app.get('/')
        def root():
            """Static liveness response pointing callers at the POST endpoints."""
            return {"status":"ok","msg":"Use POST /ping /embed /similarity"}
        print('Root route added.')
except Exception as e:
    print('Root route patch skipped:', e)

# Simple keep-alive thread to prevent idle timeouts (hits local /ping every 50s)
import threading, time, requests, os
if 'KEEPALIVE_STARTED' in globals():
    # The flag was set by an earlier run of this cell; do not start a second pinger.
    print('Keep-alive already running.')
else:
    def _keepalive():
        """Post to the local /ping endpoint every 50 seconds, forever.

        Request failures are deliberately ignored: this is a best-effort
        background task and must never terminate its thread.
        """
        while True:
            try:
                if SERVICE_TOKEN:
                    h = {'Authorization': f'Bearer {SERVICE_TOKEN}'}
                else:
                    h = {}
                requests.post('http://127.0.0.1:8000/ping', headers=h, timeout=5)
            except Exception:
                pass
            time.sleep(50)

    # Daemon thread so the pinger never blocks kernel shutdown.
    threading.Thread(target=_keepalive, daemon=True).start()
    KEEPALIVE_STARTED = True
    print('Keep-alive thread started.')
