<a href="https://colab.research.google.com/github/CarlosfcPinheiro/pibic-api-llm-integration/blob/main/pibic_queue_rabbitmq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Teste de requisições para LLMs utilizando padrão de Filas assíncronas
O objetivo principal é testar e avaliar o tempo de resposta de requisições para uma API que interaja com LLMs, utilizando o padrão de filas assíncronas.

## Código e estrutura
Para análise, são utilizados **8 prompts diferentes**, que buscam explorar diferentes tamanhos e temáticas de texto. Nesse script de teste, cada cliente/producer é responsável por enviar um prompt (uma request) para a fila de requests `request_summarize_queue`, sendo **30 clientes/producers** para cada prompt, onde são armazenado os tempos de resposta de todas as requests para cada prompt, respectivamente.

O arquivo .csv que contém os dados de tempo da request é lido posteriormente para compor os dados de análise, que também são escritos em um outro csv.

Além disso, cada conjunto de testes são executados utilizando **4 modelos diferentes**, que fundamentalmente são especializados em sumarização, mas com objetivos diferentes. Os modelos utilizados são:

- facebook/bart_large_cnn
- google/pegasus_cnn_dailymail
- knkarthick/MEETING_SUMMARY
- google/pegasus_xsum

O worker/consumer é o responsável por receber as mensagens (corpo das requests) da fila `request_summarize_queue`, processar, realizar a request para a API gateway de LLM, e devolver a resposta para o cliente, publicando a resposta em sua fila de respostas exclusiva.

## Padrões e bibliotecas
O padrão a ser utilizado é o de **Filas assíncronas**, dado que utiliza uma fila de mensagens (request_summarize_queue) com diferentes clientes/producers atuando em **threads únicas**.

As principais bibliotecas utilizadas são:
- time (funções de manipulação de tempo)
- requests (envio de requisições HTTP)
- csv (manipulação de arquivos csv)
- statistics (funções para cálculos estatísticos)
- numpy (funções para manipulação de arrays e matrizes)
- pika (SDK python do RabbitMQ)
- threading (gerenciamento de threads)

## Métricas analisadas
As métricas utilizadas para análise do tempo de resposta da requisição para cada prompt foram as seguintes:
- Média de tempo (tempo médio de resposta das requisições)
- Máximo e Mínimo (tempo com valores máximos e mínimos)
- Desvio padrão (dispersão absoluta dos dados em relação à média, numericamente)
- Coeficiente de variação (dispersão relativa dos dados em relação à média, em  porcentagem)
- Percentil - 95% e 99% (valores abaixo do qual uma certa porcentagem dos tempos se encontram)

> Todas as métricas levam em consideração a unidade de segundos para o tempo de resposta da requisição

