In [1]:
import requests
import time
import asyncio
import aiohttp
import time
from sklearn.datasets import fetch_20newsgroups
import numpy as np

# Клиентская часть

### Определим функции клиентской части. Они не будут поддерживать асинхронные запросы. Для демонстрации асинхронности будем использовать .py скрипты.

`PORT = 5123`

In [2]:
def fit(model_type, model_name, x, y, params=None):
    """Ask the server to train a model and print the outcome.

    POSTs the model description and training data as JSON to the ``/fit``
    endpoint on ``localhost:5123``. Nothing is returned; the result is printed:
    the ``message`` field on success, the server's ``detail`` on an HTTP error,
    or the exception text for any other failure (e.g. the server is down).

    Args:
        model_type: Model kind understood by the server (e.g. "LinearRegression").
        model_name: Name under which the server should store the fitted model.
        x: Training features in a JSON-serialisable form (e.g. nested lists).
        y: Training targets in a JSON-serialisable form.
        params: Optional dict of model parameters; an empty dict is sent when
            omitted.
    """
    # Build a fresh dict per call instead of sharing one mutable default object.
    if params is None:
        params = {}

    try:
        response = requests.post(
            "http://localhost:5123/fit",
            json={
                "model_type": model_type,
                "params": params,
                "model_name": model_name,
                "x": x,
                "y": y,
            },
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success:", data["message"])

    except requests.exceptions.HTTPError:
        # The error body is not guaranteed to be a JSON object (an unhandled
        # server error may be plain text), so fall back to the raw text rather
        # than letting a decoding error escape from this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def predict(model_name, x):
    """Request predictions from a named model on the server and print them.

    POSTs ``{"x": x, "model_name": model_name}`` as JSON to the ``/predict``
    endpoint on ``localhost:5123``. Nothing is returned; the result is printed:
    the ``prediction`` field on success, the server's ``detail`` on an HTTP
    error, or the exception text for any other failure.

    Args:
        model_name: Name of the model that should produce the predictions.
        x: Feature rows in a JSON-serialisable form (e.g. nested lists).
    """
    try:
        response = requests.post(
            "http://localhost:5123/predict",
            json={"x": x, "model_name": model_name},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success:", data["prediction"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def load(model_name):
    """Ask the server to load a saved model and print the outcome.

    POSTs ``{"model_name": model_name}`` as JSON to the ``/load`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``message`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.

    Args:
        model_name: Name of the saved model to load.
    """
    try:
        response = requests.post(
            # Use the real port: the former "localhost:PORT" placeholder was
            # never substituted and is not a valid URL.
            "http://localhost:5123/load",
            json={"model_name": model_name},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success:", data["message"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def unload(model_name):
    """Ask the server to unload a model and print the outcome.

    POSTs ``{"model_name": model_name}`` as JSON to the ``/unload`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``message`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.

    Args:
        model_name: Name of the model to unload.
    """
    try:
        response = requests.post(
            # Use the real port: the former "localhost:PORT" placeholder was
            # never substituted and is not a valid URL.
            "http://localhost:5123/unload",
            json={"model_name": model_name},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success:", data["message"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def remove(model_name):
    """Ask the server to remove a model and print the outcome.

    POSTs ``{"model_name": model_name}`` as JSON to the ``/remove`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``message`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.

    Args:
        model_name: Name of the model to remove.
    """
    try:
        response = requests.post(
            # Use the real port: the former "localhost:PORT" placeholder was
            # never substituted and is not a valid URL.
            "http://localhost:5123/remove",
            json={"model_name": model_name},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success:", data["message"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def remove_all():
    """Ask the server to remove all models and print the outcome.

    POSTs an empty JSON object to the ``/remove_all`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``message`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.
    """
    try:
        response = requests.post(
            # Use the real port: the former "localhost:PORT" placeholder was
            # never substituted and is not a valid URL.
            "http://localhost:5123/remove_all",
            json={},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success:", data["message"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def list_loaded_models():
    """Print the loaded models reported by the server.

    POSTs an empty JSON object to the ``/list_loaded_models`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``message`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.
    """
    try:
        response = requests.post(
            "http://localhost:5123/list_loaded_models",
            json={},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success, loaded models: ", data["message"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def list_saved_models():
    """Print the saved models reported by the server.

    POSTs an empty JSON object to the ``/list_saved_models`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``message`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.
    """
    try:
        response = requests.post(
            "http://localhost:5123/list_saved_models",
            json={},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success, saved models: ", data["message"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))


def processes():
    """Print the process count reported by the server.

    POSTs an empty JSON object to the ``/processes`` endpoint on
    ``localhost:5123``. Nothing is returned; the result is printed: the
    ``n_proc`` field on success, the server's ``detail`` on an HTTP error, or
    the exception text for any other failure.
    """
    try:
        response = requests.post(
            "http://localhost:5123/processes",
            json={},
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        data = response.json()
        print("Success: n_proc =", data["n_proc"])

    except requests.exceptions.HTTPError:
        # The error body may be plain text rather than a JSON object; fall back
        # to the raw text instead of raising from inside this handler.
        try:
            error_detail = response.json().get("detail", "Unknown error")
        except (ValueError, AttributeError):
            error_detail = response.text or "Unknown error"
        print("Server returned error:", error_detail)

    except Exception as e:
        print("Request failed:", str(e))

Посмотрим, что будет, если обратиться к выключенному серверу

In [21]:
list_loaded_models()

Request failed: HTTPConnectionPool(host='localhost', port=5123): Max retries exceeded with url: /list_loaded_models (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7d3e1887f440>: Failed to establish a new connection: [Errno 111] Connection refused'))


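Получили ошибку соединения, а не HTTP-ошибку: сервер выключен, поэтому запрос до него не дошёл и исключение возникло на стороне клиента. Когда сервер запущен, все исключения в серверной части ловятся и возвращаются клиенту как HTTP-ошибки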

Запустим сервер

In [23]:
list_loaded_models()

Success, loaded models:  []


### Обучим модели

Для начала обзаведёмся данными. Для демонстрации работы нам будет достаточно простейших данных

In [68]:
x = np.array([[1, 2], [3, 4]])
y = np.array([0, 1]) 

Теперь обучим все 3 вида моделей. Модели реализованы в модуле `classifier`, поэтому изменения классификатора можно проводить, не затрагивая код серверной части.

In [69]:
fit("LinearRegression", "LinReg_test", x.tolist(), y.tolist())
fit("RandomForest", "RF_test", x.tolist(), y.tolist())
fit("CatBoost", "CatBoost_test", x.tolist(), y.tolist())

Success: LinReg_test fitted successfully
Success: RF_test fitted successfully
Success: CatBoost_test fitted successfully


Посмотрим что все модели действительно сохранились

In [70]:
list_saved_models()

Success, saved models:  ['RF_test.pkl', 'CatBoost_test.pkl', 'LinReg_test.pkl']


Все 3 модели сохранились. Обучим ещё 2 регрессии с тем же именем и посмотрим, что будет

In [71]:
fit("LinearRegression", "LinReg_test", x.tolist(), y.tolist())
fit("LinearRegression", "LinReg_test", x.tolist(), y.tolist())

Success: LinReg_test fitted successfully
Success: LinReg_test fitted successfully


In [72]:
list_saved_models()

Success, saved models:  ['LinReg_test_1.pkl', 'RF_test.pkl', 'CatBoost_test.pkl', 'LinReg_test_2.pkl', 'LinReg_test.pkl']


Сервер правильно обработал граничный случай одинаковых имён

### Манипуляции с сохранёнными моделями

Загрузим обученные ранее модели линейной регрессии и случайного леса

In [73]:
load("LinReg_test")
load("RF_test")

Success: Model 'LinReg_test' loaded successfully
Success: Model 'RF_test' loaded successfully


Посмотрим какие модели загружены

In [76]:
list_loaded_models()

Success, loaded models:  ['RF_test', 'LinReg_test']


Сделаем предсказания каждой моделью

In [78]:
predict("RF_test", x.tolist())
predict("LinReg_test", x.tolist())

Success: [0, 1]
Success: [1.1102230246251565e-16, 1.0]


Выгрузим все модели

In [79]:
unload("LinReg_test")
unload("RF_test")

Success: Model 'LinReg_test' has been unloaded
Success: Model 'RF_test' has been unloaded


Посмотрим и убедимся что на данный момент нет загруженных моделей

In [80]:
list_loaded_models()

Success, loaded models:  []


Удалим модель

In [81]:
remove("LinReg_test_1")

Success: Model file 'LinReg_test_1' removed successfully


In [82]:
list_saved_models()

Success, saved models:  ['RF_test.pkl', 'CatBoost_test.pkl', 'LinReg_test_2.pkl', 'LinReg_test.pkl']


Попробуем удалить эту модель ещё раз (должны получить ошибку)

In [83]:
remove("LinReg_test_1")

Server returned error: [Errno 2] No such file or directory: '/home/danilach/python_ML_server/models/LinReg_test_1.pkl'


Удалим все модели

In [84]:
remove_all()
list_saved_models()

Success: folder /home/danilach/python_ML_server/models is cleared successfully
Success, saved models:  []


### Параллельное обучение

Теперь посмотрим, насколько быстрее обучать модели параллельно. Слегка изменим код с лекции, чтобы можно было менять число одновременно отправляемых запросов

In [None]:
import aiohttp
import asyncio
import time
import argparse

# Name under which the server is asked to store the fitted model.
model_path = "test-model"

# JSON body sent to /fit: a two-sample toy dataset, its labels, the target
# model name, and a linear regression with no extra parameters.
fit_payload = dict(
    x=[[1, 2], [3, 4]],
    y=[0, 1],
    model_name=model_path,
    model_type="LinearRegression",
    params={},
)


async def post(session, payload):
    """Send one training request to ``/fit`` and print the server's reply.

    On status 200 the decoded JSON body is printed. On any other status the
    status code is printed, followed by the ``detail`` field of the JSON error
    body when one is available. Client-side ``aiohttp`` errors are reported per
    request instead of propagating, so one failed request does not abort the
    other requests gathered alongside it.

    Args:
        session: Open client session exposing an aiohttp-style ``post``.
        payload: JSON-serialisable request body.
    """
    try:
        async with session.post(
            url="http://localhost:5123/fit",
            json=payload,
        ) as response:
            if response.status == 200:
                data = await response.json()
                print(data)
                return

            print(f"Request failed with status: {response.status}")
            # The error body is not guaranteed to be a JSON object, so treat
            # any failure to extract the detail as "no detail".
            try:
                detail = (await response.json()).get("detail", "No detail provided")
            except Exception:
                detail = "No detail provided"
            print(f"Detail: {detail}")
    except aiohttp.ClientError as e:
        print(f"Request failed due to client error: {e}")


async def main(num_requests):
    """Send ``num_requests`` concurrent ``/fit`` requests and wait for all of them.

    Every request reuses one ``aiohttp.ClientSession`` and the same
    ``fit_payload`` body.

    Args:
        num_requests: How many requests to issue.
    """
    async with aiohttp.ClientSession() as session:
        pending = []
        for _ in range(num_requests):
            pending.append(post(session, fit_payload))
        await asyncio.gather(*pending)


if __name__ == "__main__":
    # Command-line entry point: read the number of concurrent /fit requests
    # and run them on a fresh event loop.
    parser = argparse.ArgumentParser(description="AsyncFit")
    parser.add_argument(
        "--num_requests",
        type=int,
        required=True,
        help="number of /fit requests to send concurrently",
    )
    args = parser.parse_args()

    # The start timestamp that used to be taken here was never read, so it has
    # been dropped.
    try:
        asyncio.run(main(args.num_requests))
    except Exception as e:
        # Report any failure in one line instead of a traceback.
        print(f"Exception occurred: {e}")


Запустим обучение 2 моделей.

In [9]:
%%time
!python3 run_last_script.py --num_requests 2

test-model1 fitted successfully
test-model0 fitted successfully
CPU times: user 1.48 s, sys: 220 ms, total: 1.7 s
Wall time: 1min 46s


Теперь сделаем то же самое последовательно

In [10]:
%%time
!python3 run_last_script.py --num_requests 1
!python3 run_last_script.py --num_requests 1

test-model0 fitted successfully
test-model0 fitted successfully
CPU times: user 2.55 s, sys: 389 ms, total: 2.94 s
Wall time: 3min 31s


Видим, что, как и ожидалось, параллельное обучение двух моделей заняло примерно в два раза меньше времени (1 мин 46 с против 3 мин 31 с)

In [13]:
remove_all()

Success: folder /home/danilach/python_ML_server/models is cleared successfully


Попробуем запустить обучение на 7 процессах (в нашем случае `n_jobs` = 5, т.е. максимум 4 процесса обучения)

In [7]:
%%time
!python3 run_last_script.py --num_requests 7

Request failed with status: 400
Detail: Cannot create another process, n_jobs = 5 are already running
Request failed with status: 400
Detail: Cannot create another process, n_jobs = 5 are already running
Request failed with status: 400
Detail: Cannot create another process, n_jobs = 5 are already running
test-model2 fitted successfully
test-model6 fitted successfully
test-model3 fitted successfully
test-model4 fitted successfully
CPU times: user 210 ms, sys: 50.1 ms, total: 261 ms
Wall time: 11.6 s


Модели обучились, посмотрим сколько активных процессов в данный момент на сервере 

1 процесс - основной процесс самого сервера. Всё работает как и задумано 

### Асинхронный вызов нескольких предсказаний

Код `async_predict.py`

In [None]:
import aiohttp
import asyncio
import time
import argparse
from sklearn.datasets import make_regression

# Synthetic regression data: 2000 samples with 100 features each. The fixed
# random_state makes every run of the script generate the same data, so the
# printed predictions are reproducible.
X, y = make_regression(n_samples=2000, n_features=100, noise=0.1, random_state=0)


# Common prefix of the model names used by this script.
model_path = "test-model"

# Example data matching your server's expected schema


def fit_payload(i):
    """Build the JSON body for the ``i``-th request.

    Despite the name, no labels are included: the body carries only the
    feature matrix ``X`` (converted to nested lists) and a model name made of
    ``model_path`` followed by ``i``.

    Args:
        i: Index appended to ``model_path`` to form the model name.

    Returns:
        Dict with the keys ``"x"`` and ``"model_name"``.
    """
    features = X.tolist()
    target_model = model_path + f"{i}"
    return {"x": features, "model_name": target_model}


async def _print_failure(response):
    """Print the status of a non-200 response plus its JSON ``detail`` if any."""
    try:
        error_data = await response.json()
        print(f"Request failed with status: {response.status}")
        print(f"Detail: {error_data.get('detail', 'No detail provided')}")
    except Exception:
        print(
            f"Request failed with status: {response.status}, but no JSON detail returned."
        )


async def post(session, payload):
    """Send one request to ``/predict`` and print what the server answered.

    A 200 response has its decoded JSON body printed. Any other status is
    reported together with the ``detail`` field of the JSON error body, or with
    a note that no JSON detail was returned. ``aiohttp.ClientError`` is caught
    and printed rather than raised.

    Args:
        session: Open client session exposing an aiohttp-style ``post``.
        payload: JSON-serialisable request body.
    """
    try:
        async with session.post(
            url="http://localhost:5123/predict",
            json=payload,
        ) as response:
            if response.status != 200:
                await _print_failure(response)
                return
            data = await response.json()
            print(data)
    except aiohttp.ClientError as e:
        print(f"Request failed due to client error: {e}")


async def main(num_requests):
    """Send ``num_requests`` concurrent ``/predict`` requests and await them all.

    Request ``i`` gets the body built by ``fit_payload(i)``; all requests share
    one ``aiohttp.ClientSession``.

    Args:
        num_requests: How many requests to issue.
    """
    async with aiohttp.ClientSession() as session:
        pending = []
        for i in range(num_requests):
            pending.append(post(session, fit_payload(i)))
        await asyncio.gather(*pending)


if __name__ == "__main__":
    # Command-line entry point: read the number of concurrent /predict
    # requests and run them on a fresh event loop.
    # The description previously said "AsyncFit", which is wrong for this
    # prediction script.
    parser = argparse.ArgumentParser(description="AsyncPredict")
    parser.add_argument(
        "--num_requests",
        type=int,
        required=True,
        help="number of /predict requests to send concurrently",
    )
    args = parser.parse_args()

    # The start timestamp that used to be taken here was never read, so it has
    # been dropped.
    try:
        asyncio.run(main(args.num_requests))
    except Exception as e:
        # Report any failure in one line instead of a traceback.
        print(f"Exception occurred: {e}")

Загрузим и вызовем предикт

In [17]:
for i in range(4):
    load(f"test-model{i}")

Success: Model 'test-model0' loaded successfully
Success: Model 'test-model1' loaded successfully
Success: Model 'test-model2' loaded successfully
Success: Model 'test-model3' loaded successfully


In [19]:
!python3 async_predict.py --num_requests 4

{'prediction': [80.14116992660202, -141.87147402883184, 17.53571764764044, -55.07272317292556, 4.178167926712026, -48.66051455667085, -76.9273958314548, 138.6290278915488, 34.29854069258988, -46.724091276296704, -76.4433003908822, -4.92895982402721, -9.03437945883784, -17.24253860122012, -16.740328123124275, 66.1755860434453, 11.223093119835571, -16.401128731929045, 18.643566677946488, -204.23571789189992, 132.15658469830296, -16.801801932838455, -33.242764764977025, 45.309418722090356, 102.92531385091284, -74.78497457841178, 35.50191210710427, -6.338997733120814, 1.7290496836785239, -126.09547590795108, 5.183219237927839, -35.98015452263401, -175.2322367516432, 119.94071294813804, -99.94346718825219, -69.02460595542476, 156.3516569441584, -40.14127080794025, -120.45472640695886, -60.84496499865907, 88.66774330100974, -93.28592867148913, 12.947028718122642, 70.916419055853, 23.729652401052455, -103.17546187716374, -188.79875086462744, -61.06945159914848, -15.235445573704277, -122.48723

Предсказания одинаковые, так как мы обучили одинаковые модели на одних и тех же данных и запустили predict опять на одинаковых данных