## Домашнее задание №4 (курс "Python 1")

### Выполнил: <font color='red'>Каленюк Аким Александрович</font>

### Тема: Web-сервер для обучения и использования ML-моделей

#### Преподаватели: Роман Ищенко (roman.ischenko@gmail.com) и Илья Склонин

**Дедлайн**: 18.01.2026

**Среда выполнения**: Jupyter Notebook (Python 3.9+)

#### Правила:

Результаты выполнения задания:

- архив со скриптами и файлами Dockerfile, который 1-2 команды позволяет развернуть сервер, решающий поставленные в задании задачи
- Jupyter Notebook, где __весь код__ из скриптов дублируется (1 ячейка - 1 скрипт) с комментарием, содержащим информацию о том, из какого файла взят код и что верхнеуровнево этот код делает

__Максимальное число баллов за задание - 35__.

Готовое задание отправляется на почту преподавателя.

Задание выполняется самостоятельно. Если какие-то студенты будут уличены в списывании, все они автоматически получат за эту работу 0 баллов. Если вы нашли в Интернете какой-то специфичный код, который собираетесь заимствовать, обязательно укажите это в задании - наверняка вы не единственный, кто найдёт и использует эту информацию.

Удалять фрагменты формулировок заданий запрещается.

### Постановка задачи:

**Серверная часть (22 балла):**

- В данной работе нужно написать многозадачный веб-сервер для обучения и инференса ML моделей. На старте сервер получает на вход (через .env) конфиг, в котором должны быть указаны 3 параметра: путь к директории для сохранения моделей внутри контейнера сервера, число ядер, доступных для обучения и максимальное число моделей, которые могут быть одновременно загружены для инференса.


- Сервер должен реализовывать следующие методы:
    - `fit(X, y, config)` - обучить модель и сохранить на диск по указанным именем
    - `predict(y, config)` - предсказать с помощью обученной и загруженной модели по её имени
    - `load(config)` - загрузить обученную модель по её имени в режим инференса
    - `unload(config)` - выгрузить загруженную модель по её имени
    - `remove(config)` - удалить обученную модель с диска по её имени
    - `remove_all()` - удалить все обученные модели с диска


- Содержимое конфигов и форматы данных предлагается продумать и реализовать самостоятельно
- Сервер должен иметь счётчик активных процессов. Максимальное число активных процессов соответствует числу ядер, переданному в конфиге при старте сервиса. Каждое обучение модели запускается в отдельном процессе и до своего завершения потребляет этот процесс. Один процесс всегда остаётся для сервера, в нём же загружаются и работают на инференс обученные модели
- Сервер должен корректно обрабатывать все граничные случаи (запуск обучения без свободных ядер, запуск инфренса свыше лимита, запросы с несуществующими именами моделей, запросы с дублирующимися именами моделей)
- В реализации должны поддерживаться не менее трёх дискриминативных моделей (т.е. принимающих на вход объекты и метки при обучении и предсказывающих метки для новых объектов)
- Сервер должен быть реализован на FastAPI
- Проект разворачивается с помощью выбранной библиотеки управления виртуальными окружениями и технологии контейнеризации Docker

**Клиентская часть (13 баллов):**

- Клиентская часть должна демонстрировать работу с реализованным сервером с помощью библиотек requests и aiohttp. Она может быть реализована непосредственно в Jupyter Notebook, с описанием ожидаемого действия, или в отдельном(-ых) скрипте(-ах), с дублированием в Jupyter Notebook (тогда работоспособность в ноутбуке не требуется). Далее описываются отдельные функции:
- Код вызова последовательного вызова обучения как минимум двух (N) различных моделей с таким набором данных и параметрами, чтобы обучение одной модели длилось не менее 60 секунд.
- Код вызова асинхронного вызова обучения как минимум двух различных моделей с демонстрацией, что работа выполняется в два (в N) раза быстрее
- Асинхронный вызов нескольких предсказаний
- Код демонстрации остальных функций сервера (загрузка, выгрузка, удаление)
- Должны обрабатываться ошибки и исключения, возвращаемые сервером


# Поработаем с сервером

