In [7]:
import numpy as np

import asyncio
import aiohttp
import nest_asyncio
import requests as r
import time

from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import CountVectorizer

In [2]:
X = np.random.randint(0, 100, size=(200000, 200))
y = np.random.randint(0, 100, size=(200000, 1))

X = list(map(lambda i:  str(i), X.tolist()))
y = list(map(lambda i: str(i), y.ravel().tolist()))

In [3]:
URL = 'http://0.0.0.0:8000'

In [4]:
body1 = {
    "x": {"texts":X},
    "y": {"labels":y},
    "config":{
        "model_path": "lr_test1",
        "model_type": "lr",
        "feature_type": "cnt",
        'params': {'max_iter': 1000}
    }
}

body2 = {
    "x": {"texts":X},
    "y": {"labels":y},
    "config":{
        "model_path": "lr_test2",
        "model_type": "lr",
        "feature_type": "cnt",
        'params': {'max_iter': 1000}
    }
}

# Синхронное выполнение

In [7]:
start_time = time.time()

model1 = LogisticRegression(max_iter=1000)
data = CountVectorizer().fit_transform(X)
model1.fit(data, y)

model2 = LogisticRegression(max_iter=1000)
data = CountVectorizer().fit_transform(X)
model2.fit(data, y)

print(f"Время выполнения в синхронном режиме - {time.time() - start_time}")

Время выполнения в синхронном режиме - 97.74227666854858


## Асинхронное выполнение

In [22]:
nest_asyncio.apply()

start_time = 0
async def async_requests():
    global start_time
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        await session.post(url=f"{URL}/remove", json={'model_path': 'lr_test1'})
        await session.post(url=f"{URL}/remove", json={'model_path': 'lr_test2'})

        req_1 = await session.post(url=f"{URL}/fit", json=body1, chunked=True)
        req_2 = await session.post(url=f"{URL}/fit", json=body2, chunked=True)
        assert req_1.ok and req_2.ok

loop = asyncio.get_event_loop()
task = loop.create_task(async_requests())
loop.run_until_complete(task)

while True:
    time.sleep(1)
    res1 = r.get(url=f"{URL}/status/lr_test1").json()
    res2 = r.get(url=f"{URL}/status/lr_test2").json()
    if res1['saved'] and res2['saved']:
        break

print(f"Время выполнения в асинхронном режиме  - {time.time() - start_time}")

Время выполнения в асинхронном режиме  - 68.30413961410522


### Предельное количество процессов

In [24]:
body = {
    "x": {"texts":X},
    "y": {"labels":y},
    "config":{
        "model_path": "test",
        "model_type": "lr",
        "feature_type": "cnt",
        'params': {'max_iter': 1000}
    }
}

body1 = {
    "x": {"texts":X},
    "y": {"labels":y},
    "config":{
        "model_path": "test1",
        "model_type": "lr",
        "feature_type": "cnt",
        'params': {'max_iter': 1000}
    }
}

body2 = {
    "x": {"texts":X},
    "y": {"labels":y},
    "config":{
        "model_path": "test2",
        "model_type": "lr",
        "feature_type": "cnt",
        'params': {'max_iter': 1000}
    }
}

In [25]:
nest_asyncio.apply()

start_time = 0
async def async_requests():
    global start_time
    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        await session.post(url=f"{URL}/fit", json=body, chunked=True)
        await session.post(url=f"{URL}/fit", json=body1, chunked=True)
        await session.post(url=f"{URL}/fit", json=body2, chunked=True)
        await session.post(url=f"{URL}/fit", json=body2, chunked=True)
        res = await session.post(url=f"{URL}/fit", json=body2, chunked=True)
        print(await res.json())


loop = asyncio.get_event_loop()
task = loop.create_task(async_requests())
loop.run_until_complete(task)

{'success': False, 'message': 'No free process', 'traceback': None}


In [17]:
pr_body1 = {
    "x": {"texts":X},
    "config":{
        "model_path": "lr_test1",
    }
}

pr_body2 = {
    "x": {"texts":X},
    "config":{
        "model_path": "lr_test2"
    }
}

### Загрузим модели в режим инференса

In [28]:
r.post(url=f"{URL}/load", json={'model_path': 'lr_test1'})
r.post(url=f"{URL}/load", json={'model_path': 'lr_test2'})

status1 = r.get(url=f"{URL}/status/lr_test1").json()
print(status1)
status2 = r.get(url=f"{URL}/status/lr_test2").json()
print(status2)

{'saved': True, 'inference': True}
{'saved': True, 'inference': True}


### Предельный случай загрузки в инференс

In [29]:
r.post(url=f"{URL}/load", json={'model_path': 'test'})
r.post(url=f"{URL}/load", json={'model_path': 'test1'})
res = r.post(url=f"{URL}/load", json={'model_path': 'test2'})
print(res.json())


{'success': True, 'message': 'Max amount of loaded models', 'traceback': None}


## Запустим предсказания

In [11]:

async def async_predict():
    async with aiohttp.ClientSession() as session1:
        await session1.post(url=f"{URL}/predict", json=pr_body1, chunked=True)
        await session1.post(url=f"{URL}/predict", json=pr_body2, chunked=True)    
        

loop = asyncio.get_event_loop()
task = loop.create_task(async_predict())
loop.run_until_complete(task)

### Пример выгрузки модели из режима инференса

In [12]:
r.post(url=f"{URL}/unload", json={'model_path': 'lr_test1'})
r.post(url=f"{URL}/unload", json={'model_path': 'lr_test2'})
status1 = r.get(url=f"{URL}/status/lr_test1").json()
print(status1)
status2 = r.get(url=f"{URL}/status/lr_test2").json()
print(status2)

{'saved': True, 'inference': False}
{'saved': True, 'inference': False}


### Пример удаления модели

In [13]:
r.post(url=f"{URL}/remove", json={'model_path': 'lr_test1'})
status1 = r.get(url=f"{URL}/status/lr_test1").json()
print(status1)

{'saved': False, 'inference': False}


### Пример удаления всех моделей


In [14]:
r.post(url=f"{URL}/remove_all")
status1 = r.get(url=f"{URL}/status/lr_test2").json()
print(status1)

{'saved': False, 'inference': False}
