In [None]:
#| default_exp api

# API

> CLI and web APIs for the service

In [None]:
#|hide
from fastdownload import FastDownload

In [None]:
#|export
from collections import deque
import logging as l
from fastcore.all import *
from hits_recsys.collab import *
from pathlib import Path
from fastapi import FastAPI
from pydantic import BaseModel
from importlib import metadata
import uvicorn
from datetime import date
from hits_recsys.embed import EmbedAdapter

## Logging

In [None]:
# |export
DEF_FMT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

def init_logger(name: str = None, level=l.INFO, format: str = None, handlers: list = None, logs_dir='./logs'):
    '''Configure logger `name` (the root logger when `None`) with a fresh set of handlers.

    A `StreamHandler` is always attached in addition to `handlers`. If `logs_dir` is truthy,
    a `FileHandler` writing to `<logs_dir>/<today>.log` is attached as well (the directory is
    created when missing). Handlers previously attached to the logger are removed, and closed
    unless they are being attached again. Every attached handler gets the same formatter,
    built from `format` (falling back to `DEF_FMT`).

    The `handlers` list passed by the caller is left unmodified.
    '''
    # Work on a copy: appending to the caller's list would leak the stream/file handlers
    # into it and make them pile up if the same list is passed again.
    handlers = list(handlers) if handlers is not None else []
    handlers.append(l.StreamHandler())
    if logs_dir:
        p = Path(logs_dir)/f'{date.today()}.log'
        p.parent.mkdir(parents=True, exist_ok=True)
        handlers.append(l.FileHandler(p))
    log_fmt = l.Formatter(DEF_FMT if format is None else format, datefmt='%Y-%m-%d %H:%M:%S')
    log = l.getLogger(name)
    log.setLevel(level)
    # Detach the previous handlers and close the ones that are not coming back, so repeated
    # calls do not accumulate open log files.
    for old in list(log.handlers):
        log.removeHandler(old)
        if not any(old is h for h in handlers): old.close()
    for h in handlers:
        h.setFormatter(log_fmt)
        log.addHandler(h)

In [None]:
#|export
class LoggingQueue(deque):
    '''In-memory log buffer: a `deque` exposing the `put_nowait` method used by `logging.handlers.QueueHandler`'''
    def put_nowait(self, rec):
        '''Store the `message` attribute of `rec` at the right end of the deque'''
        text = rec.message
        self.append(text)

In [None]:
# Demo: keep at most 3 formatted records in memory and attach the queue handler to the root logger
q = LoggingQueue([],3)
init_logger(handlers=[l.handlers.QueueHandler(q)])

In [None]:
# Emit three records through the root logger; each one is echoed below and captured in `q`
l.info("test 1")
l.info("test 2")
l.info("test 3")

2024-03-16 16:21:04 - root - INFO - test 1
2024-03-16 16:21:04 - root - INFO - test 2
2024-03-16 16:21:04 - root - INFO - test 3


In [None]:
# Display the formatted messages held by the in-memory queue
L(q).pprint()

2024-03-16 16:21:04 - root - INFO - test 1
2024-03-16 16:21:04 - root - INFO - test 2
2024-03-16 16:21:04 - root - INFO - test 3


## CLI

In [None]:
#|export

MODEL_CLASS ={'collab': CollabUserBased, 'embed': EmbedAdapter}

@call_parse
def cli(optype, # operation to perform, one of 'train', 'eval' or 'pred'
        r_path, # path to dataset with ratings
        m_path,  # path to dataset with movie titles
        model_type: str = 'collab', # type of model to train, one of `collab`, `embed`
        model: Path=None, # path to model if not train
        out: Path = './models'):  # output location: model folder for 'train', predictions file for 'pred'
    '''Train, evaluate or run predictions with a recommender model on a ratings dataset.'''
    assert optype in ['train','eval','pred'], 'incorrect operation type'
    init_logger()

    # Stays None unless a model is loaded here or trained below.
    serv = None
    if model:
        l.info(f"Loading model from {model}")
        serv = ModelService.load(model, MODEL_CLASS[model_type]())

    l.info(f"loading datasets from {r_path} and {m_path}")
    ds = TfmdDataset(read_movielens(r_path,m_path))
    l.info(f"datasets loaded")

    l.info(f"start operation: {optype}")
    if optype=='train':
        serv = ModelService(MODEL_CLASS[model_type](), ds)
        serv.train()
        l.info(f"model trained")
        serv.save(out)
        l.info(f"model saved to {out}")
    elif serv is None or not serv.model:
        # 'eval' and 'pred' need a usable model: stop here instead of failing further down
        # (a missing `model` argument used to surface as a NameError on `serv`).
        l.error("You are trying to run model without providing correct model path")
        return
    if optype=='eval':
        loss = serv.eval(ds)
        l.info(f"loss = {loss.item()}")
    if optype=='pred':
        res = serv.pred(ds)
        # One prediction per line
        with open(out, 'w') as f:
            f.writelines([f"{line}\n" for line in res])
        l.info(f"preds are saved to {out}")