In [18]:
import requests
import aiohttp
import asyncio
import time
import numpy as np
from typing import List, Dict, Any
import json

BASE_URL = "http://localhost:8000"

def check_server_status():
    """Проверка статуса сервера"""
    response = requests.get(f"{BASE_URL}/")
    print("Статус сервера:", response.json())

def get_server_status():
    """Получение подробного статуса"""
    response = requests.get(f"{BASE_URL}/status")
    return response.json()

## Последовательное обучение

In [19]:
np.random.seed(42)
n_samples = 10000
n_features = 100

X = np.random.randn(n_samples, n_features)
y = np.random.randint(0, 2, n_samples)

In [20]:
models = [
        {
            "name": "model_logreg_large",
            "type": "logreg",
            "params": {"max_iter": 5000, "random_state": 42}
        },
        {
            "name": "model_rf_large",
            "type": "rf",
            "params": {"n_estimators": 500, "max_depth": 20, "random_state": 42}
        }
    ]

In [21]:
start_time = time.time()

for model_info in models:
    print(f"\nОбучение {model_info['name']}...")
    
    fit_request = {
        "model_name": model_info["name"],
        "model_type": model_info["type"],
        "features": X.tolist(),
        "labels": y.tolist(),
        "params": model_info["params"]
    }
    
    response = requests.post(
        f"{BASE_URL}/fit",
        json=fit_request,
        headers={"Content-Type": "application/json"}
    )
    
    if response.status_code == 200:
        print(f"Запущено: {response.json()['message']}")
        
        while True:
            status_response = requests.get(
                f"{BASE_URL}/fit/{model_info['name']}/status"
            )
            status_data = status_response.json()
            print(status_data)
            if status_data['status'] == 'success':
                training_time = status_data.get('training_time', 0)
                print(f"Обучение завершено за {training_time:.2f} секунд")
                break
            elif status_data['status'] == 'error':
                print(f"Ошибка: {status_data}")
                break
            else:
                print(".", end="", flush=True)
                time.sleep(5)
    else:
        print(f"Ошибка: {response.status_code} - {response.text}")

sequential_total_time = time.time() - start_time
print(f"\nОбщее время последовательного обучения: {sequential_total_time:.2f} секунд")


Обучение model_logreg_large...
Запущено: Обучение модели 'model_logreg_large' запущено
{'status': 'training', 'model_name': 'model_logreg_large'}
{'status': 'success', 'model_name': 'model_logreg_large', 'training_time': 0.0471346378326416}
Обучение завершено за 0.05 секунд

Обучение model_rf_large...
Запущено: Обучение модели 'model_rf_large' запущено
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'training', 'model_name': 'model_rf_large'}
{'status': 'success', 'model_name': 'model_rf_large', 'training_time': 32.807024002075195}
Обучение завершено за 32.81 секунд

Общее время последовательного обучения: 41.34 секунд


## Асинхронное обучение

In [22]:
np.random.seed(42)
n_samples = 10000
n_features = 100

X = np.random.randn(n_samples, n_features)
y = np.random.randint(0, 2, n_samples)

# Модели для обучения
models = [
    {
        "name": "async_model_svm",
        "type": "svm",
        "params": {"kernel": "rbf", "random_state": 42}
    },
    {
        "name": "async_model_rf",
        "type": "rf",
        "params": {"n_estimators": 300, "random_state": 42}
    }
]

In [23]:
async def train_model_async(session, model_info):
    """Асинхронное обучение одной модели"""
    fit_request = {
        "model_name": model_info["name"],
        "model_type": model_info["type"],
        "features": X.tolist(),
        "labels": y.tolist(),
        "params": model_info["params"]
    }
    
    async with session.post(
        f"{BASE_URL}/fit",
        json=fit_request
    ) as response:
        if response.status == 200:
            print(f"Запущено обучение: {model_info['name']}")
            
            while True:
                async with session.get(
                    f"{BASE_URL}/fit/{model_info['name']}/status"
                ) as status_response:
                    status_data = await status_response.json()
                    
                    if status_data['status'] == 'success':
                        training_time = status_data.get('training_time', 0)
                        print(f"Модель {model_info['name']} обучена за {training_time:.2f}с")
                        return training_time
                    elif status_data['status'] == 'error':
                        print(f"Ошибка при обучении {model_info['name']}: {status_data}")
                        return None
                    else:
                        await asyncio.sleep(5)
        else:
            print(f"Ошибка запуска {model_info['name']}: {await response.text()}")
            return None

