## Домашнее задание №4 (курс "Практикум по программированию на языке Python")

### Выполнил: <font color='red'>Никитин Артем Анатольевич, МФТИ, ФПМИ</font>

### Тема: Web-сервер для обучения и использования ML-моделей

#### Преподаватели: Роман Ищенко (roman.ischenko@gmail.com) и Мурат Апишев (mel-lain@yandex.ru)

**Выдана**: 30.04.2024

**Дедлайн**: 13.05.2024

**Среда выполнения**: Jupyter Notebook (Python 3.7+)

#### Правила:

Результаты выполнения задания:

- архив со скриптами и файлами Dockerfile, который 1-2 команды позволяет развернуть сервер, решающий поставленные в задании задачи
- Jupyter Notebook, где __весь код__ из скриптов дублируется (1 ячейка - 1 скрипт) с комментарием, содержащим информацию о том, из какого файла взят код и что верхнеуровнево этот код делает

__Максимальное число баллов за задание - 20__.

Готовое задание отправляется на почту преподавателя.

Задание выполняется самостоятельно. Если какие-то студенты будут уличены в списывании, все они автоматически получат за эту работу 0 баллов. Если вы нашли в Интернете какой-то специфичный код, который собираетесь заимствовать, обязательно укажите это в задании - наверняка вы не единственный, кто найдёт и использует эту информацию.

Удалять фрагменты формулировок заданий запрещается.

### Постановка задачи:

**Серверная часть (14 баллов):**

- В данной работе нужно написать многозадачный веб-сервер для обучения и инференса ML моделей. На старте сервер получает на вход (через .env) конфиг, в котором должны быть указаны 3 параметра: путь к директории для сохранения моделей внутри контейнера сервера, число ядер, доступных для обучения и максимальное число моделей, которые могут быть одновременно загружены для инференса.


- Сервер должен реализовывать следующие методы:
    - `fit(X, y, config)` - обучить модель и сохранить на диск по указанным именем
    - `predict(y, config)` - предсказать с помощью обученной и загруженной модели по её имени
    - `load(config)` - загрузить обученную модель по её имени в режим инференса
    - `unload(config)` - выгрузить загруженную модель по её имени
    - `remove(config)` - удалить обученную модель с диска по её имени
    - `remove_all()` - удалить все обученные модели с диска


- Содержимое конфигов и форматы данных предлагается продумать и реализовать самостоятельно
- Сервер должен иметь счётчик активных процессов. Максимальное число активных процессов соответствует числу ядер, переданному в конфиге при старте сервиса. Каждое обучение модели запускается в отдельном процессе и до своего завершения потребляет этот процесс. Один процесс всегда остаётся для сервера, в нём же загружаются и работают на инференс обученные модели
- Сервер должен корректно обрабатывать все граничные случаи (запуск обучения без свободных ядер, запуск инфренса свыше лимита, запросы с несуществующими именами моделей, запросы с дублирующимися именами моделей)
- В реализации должны поддерживаться не менее трёх дискриминативных моделей (т.е. принимающих на вход объекты и метки при обучении и предсказывающих метки для новых объектов)
- Сервер должен быть реализован на FastAPI
- Проект разворачивается с помощью выбранной библиотеки управления виртуальными окружениями (pipenv, poetry)
- Дополнительным плюсом будет использование технологии контейнеризации Docker

**Клиентская часть (6 баллов):**

- Клиентская часть должна демонстрировать работу с реализованным сервером с помощью библиотек requests и aiohttp. Она может быть реализована непосредственно в Jupyter Notebook, с описанием ожидаемого действия, или в отдельном(-ых) скрипте(-ах), с дублированием в Jupyter Notebook (тогда работоспособность в ноутбуке не требуется). Далее описываются отдельные функции:
- Код вызова последовательного вызова обучения как минимум двух (N) различных моделей с таким набором данных и параметрами, чтобы обучение одной модели длилось не менее 60 секунд.
- Код вызова асинхронного вызова обучения как минимум двух различных моделей с демонстрацией, что работа выполняется в два (в N) раза быстрее
- Асинхронный вызов нескольких предсказаний
- Код демонстрации остальных функций сервера (загрузка, выгрузка, удаление)
- Должны обрабатываться ошибки и исключения, возвращаемые сервером


# Запускать согласно `README`, код сервера снизу вставлен чисто из-за пункта в задании (в нем не требовалось, чтобы он запускался). Для проверки работы есть код в конце ноутбука с клиентской частью, а так же в папке `client`!

In [None]:
# Loading configs from file


from pathlib import Path
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_dir: Path
    num_cores: int
    max_loaded_models: int

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
    )


settings = Settings()


In [None]:
# Core functions: for training, for saving (in another process), for loading/unloading, deleting. I use Lock for calculating the number of available cores through the processes.

import os
import threading
import joblib

from multiprocessing import get_context
from typing import Dict