In [2]:
!apt-get update
!apt-get install rabbitmq-server -y
!service rabbitmq-server start

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cli.github.com/packages stable InRelease [3,917 B]
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [6,205 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:10 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [3,633 kB]
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,860 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [9,572 kB]
Get:13 http://archive.ubuntu.com/ubuntu jammy-backports InRel

In [3]:
!sudo rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...


In [4]:
pip install pika --upgrade

Collecting pika
  Downloading pika-1.3.2-py3-none-any.whl.metadata (13 kB)
Downloading pika-1.3.2-py3-none-any.whl (155 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/155.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m153.6/155.4 kB[0m [31m6.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m155.4/155.4 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pika
Successfully installed pika-1.3.2


In [5]:
# Definição dos prompts de teste ==========================
PROMPTS = [
    """Solar energy is one of the most promising renewable sources for the planet’s future. With concerns about climate change and the need to reduce fossil fuel dependence, many countries are investing in clean technologies. Brazil has one of the largest solar potentials, especially in the Northeast and Midwest, where radiation is high year-round. The installation of solar panels in homes and businesses has grown, driven by incentives and lower equipment costs. Distributed generation allows consumers to also produce energy, helping decentralize the electricity system. Challenges remain, such as infrastructure, storage, and stronger public policies, but with technology and social engagement, solar energy is set to play an increasingly important role in Brazil’s energy matrix.""",

    """The city of Ouro Preto, located in the state of Minas Gerais, is one of the greatest symbols of Brazilian colonial architecture and history. Founded in the 17th century during the gold rush, it quickly became one of the main economic centers of the Portuguese colony, attracting thousands of people in search of wealth. Its stone streets, steep slopes, and preserved buildings reveal a past marked by opulence, religiosity, and resistance. Ouro Preto is home to some of the most impressive baroque churches in Brazil, such as the Church of Saint Francis of Assisi, designed by Aleijadinho and decorated with works by Mestre Ataíde. The city was also the stage for important events of the Inconfidência Mineira, a movement that sought Brazil’s independence from Portugal. Today, the city is recognized as a UNESCO World Heritage Site, attracting tourists interested in history, art, and culture. In addition to its historical value, Ouro Preto has a vibrant university life thanks to the Federal University of Ouro Preto (UFOP) and hosts various festivals of art, music, and cinema throughout the year. This combination of tradition and youth makes the city a unique place where past and present coexist in harmony.""",

    """Artificial intelligence (AI) is one of the most transformative technologies of the 21st century. It impacts medicine, education, industry, and commerce, improving efficiency and user experience. At the same time, AI raises ethical concerns about data use, bias, and its impact on jobs. Governments and society must ensure its benefits are distributed fairly and responsibly.""",

    """Classical music, although it emerged centuries ago, continues to influence contemporary composers. Works by Beethoven, Mozart, and Bach are studied in conservatories and reinterpreted in different musical styles. This tradition shows how art can transcend generations and reinvent itself.""",

    """The history of the internet began in the 1960s with military projects in the United States, evolving into academic networks and later into commercial use. Today, the internet connects billions of people in real time, transforming the way we work, study, and interact. Digital platforms enable instant communication, e-commerce, and unlimited access to information. However, problems also arise, such as fake news, cybercrime, and technological dependence. The future of the internet depends on balanced regulations, continuous innovation, and digital education so that its benefits can be enjoyed safely and inclusively.""",

    """Coffee is one of the most consumed beverages in the world and is part of Brazilian culture.""",

    """The Olympic Games are more than just a sporting event; they represent a celebration of unity, diversity, and human achievement. Athletes from around the world gather to compete at the highest level, showcasing not only physical skill but also resilience and determination. The Games have evolved over time, incorporating new disciplines and promoting values such as fair play and respect. Beyond the competitions, the Olympics foster cultural exchange, allowing nations to share traditions and build mutual understanding. Despite challenges such as political tensions and financial costs, the spirit of the Olympics continues to inspire millions, proving that sport can be a powerful force for peace and cooperation.""",

    """The Renaissance was a period of profound cultural, artistic, and scientific transformation in Europe between the 14th and 17th centuries. It marked the rediscovery of classical knowledge and the flourishing of human creativity. Artists like Leonardo da Vinci and Michelangelo produced masterpieces that continue to inspire admiration today, while scientists such as Galileo challenged traditional views of the universe. The invention of the printing press by Gutenberg revolutionized the spread of ideas, making books more accessible and fueling intellectual debates."""
]

In [6]:
# Estabelecer conexão com RabbitMQ
def get_connection():
  # retorna 'connection' com o cliente localhost do RabbitMQ
  return pika.BlockingConnection(pika.ConnectionParameters('localhost'))

In [16]:
# Criação de classe utilitária de simulação de cliente/producer ================
import uuid, json

QUEUE_NAME = "request_summarize_queue"

class Client:
  def __init__(self, client_id):
    # atributos de cliente/producer
    self.client_id = client_id
    self.connection = get_connection()
    self.channel = self.connection.channel()
    self.response = None

    # declara uma queue exclusiva e temporária, response queue
    result = self.channel.queue_declare(queue='', exclusive=True)
    self.callback_queue = result.method.queue

    # informando ao RabbitMQ que deve executar essa função ao receber uma nova mensagem
    self.channel.basic_consume(
        queue=self.callback_queue,
        on_message_callback=self.on_response,
        auto_ack=True
    )

  # função callback para receber a resposta da response queue
  def on_response(self, ch, method, props, body) -> None:
    if self.corr_id == props.correlation_id:
      self.response = body

  # função que simula uma requisição RPC
  def call(self, prompt: str) -> str:
    self.response = None
    # gera o id e payload do cliente
    self.corr_id = str(uuid.uuid4())
    payload = {"client_id": self.client_id, "prompt": prompt}
    # publica o payload em formato json na request_summarize_queue
    self.channel.basic_publish(
        exchange='',
        routing_key=QUEUE_NAME,
        properties=pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=self.corr_id
        ),
        body=json.dumps(payload)
    )
    # loop para verificar quando a resposta chega na response queue
    while self.response is None:
      self.connection.process_data_events()
    return self.response

In [24]:
# Função de requisição para o api gateway dos modelos =======================
import time, requests

API_URL = "https://zetta-faunlike-kindheartedly.ngrok-free.dev"
MODEL = "facebook_bart_large_cnn"
ENDPOINT_TEST = f"{API_URL}/summarize?model={MODEL}"

def summarizer(prompt: str, client_id: int):
  try:
    response = requests.post(ENDPOINT_TEST, json={"text": prompt})
    return response.json()["summary"]
  except Exception as e:
    print(f"\033[91m[ERROR]\033[0m Houve um problema com o teste das requisições, prompt/{1} client/{client_id}:\n {e}")

In [14]:
# Declarando Consumer (worker) =======================
import pika, time, json

# função para definição do consumer (worker) da request_summarize_queue
def consumer():
  connection = get_connection()
  channel = connection.channel()
  # verificar se a queue existe de forma idempotente (é uma boa prática, nesse caso, repetir o código de declaração da queue)
  channel.queue_declare(queue=QUEUE_NAME, durable=True)

  # função callback que será chamada quando uma message for recebida
  def callback(ch, method, props, body):
    payload = json.loads(body)
    client_id = payload["client_id"]
    prompt = payload["prompt"]

    print(f"\033[94m[Consumer]\033[0m Processando a request do cliente-{client_id}")
    summary = summarizer(prompt, client_id)

    response = {"client_id": client_id, "summary": summary}
    # publica a resposta na queue individual do cliente
    channel.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=json.dumps(response)
    )
    print(f"\033[94m[Consumer]\033[0m Request enviada do cliente-{client_id} para a queue {QUEUE_NAME}\n")
    ch.basic_ack(delivery_tag=method.delivery_tag)

  channel.basic_qos(prefetch_count=1)
  # informando ao RabbitMQ que essa função callback deve receber messages
  channel.basic_consume(
      queue=QUEUE_NAME,
      auto_ack=False,
      on_message_callback=callback
  )

  print('\033[94m[Consumer]\033[0m Aguardando messages...\n')
  channel.start_consuming()

In [21]:
# Função para salvar tempo das requisições de um determinado prompt em .csv ===========================
import csv, os

FOLDER = "tempos"
CSV_REQ_TIMES = f"./{FOLDER}/tempo_requisicoes_{MODEL}.csv"

def save_times_csv(request_prompt: str, request_response_data: dict) -> None:
  os.makedirs(FOLDER, exist_ok=True)

  with open(CSV_REQ_TIMES, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=[
        "prompt_id",
        "client_id",
        "prompt_entrada",
        "saida_resposta",
        "tamanho_prompt",
        "tamanho_saida",
        "tempo_s"
    ])
    writer.writeheader()
    writer.writerows(request_response_data)

    print(f"\033[93m[SYSTEM]\033[0m Arquivo com os tempos de requisição do modelo {MODEL} salvo com sucesso")

In [23]:
# Execução do consumer ===========
import threading, time, json

# quantidade de clientes/producers
CLIENTS_QUANT = 30
# armazena em formato dict os dados de respsota das requisições de cada cliente/producer
REQUEST_RESPONSE_DATAS = []

# iniciar o consumer em uma thread
threading.Thread(target=consumer).start()
def run_client(client: Client, prompt: str, prompt_id: int, request_response_datas: list) -> None:
  start = time.time()
  response = client.call(prompt)
  end = time.time()
  request_time = round(end - start, 4)

  summary_response = json.loads(response.decode("utf-8"))
  # armazena o tempo de requisição na list de REQUEST_RESPONSE_DATAS para usar posteriormente no salvamento do .csv
  request_response_datas.append({
      "prompt_id": prompt_id,
      "client_id": client.client_id,
      "prompt_entrada": prompt,
      "saida_resposta": summary_response["summary"],
      "tamanho_prompt": len(prompt),
      "tamanho_saida": len(summary_response["summary"]),
      "tempo_s": request_time
  })

  print(f"[Client-{client.client_id}] Recebeu uma resposta: {summary_response["summary"][:40]}... | Tempo: {request_time}")

for idx, prompt in enumerate(PROMPTS):
  print(f"\n\033[93m[SYSTEM]\033[0m Executando testes para o prompt {idx} ==============")
  # simulação de (n) clientes/producers
  clients = [Client(i) for i in range(CLIENTS_QUANT)]
  # inicia uma thread para cada cliente
  threads = []
  for c in clients:
    t = threading.Thread(target=run_client, args=(c, prompt, idx, REQUEST_RESPONSE_DATAS))
    threads.append(t)
    t.start()

  for t in threads:
    t.join()

# salva os tempos de requisição no .csv de tempos
save_times_csv(MODEL, REQUEST_RESPONSE_DATAS)


[94m[Consumer][0m Aguardando messages...

[94m[Consumer][0m Processando a request do cliente-1
[94m[Consumer][0m Processando a request do cliente-0
[Consumer] Processando a request do cliente-4
[94m[Consumer][0m Processando a request do cliente-2
[Consumer] Processando a request do cliente-3
[94m[Consumer][0m Processando a request do cliente-7
[94m[Consumer][0m Processando a request do cliente-6
[Client-0] Recebeu uma resposta: Brazil has one of the largest solar pote... | Tempo: 6.2772
[94m[Consumer][0m Request enviada do cliente-0 para a queue request_summarize_queue

[94m[Consumer][0m Processando a request do cliente-5
[94m[Consumer][0m Request enviada do cliente-2 para a queue request_summarize_queue

[Client-2] Recebeu uma resposta: Brazil has one of the largest solar pote... | Tempo: 8.1857
[94m[Consumer][0m Processando a request do cliente-9
[Client-4] Recebeu uma resposta: Brazil has one of the largest solar pote... | Tempo: 10.1691
[Consumer] Request envia

In [27]:
from google.colab import data_table
import pandas as pd
import numpy as np
import statistics

df = pd.read_csv(f"./tempos/tempo_requisicoes_{MODEL}.csv")
display(df)

Unnamed: 0,prompt_id,client_id,prompt_entrada,saida_resposta,tamanho_prompt,tamanho_saida,tempo_s
0,0,0,Solar energy is one of the most promising rene...,Brazil has one of the largest solar potentials...,779,405,6.2772
1,0,2,Solar energy is one of the most promising rene...,Brazil has one of the largest solar potentials...,779,405,8.1857
2,0,4,Solar energy is one of the most promising rene...,Brazil has one of the largest solar potentials...,779,405,10.1691
3,0,1,Solar energy is one of the most promising rene...,Brazil has one of the largest solar potentials...,779,405,11.4698
4,0,3,Solar energy is one of the most promising rene...,Brazil has one of the largest solar potentials...,779,405,13.5046
...,...,...,...,...,...,...,...
235,7,27,The Renaissance was a period of profound cultu...,The Renaissance was a period of profound cultu...,567,372,42.9339
236,7,25,The Renaissance was a period of profound cultu...,The Renaissance was a period of profound cultu...,567,372,44.6028
237,7,28,The Renaissance was a period of profound cultu...,The Renaissance was a period of profound cultu...,567,372,46.1585
238,7,26,The Renaissance was a period of profound cultu...,The Renaissance was a period of profound cultu...,567,372,47.5656


In [None]:
!sudo rabbitmqctl stop_app
!sudo rabbitmqctl reset
!sudo rabbitmqctl start_app

ERROR:pika.adapters.blocking_connection:Unexpected connection close detected: ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
ERROR:pika.adapters.blocking_connection:Unexpected connection close detected: ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
Exception in thread Thread-45 (consumer):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
Exception in thread Thread-21 (consumer):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipython-input-279502506.py", line 39, in consumer
  File "/usr/local/lib/python3.12/dist-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
ERROR:pika.adapters.

Stopping rabbit application on node rabbit@1b553ff1b740 ...
Resetting node rabbit@1b553ff1b740 ...
Starting node rabbit@1b553ff1b740 ...


In [None]:
!sudo rabbitmqctl delete_queue summarize_queue