In [24]:
start_time = time.time()
    
async with aiohttp.ClientSession() as session:
    tasks = [train_model_async(session, model_info) for model_info in models]
    results = await asyncio.gather(*tasks)

total_time = time.time() - start_time
print(f"\nОбщее время асинхронного обучения: {total_time:.2f} секунд")

Запущено обучение: async_model_rf
Запущено обучение: async_model_svm
Модель async_model_svm обучена за 17.72с
Модель async_model_rf обучена за 21.62с

Общее время асинхронного обучения: 22.53 секунд


## Асинхронные предсказания

In [25]:
models_to_load = ["model_logreg_large", "model_rf_large"]
    
for model_name in models_to_load:
    load_request = {"model_name": model_name}
    response = requests.post(
        f"{BASE_URL}/load",
        json=load_request
    )
    if response.status_code == 200:
        print(f"Модель {model_name} загружена")
    elif "already loaded" not in response.text.lower():
        print(f"Предупреждение: {response.text}")

Модель model_logreg_large загружена
Модель model_rf_large загружена


In [26]:
np.random.seed(123)
n_predictions = 100
n_features = 100

X_test = np.random.randn(n_predictions, n_features).tolist()

In [27]:
async def predict_async(session, model_name, data):
    """Асинхронное предсказание"""
    predict_request = {
        "model_name": model_name,
        "features": data
    }
    
    async with session.post(
        f"{BASE_URL}/predict",
        json=predict_request
    ) as response:
        if response.status == 200:
            result = await response.json()
            return {
                "model": model_name,
                "predictions": result["predictions"][:5],  
                "status": "success"
            }
        else:
            return {
                "model": model_name,
                "error": await response.text(),
                "status": "error"
            }

In [28]:
start_time = time.time()
    
async with aiohttp.ClientSession() as session:
    tasks = []
    for i in range(10):  
        model_name = models_to_load[i % len(models_to_load)]
        task = predict_async(session, model_name, X_test)
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)

total_time = time.time() - start_time

print(f"\nВыполнено {len(results)} асинхронных предсказаний")
print(f"Общее время: {total_time:.2f} секунд")


Выполнено 10 асинхронных предсказаний
Общее время: 0.31 секунд


In [30]:
for i, result in enumerate(results[:3]):
    print(f"\nРезультат {i+1}:")
    print(f"  Модель: {result['model']}")
    if result['status'] == 'success':
        print(f"  Пример предсказаний: {result['predictions']}")
    else:
        print(f"  Ошибка: {result.get('error', 'Unknown')}")


Результат 1:
  Модель: model_logreg_large
  Пример предсказаний: [0, 0, 0, 0, 1]

Результат 2:
  Модель: model_rf_large
  Пример предсказаний: [0, 1, 1, 1, 1]

Результат 3:
  Модель: model_logreg_large
  Пример предсказаний: [0, 0, 0, 0, 1]


## Остальные функции

In [32]:
print("1. Список моделей:")
response = requests.get(f"{BASE_URL}/models")
if response.status_code == 200:
    models = response.json()
    print(f"   Всего моделей: {models['count']}")
    for model in models['models']:
        print(f"   - {model['name']} (загружена: {model['loaded']})")

print("\n2. Загрузка модели:")
load_request = {"model_name": "model_logreg_large"}
response = requests.post(f"{BASE_URL}/load", json=load_request)
print(f"   Результат: {response.json()['message']}")

status = get_server_status()
print(f"   Загружено моделей: {status['loaded_models']}")

print("\n3. Выгрузка модели:")
unload_request = {"model_name": "model_logreg_large"}
response = requests.post(f"{BASE_URL}/unload", json=unload_request)
print(f"   Результат: {response.json()['message']}")

print("\n4. Удаление модели:")
remove_request = {"model_name": "async_model_svm"}
response = requests.delete(f"{BASE_URL}/remove", json=remove_request)
print(f"   Результат: {response.json()['message']}")