In [None]:
# Demo data: train/test ratings, movies and users files hosted in the course repository
url = 'https://raw.githubusercontent.com/MenshikovDmitry/TSU_AI_Course/main/module_1.%20Recommender%2BDevOps/dataset/'
files = ('ratings_train.dat ratings_test.dat movies.dat users.dat').split()
d = FastDownload()

In [None]:
# Download every file and collect the returned local paths (same order as `files`)
paths = L(d.download(url+f) for f in files); paths

(#4) [Path('/home/slakter/.fastdownload/archive/ratings_train.dat'),Path('/home/slakter/.fastdownload/archive/ratings_test.dat'),Path('/home/slakter/.fastdownload/archive/movies.dat'),Path('/home/slakter/.fastdownload/archive/users.dat')]

In [None]:
# Train the default ('collab') model on the training ratings and save it to ../models
cli('train', paths[0],paths[2], out='../models')

2024-03-16 16:21:05 - root - INFO - loading datasets from /home/slakter/.fastdownload/archive/ratings_train.dat and /home/slakter/.fastdownload/archive/movies.dat
2024-03-16 16:21:09 - root - INFO - datasets loaded
2024-03-16 16:21:09 - root - INFO - start operation: train
2024-03-16 16:21:10 - root - INFO - model trained
2024-03-16 16:21:10 - root - INFO - model saved to ../models


In [None]:
# `model` and `out` are passed by keyword: positionally, the 4th and 5th parameters of `cli`
# are `model_type` and `model`, so '../models' would be taken as the model type.
cli('pred', paths[1], paths[2], model='../models', out='./out.txt')

2024-03-16 16:21:10 - root - INFO - Loading model from ../models
2024-03-16 16:21:10 - root - INFO - loading datasets from /home/slakter/.fastdownload/archive/ratings_test.dat and /home/slakter/.fastdownload/archive/movies.dat
2024-03-16 16:21:10 - root - INFO - datasets loaded
2024-03-16 16:21:10 - root - INFO - start operation: pred


2024-03-16 16:21:11 - root - INFO - preds are saved to ./out.txt


## Web server

In [None]:
#|export
class PredictRequest(BaseModel):
    '''Request body for `/api/predict`'''
    # Movie names; the predict route forwards them as the first argument of `serv.recommend`
    movie_names: list
    # Ratings; forwarded as the second argument of `serv.recommend`.
    # Presumably aligned with `movie_names` by position — TODO confirm
    ratings: list

In [None]:
#|export

def add_routes(app, serv):
    '''Register the recommender endpoints on `app`, all backed by the service object `serv`.

    Routes: `/api/predict` (GET and POST), `/api/reload` (POST), `/api/similar`,
    `/api/movies` and `/api/info` (GET).
    '''
    n_recs = 20     # third argument passed to `serv.recommend`
    page_size = 20  # number of titles per page in `/api/movies`

    # POST is registered next to the original GET: the handler reads a request body,
    # which a number of HTTP clients cannot send with GET.
    @app.post("/api/predict")
    @app.get("/api/predict")
    async def predict(body: PredictRequest):
        return serv.recommend(body.movie_names, body.ratings, n_recs)

    @app.post("/api/reload")
    async def reload():
        # NOTE(review): the return value of `serv.load` is discarded; if `load` builds a new
        # service rather than updating `serv` in place, nothing is actually reloaded — confirm.
        serv.load(app.location)
        l.info("model reloaded")

    @app.get("/api/similar")
    async def similar(movie_name: str):
        l.info(f"getting similar movies to {movie_name}")
        try:
            return serv.similar_movies(movie_name)
        except KeyError:
            # Unknown titles are reported in the response payload rather than as an HTTP error
            return {"error": f"Movie {movie_name} not found"}

    @app.get("/api/movies")
    async def movies(prefix:str, page:int=0):
        # Named `matches` (not `l`) so the module-level `logging` alias is not shadowed here
        matches = [m for m in serv.ds.movie_map if m.startswith(prefix)]
        # Slicing clamps out-of-range bounds, so a page past the end is simply empty
        return matches[page_size*page : page_size*(page+1)]

    @app.get("/api/info")
    async def info():
        return dict(metadata.metadata('hits-recsys'))

In [None]:
#|export
def add_logging(app, q):
    '''Register `GET /api/log` on `app`, serving pages of the records held in `q`.

    `page` counts pages of `n_logs` records: 0, 1, ... from the oldest record, -1, -2, ...
    from the newest. The default (`page=-1`) returns the latest `n_logs` records. Pages
    outside the buffer, or a non-positive `n_logs`, yield an empty list.
    '''
    @app.get("/api/log")
    async def log(page: int = -1, n_logs: int = 20):
        logs = list(q)
        if n_logs <= 0: return {'logs': []}
        # A negative start must not reach back further than the buffer holds
        start = max(page*n_logs, -len(logs))
        stop = (page+1)*n_logs
        # Only the last page (stop == 0) is open-ended; other negative pages keep their
        # negative upper bound so they return a single page, not everything up to the end.
        return {'logs': logs[start : stop or None]}

In [None]:
#|export
@call_parse
def serve(host='127.0.0.1',
          port=5000, # port to listen on
          model_type: str = 'collab', # type of model to train, one of `collab`, `embed`
          model_dir='./models', # directory to load model from
          logs_dir='./logs'): # logs directory
    # Entry point of the web service: set up logging, load the model from `model_dir`,
    # register the API routes and run a uvicorn server on `host`:`port`.

    # The 20 most recent formatted records stay in memory for the `/api/log` endpoint
    log_buffer = LoggingQueue([], 20)
    init_logger(handlers=[l.handlers.QueueHandler(log_buffer)], logs_dir=logs_dir)
    app = FastAPI()
    service = ModelService.load(model_dir, MODEL_CLASS[model_type]())
    if not service.model:
        l.error("You are trying to run model without providing correct model path! Shutting down...")
        return
    # Read back by the `/api/reload` route
    app.location = model_dir
    add_routes(app, service)
    add_logging(app, log_buffer)
    service.save(model_dir)
    if in_notebook():
        # Lets uvicorn run inside the event loop a notebook already has
        import nest_asyncio
        nest_asyncio.apply()
    # log_config=None: uvicorn does not install its own logging configuration
    uvicorn.Server(uvicorn.Config(app, host=host, port=port, log_config=None)).run()

In [None]:
#|eval: false
# Start the API on port 5000 with the model stored in ../models (cell is marked `eval: false`)
serve(port=5000, model_dir='../models')

2024-03-16 16:24:39 - uvicorn.error - INFO - Started server process [57908]
2024-03-16 16:24:39 - uvicorn.error - INFO - Waiting for application startup.
2024-03-16 16:24:39 - uvicorn.error - INFO - Application startup complete.
2024-03-16 16:24:39 - uvicorn.error - INFO - Uvicorn running on http://127.0.0.1:5000 (Press CTRL+C to quit)
2024-03-16 16:24:41 - uvicorn.access - INFO - 127.0.0.1:50096 - "GET /api/info HTTP/1.1" 200
2024-03-16 16:24:42 - uvicorn.access - INFO - 127.0.0.1:50106 - "GET /api/predict HTTP/1.1" 200
2024-03-16 16:24:42 - uvicorn.access - INFO - 127.0.0.1:50116 - "GET /api/log HTTP/1.1" 200
2024-03-16 16:24:43 - root - INFO - model reloaded
2024-03-16 16:24:43 - uvicorn.access - INFO - 127.0.0.1:50120 - "POST /api/reload HTTP/1.1" 200
2024-03-16 16:24:44 - root - INFO - getting similar movies to Star Trek III: The Search for Spock (1984)
2024-03-16 16:24:44 - uvicorn.access - INFO - 127.0.0.1:36992 - "GET /api/similar?movie_name=Star%20Trek%20III:%20The%20Search%20