ctx = get_context("fork")
Process = ctx.Process

_active_trains = ctx.Value('i', 0)
_active_lock = ctx.Lock()

_loaded_models: Dict[str, object] = {}
_loaded_lock = threading.Lock()


def _train_and_save(model_name: str, model_cls, X, y, hyperparams: dict):
    try:
        model = model_cls(**hyperparams)
        model.fit(X, y)
        path = os.path.join(settings.model_dir, f"{model_name}.joblib")
        joblib.dump(model, path)
    finally:
        with _active_lock:
            _active_trains.value -= 1


def start_training(model_name: str, model_type: str, X, y, hyperparams: dict):
    path = os.path.join(settings.model_dir, f"{model_name}.joblib")
    if os.path.exists(path):
        raise FileExistsError(f"Model '{model_name}' already exists")

    with _active_lock:
        if _active_trains.value >= settings.num_cores:
            raise RuntimeError("No free cores for training")
        _active_trains.value += 1

    from sklearn.linear_model import LogisticRegression
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.svm import SVC

    cls_map = {
        "logreg": LogisticRegression,
        "rf": RandomForestClassifier,
        "svm": SVC,
    }
    model_cls = cls_map.get(model_type)
    if model_cls is None:
        with _active_lock:
            _active_trains.value -= 1
        raise ValueError(f"Unknown model type '{model_type}'")

    proc = Process(
        target=_train_and_save,
        args=(model_name, model_cls, X, y, hyperparams),
    )
    proc.start()
    return proc.pid


def available_cores() -> int:
    with _active_lock:
        return settings.num_cores - _active_trains.value


def load_model(model_name: str):
    with _loaded_lock:
        if model_name in _loaded_models:
            return
        if len(_loaded_models) >= settings.max_loaded_models:
            raise RuntimeError("Loaded models limit reached")
        path = os.path.join(settings.model_dir, f"{model_name}.joblib")
        if not os.path.exists(path):
            raise FileNotFoundError(f"Model '{model_name}' not found on disk")
        _loaded_models[model_name] = joblib.load(path)


def unload_model(model_name: str):
    with _loaded_lock:
        if model_name not in _loaded_models:
            raise KeyError(f"Model '{model_name}' is not loaded")
        del _loaded_models[model_name]


def predict(model_name: str, X):
    with _loaded_lock:
        if model_name not in _loaded_models:
            raise KeyError(f"Model '{model_name}' is not loaded")
        return _loaded_models[model_name].predict(X).tolist()


def remove_model(model_name: str):
    with _loaded_lock:
        _loaded_models.pop(model_name, None)
    path = os.path.join(settings.model_dir, f"{model_name}.joblib")
    if not os.path.exists(path):
        raise FileNotFoundError(f"Model '{model_name}' not found on disk")
    os.remove(path)


def remove_all():
    with _loaded_lock:
        _loaded_models.clear()
    for fname in os.listdir(settings.model_dir):
        if fname.endswith(".joblib"):
            os.remove(os.path.join(settings.model_dir, fname))


In [None]:
# Request types and schemas

from pydantic import BaseModel
from typing import List, Optional, Dict


class ModelConfig(BaseModel):
    model_name: str
    model_type: str
    hyperparams: Optional[Dict] = {}


class FitRequest(BaseModel):
    X: List[List[float]]
    y: List[int]
    config: ModelConfig


class FitResponse(BaseModel):
    status: str
    pid: int


class LoadRequest(BaseModel):
    model_name: str


class UnloadRequest(LoadRequest): pass


class RemoveRequest(LoadRequest): pass


class GenericResponse(BaseModel):
    status: str


class PredictRequest(BaseModel):
    model_name: str
    X: List[List[float]]


class PredictResponse(BaseModel):
    predictions: List[int]


In [None]:
# API realization

from fastapi import FastAPI, HTTPException

app = FastAPI()


@app.post("/fit", response_model=FitResponse)
def fit(req: FitRequest):
    try:
        pid = start_training(
            req.config.model_name,
            req.config.model_type,
            req.X, req.y, req.config.hyperparams or {}
        )
    except FileExistsError as e:
        raise HTTPException(400, str(e))
    except RuntimeError as e:
        raise HTTPException(429, str(e))
    return FitResponse(status="training_started", pid=pid)


@app.post("/load", response_model=GenericResponse)
def load(req: LoadRequest):
    try:
        load_model(req.model_name)
    except FileNotFoundError:
        raise HTTPException(404, "Model not found")
    except RuntimeError as e:
        raise HTTPException(429, str(e))
    return GenericResponse(status="loaded")


@app.post("/unload", response_model=GenericResponse)
def unload(req: UnloadRequest):
    try:
        unload_model(req.model_name)
    except KeyError:
        raise HTTPException(404, "Model not loaded")
    return GenericResponse(status="unloaded")


