# Concorrência Baseada em Filas

A concorrência baseada em filas refere-se ao uso de sistemas de filas para gerenciar e processar tarefas em paralelo. Nesse modelo, as tarefas são adicionadas a uma fila e são processadas por workers conforme eles ficam disponíveis. RabbitMQ, Kafka e SQS da AWS são exemplos de sistemas de filas, mas para simplificar, vamos usar a biblioteca `queue` do Python padrão juntamente com `threading` para simular um sistema de filas em um Jupyter Notebook.

No contexto de MLOps, a concorrência baseada em filas pode ser usada para treinar modelos, executar inferências, processar dados e muito mais, tudo de forma assíncrona e escalável.

### Exemplo: Treinamento de Modelos Usando uma Fila

Vamos criar um exemplo onde temos uma fila de conjuntos de dados e vários workers que retiram dados da fila para treinar modelos de regressão linear.

No exemplo a seguir:

- A função `generate_data` cria conjuntos de dados aleatórios (sintéticos).
- A função `worker` representa um worker que retira dados da fila e treina um modelo.
- Criamos uma fila usando `queue.Queue()` e adicionamos vários conjuntos de dados.
- Iniciamos vários workers usando threading. Cada worker retira um conjunto de dados da fila, treina um modelo e, em seguida, retira o próximo conjunto de dados.
- Finalmente, enviamos um sinal (um valor `None`) para cada worker parar após processar todos os dados.

Este é um exemplo básico para demonstrar o conceito de concorrência baseada em filas. Em cenários reais de MLOps, você pode usar sistemas de filas mais robustos e escaláveis, como RabbitMQ ou Kafka, para gerenciar tarefas de forma assíncrona em clusters distribuídos.

# Concorrência Baseada em Filas

In [None]:
import numpy as np
from sklearn.linear_model import LinearRegression
import threading
import queue
import time

# Função para gerar dados aleatórios
def generate_data(n_samples=100):
    X = np.random.rand(n_samples, 1) * 10
    y = 2.5 * X + np.random.randn(n_samples, 1) * 2
    return X, y

# Função worker para treinar modelos
def worker(q, worker_id):
    while True:
        data = q.get()
        if data is None:  # Um sinal para o worker parar
            break
        X, y = data
        model = LinearRegression().fit(X, y)
        print(f"\nWorker-{worker_id} trained a model with coefficient: {model.coef_[0][0]}")
        time.sleep(2)  # Simulando algum tempo de processamento

# Criando uma fila e adicionando dados
data_queue = queue.Queue()
for _ in range(10):
    data_queue.put(generate_data())

# Iniciando workers
num_workers = 3
threads = []
for i in range(num_workers):
    t = threading.Thread(target=worker, args=(data_queue, i))
    t.start()
    threads.append(t)

# Aguardando os workers processarem todos os dados
for _ in range(num_workers):
    data_queue.put(None)  # Sinalizando para os workers pararem
for t in threads:
    t.join()

print("Todos os modelos foram treinados!")