1. Список моделей:
   Всего моделей: 4
   - model_rf_large (загружена: True)
   - async_model_rf (загружена: False)
   - model_logreg_large (загружена: True)
   - async_model_svm (загружена: False)

2. Загрузка модели:
   Результат: Модель 'model_logreg_large' загружена в память
   Загружено моделей: 2

3. Выгрузка модели:
   Результат: Модель 'model_logreg_large' выгружена из памяти

4. Удаление модели:
   Результат: Модель 'async_model_svm' удалена с диска


## Обработка ошибок

In [35]:
print("1. Обучение с существующим именем:")
fit_request = {
    "model_name": "model_logreg_large",
    "model_type": "logreg",
    "features": [[1, 2], [3, 4]],
    "labels": [0, 1]
}
response = requests.post(f"{BASE_URL}/fit", json=fit_request)
print(f"   Код: {response.status_code}")
print(f"   Ответ: {response.json()}")

print("\n2. Предсказание с несуществующей моделью:")
predict_request = {
    "model_name": "non_existent_model",
    "features": [[1, 2], [3, 4]]
}
response = requests.post(f"{BASE_URL}/predict", json=predict_request)
print(f"   Код: {response.status_code}")
print(f"   Ответ: {response.json()}")

1. Обучение с существующим именем:
   Код: 400
   Ответ: {'detail': "Модель с именем 'model_logreg_large' уже существует"}

2. Предсказание с несуществующей моделью:
   Код: 404
   Ответ: {'detail': "Модель 'non_existent_model' не найдена"}


# Описание скриптов
## app/config.py

Что делает этот код:

* Определяет настройки приложения через класс Settings, наследующийся от BaseSettings из pydantic-settings
* Загружает конфигурацию из переменных окружения или файла .env
* Содержит параметры: директория для моделей, количество CPU ядер, максимальное число загруженных моделей

In [None]:
import os
from pydantic_settings import BaseSettings
from typing import Optional

class Settings(BaseSettings):
    """Настройки приложения из переменных окружения"""
    models_dir: str = "./saved_models"
    cpu_cores: int = 4
    max_loaded_models: int = 3
    
    class Config:
        env_file = ".env"

settings = Settings()

## app/schemas.py

* Определяет Pydantic-схемы для валидации входных данных и формирования ответов API
* FitRequest: структура запроса для обучения модели
* PredictRequest: структура запроса для предсказаний
* ModelConfig: базовая конфигурация модели для операций загрузки/выгрузки
* Ответные схемы (FitResponse, PredictResponse, StatusResponse) для стандартизации ответов сервера

In [None]:
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import numpy as np

class FitRequest(BaseModel):
    """Схема для запроса обучения модели"""
    model_name: str = Field(..., description="Название модели")
    model_type: str = Field(..., description="Тип модели (logreg, rf, svm)")
    features: List[List[float]] = Field(..., description="Признаки")
    labels: List[int] = Field(..., description="Метки")
    params: Optional[Dict[str, Any]] = Field(default={}, description="Параметры модели")
    
    model_config = {
        "protected_namespaces": ()
    }

class PredictRequest(BaseModel):
    """Схема для запроса предсказания"""
    model_name: str = Field(..., description="Название модели")
    features: List[List[float]] = Field(..., description="Признаки для предсказания")
    
    model_config = {
        "protected_namespaces": ()
    }

class ModelConfig(BaseModel):
    """Схема для операций с моделями"""
    model_name: str = Field(..., description="Название модели")
    
    model_config = {
        "protected_namespaces": ()
    }

class FitResponse(BaseModel):
    """Ответ на обучение модели"""
    status: str
    model_name: str
    message: str
    training_time: Optional[float] = None

class PredictResponse(BaseModel):
    """Ответ на предсказание"""
    status: str
    predictions: List[int]
    probabilities: Optional[List[List[float]]] = None

class StatusResponse(BaseModel):
    """Общий ответ"""
    status: str
    message: str
    details: Optional[Dict[str, Any]] = None

## app/model_manager.py

