In [1]:
from typing import List
from configs.config import AppConfig, ModelConfig

from infrastructure.models import TransformerTextClassificationModel
from service.recognition import TextClassificationService
from handlers.recognition import PredictionHandler
from handlers.data_models import ResponseSchema


def build_models(model_configs: List[ModelConfig]) -> List[TransformerTextClassificationModel]:
    models = [
            TransformerTextClassificationModel(conf.model, conf.model_path, conf.tokenizer)
            for conf in model_configs
        ]
    return models


config = AppConfig.parse_file("./configs/app_config.yaml")
models = build_models(config.models)

Downloading (…)lve/main/config.json:   0%|          | 0.00/841 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

Downloading (…)tencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/2.76k [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/398 [00:00<?, ?B/s]

Downloading (…)tencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/9.08M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/877 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/392 [00:00<?, ?B/s]

Downloading (…)tencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/770 [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

  with safe_open(checkpoint_file, framework="pt") as f:
  return self.fget.__get__(instance, owner)()
  storage = cls(wrap_storage=untyped_storage)
  with safe_open(filename, framework="pt", device=device) as f:


Downloading (…)okenizer_config.json:   0%|          | 0.00/451 [00:00<?, ?B/s]

Downloading (…)tencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/280 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/735 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/1.14k [00:00<?, ?B/s]

Downloading (…)olve/main/vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

Downloading (…)olve/main/merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

In [3]:
models[0].__dict__

{'name': 'cardiffnlp',
 'model_path': 'cardiffnlp/twitter-xlm-roberta-base-sentiment',
 'tokenizer': 'cardiffnlp/twitter-xlm-roberta-base-sentiment',
 'device': 0,
 'model': <transformers.pipelines.text_classification.TextClassificationPipeline at 0x7fd225c81d90>}

In [4]:
recognition_service = TextClassificationService(models)

In [6]:
recognition_service.__dict__

{'service_models': [<infrastructure.models.TransformerTextClassificationModel at 0x7fd235c69730>,
  <infrastructure.models.TransformerTextClassificationModel at 0x7fd235c69e50>,
  <infrastructure.models.TransformerTextClassificationModel at 0x7fd225c42cd0>,
  <infrastructure.models.TransformerTextClassificationModel at 0x7fd1e885d310>,
  <infrastructure.models.TransformerTextClassificationModel at 0x7fd1e8a18f70>]}

### Sync way

In [12]:
from time import time
from tqdm.notebook import tqdm

n_calls = 5000
start = time()

test_texts = ["This is how true happiness looks like 👍😜"]*n_calls

res = [model(t) for t in tqdm(test_texts) for model in models]

finish = time()-start

print(f"Time taken: {finish}")
print(f"RPS: {n_calls/finish}")

  0%|          | 0/5000 [00:00<?, ?it/s]

Time taken: 154.93562006950378
RPS: 32.27146861229852


In [10]:
5000/finish

10304.144304559979

### Threading

In [14]:
import threading


start = time()

results = []
threads = []
for i, model in enumerate(models):
    for test_text in tqdm(test_texts):
        t = threading.Thread(target=lambda: results.append(model(test_text)))
        threads.append(t)
        t.start()
    
for t in threads:
    t.join()
    
finish = time()-start
print(f"Time taken: {finish}")
print(f"RPS: {n_calls/finish}")

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

  0%|          | 0/5000 [00:00<?, ?it/s]

Time taken: 166.41771173477173
RPS: 30.04487892471897


### Multiprocessing

In [39]:
from time import time

import torch
import torch.multiprocessing as mp

from tqdm.notebook import tqdm

from typing import List
from configs.config import AppConfig, ModelConfig

from infrastructure.models import TransformerTextClassificationModel


def chunker(seq, size):
    for chunk in (seq[pos:pos + size] for pos in range(0, len(seq), size)):
        yield chunk


def main(models, test_texts):
    with mp.Pool(processes=5) as pool:
        for model in models:
            results = pool.map(model, test_texts)
    return None

def main2(models, test_texts):
    results = []
    pool = mp.Pool(processes=5)
    
    for model in models:
        r = pool.map(model, test_texts)



def build_models(model_configs: List[ModelConfig]) -> List[TransformerTextClassificationModel]:
    models = [
            TransformerTextClassificationModel(conf.model, conf.model_path, conf.tokenizer)
            for conf in model_configs
        ]
    return models

In [13]:
mp.set_start_method("spawn")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

config = AppConfig.parse_file("./configs/app_config.yaml")
models = build_models(config.models)

RuntimeError: context has already been set

In [40]:
n_calls = 5000
test_texts = ["This is how true happiness looks like 👍😜"]*n_calls
        
start = time()

main2(models, test_texts)
finish = time()-start

print(f"Time taken: {finish}")
print(f"RPS: {n_calls/finish}")

Time taken: 83.35999321937561
RPS: 59.9808110209615


In [35]:
import time

import torch
import torch.multiprocessing as mp


def f(q):
    y = q.get()
    y[0] = 1000


def g(q):
    x = torch.zeros(1).cuda()
    x.share_memory_()
    q.put(x)
    q.put(x)
    while True:
        time.sleep(1)  # this process must live as long as x is in use

In [37]:
# mp.set_start_method('spawn')
queue = mp.Queue()
pf = mp.Process(target=f, args=(queue,), daemon=True)
pf.start()
pg = mp.Process(target=g, args=(queue,), daemon=True)
pg.start()
pf.join()
x = queue.get()
print("x =", x.item())  # Prints x = 1000.0

Process Process-50:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_31511/1644485645.py", line 13, in g
    x = torch.zeros(1).cuda()
  File "/usr/local/lib/python3.8/dist-packages/torch/cuda/__init__.py", line 235, in _lazy_init
    raise RuntimeError(
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
Process Process-49:
Traceback (most recent call last):
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()


KeyboardInterrupt: 

  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_31511/1644485645.py", line 8, in f
    y = q.get()
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 97, in get
    res = self._recv_bytes()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