@app.post("/predict", response_model=PredictResponse)
def do_predict(req: PredictRequest):
    try:
        preds = predict(req.model_name, req.X)
    except KeyError:
        raise HTTPException(404, "Model not loaded")
    return PredictResponse(predictions=preds)


@app.post("/remove", response_model=GenericResponse)
def remove(req: RemoveRequest):
    try:
        remove_model(req.model_name)
    except FileNotFoundError:
        raise HTTPException(404, "Model not found")
    return GenericResponse(status="removed")


@app.post("/remove_all", response_model=GenericResponse)
def removeall():
    remove_all()
    return GenericResponse(status="all_removed")


@app.get("/cores")
def get_cores():
    return {
        "free_cores": available_cores(),
        "total_cores": settings.num_cores
    }


# Запускать отсюда после запуска сервера!

In [1]:
import requests
import time
import asyncio
import aiohttp
import numpy as np

URL = "http://localhost:8000"

In [12]:
resp = requests.post(f"{URL}/remove_all")
print("remove_all:", resp.json())

remove_all: {'status': 'all_removed'}


In [13]:
def wait_for_model(name, timeout=3600, poll=5):
    import os

    start = time.time()
    path = f"./app/models/{name}.joblib"
    while time.time() - start < timeout:
        if os.path.exists(path):
            print(f"Model {name} is ready")
            return True
        time.sleep(poll)
    raise TimeoutError(f"Model {name} not ready in {timeout}s")


def train_model(name, model_type):
    X = np.random.randn(10000, 50).tolist()
    y = np.random.randint(0, 2, size=10000).tolist()
    payload = {"X": X, "y": y, "config": {"model_name": name, "model_type": model_type}}
    resp = requests.post(f"{URL}/fit", json=payload)
    print(name, resp.json())
    if resp.status_code == 200:
        wait_for_model(name)


start = time.time()
train_model("m1", "rf")
train_model("m2", "svm")
train_model("m3", "logreg")
train_model("m4", "svm")
train_model("m5", "svm")
print("Sequential total:", time.time() - start)

m1 {'status': 'training_started', 'pid': 19479}
Current working directory: /home/ntheme/Data1/Workfiles/Programming/Projects/MIPT/Machine Learning/Intelligent Systems/Python/server
Model m1 is ready
m2 {'status': 'training_started', 'pid': 19517}
Current working directory: /home/ntheme/Data1/Workfiles/Programming/Projects/MIPT/Machine Learning/Intelligent Systems/Python/server
Model m2 is ready
m3 {'status': 'training_started', 'pid': 19534}
Current working directory: /home/ntheme/Data1/Workfiles/Programming/Projects/MIPT/Machine Learning/Intelligent Systems/Python/server
Model m3 is ready
m4 {'status': 'training_started', 'pid': 19577}
Current working directory: /home/ntheme/Data1/Workfiles/Programming/Projects/MIPT/Machine Learning/Intelligent Systems/Python/server
Model m4 is ready
m5 {'status': 'training_started', 'pid': 19589}
Current working directory: /home/ntheme/Data1/Workfiles/Programming/Projects/MIPT/Machine Learning/Intelligent Systems/Python/server
Model m5 is ready
Seque

In [14]:
async def train(session, name, model_type):
    X = np.random.randn(10000, 50).tolist()
    y = np.random.randint(0, 2, size=10000).tolist()
    payload = {"X": X, "y": y, "config": {"model_name": name, "model_type": model_type}}
    async with session.post(f"{URL}/fit", json=payload) as resp:
        print(name, await resp.json())


async def async_training():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            train(session, "a1", "rf"),
            train(session, "a2", "svm"),
            train(session, "a3", "logreg"),
            train(session, "a4", "svm"),
            train(session, "a5", "svm"),
        )
    print("Async total:", time.time() - start)


await async_training()

a2 {'status': 'training_started', 'pid': 19638}
a5 {'status': 'training_started', 'pid': 19639}
a4 {'status': 'training_started', 'pid': 19640}
a3 {'status': 'training_started', 'pid': 19642}
a1 {'status': 'training_started', 'pid': 19670}
Async total: 1.581207513809204


In [15]:
print("load m1:", requests.post(f"{URL}/load", json={"model_name": "m1"}).json())
print("predict m1:", requests.post(f"{URL}/predict", json={"model_name": "m1", "X": [[0.1] * 50]}).json())
print("unload m1:", requests.post(f"{URL}/unload", json={"model_name": "m1"}).json())
print("remove m1:", requests.post(f"{URL}/remove", json={"model_name": "m1"}).json())
print("remove_all:", requests.post(f"{URL}/remove_all").json())

load m1: {'status': 'loaded'}
predict m1: {'predictions': [0]}
unload m1: {'status': 'unloaded'}
remove m1: {'status': 'removed'}
remove_all: {'status': 'all_removed'}