* Реализует класс ModelManager для управления ML-моделями
* Поддерживает три типа моделей: Logistic Regression, Random Forest, SVM
* Обеспечивает обучение, сохранение, загрузку, выгрузку и удаление моделей
* Управляет кэшированием загруженных моделей в памяти
* Обрабатывает масштабирование признаков для SVM
* Предоставляет методы для предсказаний и управления жизненным циклом моделей

In [None]:
import pickle
import os
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
import numpy as np
from typing import Dict, Any, Tuple
import time

class ModelManager:
    """Менеджер для работы с ML моделями"""
    
    def __init__(self, models_dir: str):
        self.models_dir = models_dir
        os.makedirs(models_dir, exist_ok=True)
        
        self.model_types = {
            'logreg': LogisticRegression,
            'rf': RandomForestClassifier,
            'svm': SVC
        }
        
        self.loaded_models: Dict[str, Any] = {}
        self.scalers: Dict[str, StandardScaler] = {}
        
        self.default_params = {
            'logreg': {'max_iter': 1000, 'random_state': 42},
            'rf': {'n_estimators': 100, 'random_state': 42},
            'svm': {'kernel': 'rbf', 'random_state': 42, 'probability': True}
        }
    
    def create_model(self, model_type: str, params: Dict[str, Any]) -> Any:
        """Создание модели по типу и параметрам"""
        if model_type not in self.model_types:
            raise ValueError(f"Неизвестный тип модели: {model_type}")
        
        default = self.default_params.get(model_type, {})
        merged_params = {**default, **params}
        
        return self.model_types[model_type](**merged_params)
    
    def train_model(self, model_name: str, model_type: str, 
                   X: np.ndarray, y: np.ndarray, params: Dict[str, Any]) -> float:
        """Обучение модели и сохранение на диск"""
        start_time = time.time()
        
        model = self.create_model(model_type, params)
        
        if model_type == 'svm':
            scaler = StandardScaler()
            X_scaled = scaler.fit_transform(X)
            self.scalers[model_name] = scaler
        else:
            X_scaled = X
        
        model.fit(X_scaled, y)
        
        model_path = os.path.join(self.models_dir, f"{model_name}.pkl")
        with open(model_path, 'wb') as f:
            pickle.dump({
                'model': model,
                'model_type': model_type,
                'params': params
            }, f)
        
        training_time = time.time() - start_time
        return training_time
    
    def load_model(self, model_name: str) -> bool:
        """Загрузка модели в память"""
        model_path = os.path.join(self.models_dir, f"{model_name}.pkl")
        
        if not os.path.exists(model_path):
            return False
        
        with open(model_path, 'rb') as f:
            model_data = pickle.load(f)
        
        self.loaded_models[model_name] = model_data
        return True
    
    def unload_model(self, model_name: str) -> bool:
        """Выгрузка модели из памяти"""
        if model_name in self.loaded_models:
            del self.loaded_models[model_name]
            if model_name in self.scalers:
                del self.scalers[model_name]
            return True
        return False
    
    def predict(self, model_name: str, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Предсказание с помощью загруженной модели"""
        if model_name not in self.loaded_models:
            raise ValueError(f"Модель {model_name} не загружена")
        
        model_data = self.loaded_models[model_name]
        model = model_data['model']
        
        if model_name in self.scalers:
            X_scaled = self.scalers[model_name].transform(X)
        else:
            X_scaled = X
        
        predictions = model.predict(X_scaled)
        
        probabilities = None
        if hasattr(model, 'predict_proba'):
            probabilities = model.predict_proba(X_scaled)
        
        return predictions, probabilities
    
    def delete_model(self, model_name: str) -> bool:
        """Удаление модели с диска"""
        model_path = os.path.join(self.models_dir, f"{model_name}.pkl")
        
        self.unload_model(model_name)
        
        if os.path.exists(model_path):
            os.remove(model_path)
            return True
        return False
    
    def delete_all_models(self) -> int:
        """Удаление всех моделей"""
        count = 0
        for filename in os.listdir(self.models_dir):
            if filename.endswith('.pkl'):
                os.remove(os.path.join(self.models_dir, filename))
                count += 1
        
        self.loaded_models.clear()
        self.scalers.clear()
        
        return count
    
    def get_loaded_models_count(self) -> int:
        """Количество загруженных моделей"""
        return len(self.loaded_models)
    
    def model_exists(self, model_name: str) -> bool:
        """Проверка существования модели"""
        model_path = os.path.join(self.models_dir, f"{model_name}.pkl")
        return os.path.exists(model_path)

## app/process_manager.py

* Реализует класс ProcessManager для управления параллельным обучением моделей
* Использует модуль multiprocessing для создания отдельных процессов обучения
* Ограничивает количество одновременно обучающихся моделей согласно настройкам CPU
* Управляет очередями для обмена данными между процессами
* Отслеживает статус выполнения процессов обучения
* Предоставляет методы для проверки доступности ресурсов и очистки завершенных процессов

In [None]:
import multiprocessing as mp
from multiprocessing import Process, Queue
import time
from typing import Dict, Any, Optional
import numpy as np

class ProcessManager:
    """Менеджер для управления процессами обучения"""
    
    def __init__(self, max_processes: int, models_dir: str):
        from app.model_manager import ModelManager
        
        self.max_processes = max_processes
        self.model_manager = ModelManager(models_dir)
        self.active_processes: Dict[str, Process] = {}
        self.process_queues: Dict[str, Queue] = {}
        self.results: Dict[str, Dict[str, Any]] = {}
    
    def _train_worker(self, queue: Queue, model_name: str, model_type: str,
                     X: np.ndarray, y: np.ndarray, params: Dict[str, Any]) -> None:
        """Рабочая функция для процесса обучения"""
        try:
            training_time = self.model_manager.train_model(
                model_name, model_type, X, y, params
            )
            queue.put({
                'status': 'success',
                'model_name': model_name,
                'training_time': training_time
            })
        except Exception as e:
            queue.put({
                'status': 'error',
                'model_name': model_name,
                'error': str(e)
            })
    
    def start_training(self, model_name: str, model_type: str,
                      X: np.ndarray, y: np.ndarray, params: Dict[str, Any]) -> bool:
        """Запуск процесса обучения"""
        if len(self.active_processes) >= self.max_processes:
            return False
        
        if self.model_manager.model_exists(model_name):
            return False
        
        queue = Queue()
        
        process = Process(
            target=self._train_worker,
            args=(queue, model_name, model_type, X, y, params)
        )
        
        process.start()
        self.active_processes[model_name] = process
        self.process_queues[model_name] = queue
        
        return True
    
    def check_training_status(self, model_name: str) -> Optional[Dict[str, Any]]:
        """Проверка статуса обучения"""
        if model_name not in self.active_processes:
            return None
        
        process = self.active_processes[model_name]
        queue = self.process_queues[model_name]
        
        if not process.is_alive():
            if not queue.empty():
                result = queue.get()
                self.results[model_name] = result
            
            process.join()
            del self.active_processes[model_name]
            del self.process_queues[model_name]
            
            if model_name in self.results:
                return self.results[model_name]
        
        return {'status': 'training', 'model_name': model_name}
    
    def get_active_processes_count(self) -> int:
        """Количество активных процессов"""
        return len(self.active_processes)
    
    def can_start_training(self) -> bool:
        """Можно ли запустить новый процесс"""
        return len(self.active_processes) < self.max_processes
    
    def cleanup_finished_processes(self):
        """Очистка завершенных процессов"""
        finished = []
        for model_name, process in self.active_processes.items():
            if not process.is_alive():
                finished.append(model_name)
        
        for model_name in finished:
            process = self.active_processes[model_name]
            process.join()
            del self.active_processes[model_name]
            del self.process_queues[model_name]

## app/main.py

* Создает FastAPI приложение с основными эндпоинтами для работы с ML-моделями
* Реализует REST API для: обучения моделей (/fit), предсказаний (/predict), загрузки/выгрузки моделей, удаления моделей
* Обрабатывает ошибки и граничные случаи (ограничение по CPU, дублирование имен, несуществующие модели)
* Интегрирует ModelManager и ProcessManager для управления моделями и процессами
* Предоставляет статус сервера и список доступных моделей

In [None]:
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import numpy as np
from typing import List, Dict, Any
import time
import uvicorn

from app.config import settings
from app.schemas import (
    FitRequest, PredictRequest, ModelConfig,
    FitResponse, PredictResponse, StatusResponse
)
from app.model_manager import ModelManager
from app.process_manager import ProcessManager

app = FastAPI(
    title="ML Model Training Server",
    description="Веб-сервер для обучения и использования ML моделей",
    version="1.0.0"
)

model_manager = ModelManager(settings.models_dir)
process_manager = ProcessManager(
    max_processes=settings.cpu_cores - 1,  
    models_dir=settings.models_dir
)

@app.get("/")
async def root():
    """Корневой эндпоинт"""
    return {
        "message": "ML Model Training Server",
        "status": "running",
        "config": {
            "models_dir": settings.models_dir,
            "cpu_cores": settings.cpu_cores,
            "max_loaded_models": settings.max_loaded_models
        }
    }

@app.get("/status")
async def get_status():
    """Получение статуса сервера"""
    return {
        "active_training_processes": process_manager.get_active_processes_count(),
        "loaded_models": model_manager.get_loaded_models_count(),
        "max_loaded_models": settings.max_loaded_models,
        "can_start_training": process_manager.can_start_training()
    }

@app.post("/fit", response_model=FitResponse)
async def fit_model(request: FitRequest):
    """Обучение модели"""
    if not process_manager.can_start_training():
        raise HTTPException(
            status_code=429,
            detail="Нет свободных ядер для обучения. Максимум процессов: "
                   f"{settings.cpu_cores - 1}"
        )
    
    if model_manager.model_exists(request.model_name):
        raise HTTPException(
            status_code=400,
            detail=f"Модель с именем '{request.model_name}' уже существует"
        )
    
    if request.model_type not in ['logreg', 'rf', 'svm']:
        raise HTTPException(
            status_code=400,
            detail=f"Неподдерживаемый тип модели. Доступные: logreg, rf, svm"
        )
    
    try:
        X = np.array(request.features)
        y = np.array(request.labels)
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Ошибка преобразования данных: {str(e)}"
        )
    
    success = process_manager.start_training(
        request.model_name,
        request.model_type,
        X, y,
        request.params
    )
    
    if not success:
        raise HTTPException(
            status_code=500,
            detail="Не удалось запустить процесс обучения"
        )
    
    return FitResponse(
        status="started",
        model_name=request.model_name,
        message=f"Обучение модели '{request.model_name}' запущено"
    )

@app.get("/fit/{model_name}/status")
async def get_fit_status(model_name: str):
    """Проверка статуса обучения модели"""
    status = process_manager.check_training_status(model_name)
    
    if status is None:
        if model_manager.model_exists(model_name):
            return {
                "status": "completed",
                "model_name": model_name,
                "message": "Модель уже обучена"
            }
        else:
            raise HTTPException(
                status_code=404,
                detail=f"Обучение модели '{model_name}' не найдено"
            )
    
    if status['status'] == 'error':
        raise HTTPException(
            status_code=500,
            detail=f"Ошибка при обучении: {status.get('error', 'Unknown error')}"
        )
    
    return status

@app.post("/predict", response_model=PredictResponse)
async def predict(request: PredictRequest):
    """Предсказание с помощью обученной модели"""
    if request.model_name not in model_manager.loaded_models:
        if not model_manager.load_model(request.model_name):
            raise HTTPException(
                status_code=404,
                detail=f"Модель '{request.model_name}' не найдена"
            )
    
    if model_manager.get_loaded_models_count() > settings.max_loaded_models:
        raise HTTPException(
            status_code=429,
            detail=f"Превышен лимит загруженных моделей: {settings.max_loaded_models}"
        )
    
    try:
        X = np.array(request.features)
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Ошибка преобразования данных: {str(e)}"
        )
    
    try:
        predictions, probabilities = model_manager.predict(request.model_name, X)
        
        prob_list = None
        if probabilities is not None:
            prob_list = probabilities.tolist()
        
        return PredictResponse(
            status="success",
            predictions=predictions.tolist(),
            probabilities=prob_list
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Ошибка при предсказании: {str(e)}"
        )

@app.post("/load", response_model=StatusResponse)
async def load_model(config: ModelConfig):
    """Загрузка модели в память"""
    if not model_manager.model_exists(config.model_name):
        raise HTTPException(
            status_code=404,
            detail=f"Модель '{config.model_name}' не найдена"
        )
    
    if model_manager.get_loaded_models_count() >= settings.max_loaded_models:
        raise HTTPException(
            status_code=429,
            detail=f"Превышен лимит загруженных моделей: {settings.max_loaded_models}"
        )
    
    success = model_manager.load_model(config.model_name)
    
    if not success:
        raise HTTPException(
            status_code=500,
            detail=f"Не удалось загрузить модель '{config.model_name}'"
        )
    
    return StatusResponse(
        status="success",
        message=f"Модель '{config.model_name}' загружена в память"
    )

@app.post("/unload", response_model=StatusResponse)
async def unload_model(config: ModelConfig):
    """Выгрузка модели из памяти"""
    success = model_manager.unload_model(config.model_name)
    
    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Модель '{config.model_name}' не загружена в память"
        )
    
    return StatusResponse(
        status="success",
        message=f"Модель '{config.model_name}' выгружена из памяти"
    )

@app.delete("/remove", response_model=StatusResponse)
async def remove_model(config: ModelConfig):
    """Удаление модели с диска"""
    model_manager.unload_model(config.model_name)
    
    success = model_manager.delete_model(config.model_name)
    
    if not success:
        raise HTTPException(
            status_code=404,
            detail=f"Модель '{config.model_name}' не найдена на диске"
        )
    
    return StatusResponse(
        status="success",
        message=f"Модель '{config.model_name}' удалена с диска"
    )

@app.delete("/remove_all", response_model=StatusResponse)
async def remove_all_models():
    """Удаление всех моделей"""
    count = model_manager.delete_all_models()
    
    return StatusResponse(
        status="success",
        message=f"Удалено {count} моделей",
        details={"deleted_count": count}
    )

@app.get("/models")
async def list_models():
    """Список всех моделей"""
    import os
    models = []
    
    for filename in os.listdir(settings.models_dir):
        if filename.endswith('.pkl'):
            model_name = filename[:-4]  # Убираем .pkl
            models.append({
                "name": model_name,
                "loaded": model_name in model_manager.loaded_models
            })
    
    return {
        "models": models,
        "count": len(models)
    }

@app.on_event("startup")
async def startup_event():
    """Действия при запуске сервера"""
    import os
    os.makedirs(settings.models_dir, exist_ok=True)

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        reload=True
    )

## requirements.txt

* Содержит список всех Python-зависимостей проекта с указанием версий
* FastAPI и Uvicorn для веб-сервера
* Pydantic для валидации данных
* Scikit-learn для ML-моделей
* NumPy и Pandas для работы с данными
* Requests и Aiohttp для клиентской части

In [None]:
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
scikit-learn==1.3.2
numpy==1.24.3
pandas==2.1.4
requests==2.31.0
aiohttp==3.9.1
python-multipart==0.0.6
python-dotenv==1.0.0

## Dockerfile 

* Определяет Docker-образ на основе Python 3.9 slim
* Устанавливает зависимости из requirements.txt
* Копирует исходный код приложения
* Создает директорию для сохранения моделей
* Открывает порт 8000 для доступа к API
* Запускает сервер Uvicorn при старте контейнера

In [None]:
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

RUN mkdir -p saved_models

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

## docker-compose.yml

* Определяет сервис ml-server для запуска приложения
* Пробрасывает порт 8000 из контейнера на хост
* Монтирует директории для сохранения моделей и данных
* Загружает переменные окружения из файла .env
* Устанавливает параметры по умолчанию
* Настраивает автоматический перезапуск при падении

In [None]:
services:
  ml-server:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./saved_models:/app/saved_models
      - ./data:/app/data
    env_file:
      - .env
    environment:
      - MODELS_DIR=/app/saved_models
      - CPU_CORES=4
      - MAX_LOADED_MODELS=3
    restart: unless-stopped