Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 64 additions & 13 deletions auth_service/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions auth_service/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name = "auth-service"
version = "0.1.0"
description = "serviço de autenticação para o cluster do LabTech"
authors = ["Danrley Pereira <danrleywillian@gmail.com>"]
readme = "README.md"
package-mode = false

[tool.poetry.dependencies]
python = "^3.12"
python = "^3.11"
flask = "^3.1.0"
flasgger = "^0.9.7.1"
pymongo = "^4.11.3"
Expand Down
21 changes: 21 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ services:
options:
max-size: "10m"
max-file: "3"

rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq-local
restart: unless-stopped
ports:
- "5672:5672" # Porta para o protocolo AMQP
- "15672:15672" # Porta para a interface de gerenciamento
environment:
- RABBITMQ_DEFAULT_USER=user
- RABBITMQ_DEFAULT_PASS=password
volumes:
- rabbitmq-data:/var/lib/rabbitmq
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 30s
timeout: 10s
retries: 5

volumes:
redis-data:
driver: local
rabbitmq-data:
driver: local
5 changes: 4 additions & 1 deletion internal_apis/.env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
MONGO_URI=
MONGO_DATABASE=
API_KEY_LIST=
SERVER_NAME=
SERVER_NAME=
GLOBAL_BUCKET_SIZE=
GLOBAL_QUEUE_NAME=
GLOBAL_LEAK_RATE=
10 changes: 10 additions & 0 deletions internal_apis/Dockerfile.tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM python:3.13

WORKDIR /app/internal_apis

ENV PYTHONPATH=/app/internal_apis

COPY requirements.txt ./
RUN pip install -r requirements.txt

COPY . .
137 changes: 137 additions & 0 deletions internal_apis/backpressure/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Backpressure API (Leaky Bucket)

## O que é o sistema de Leaky Bucket?

O algoritmo de Leaky Bucket é uma estratégia de controle de fluxo e limitação de requisições (rate limiting). Ele funciona como um balde com um furo: as requisições entram no balde (bucket) e "vazam" em uma taxa constante. Se o balde enche (ultrapassa a capacidade), as requisições excedentes são descartadas ou bloqueadas. Isso garante que o sistema não seja sobrecarregado por picos de tráfego.

Neste projeto, existem dois tipos de buckets:

## 1. Leaky Bucket Global (RabbitMQ)
- **Finalidade:** Limita o número total de requisições aceitas por todos os usuários em um determinado endpoint ou aplicação.
- **Implementação:** Arquivo `leaky_bucket_rabbitmq.py`.
- **Backend:** RabbitMQ (fila com tamanho máximo).
- **Como usar:**
- O uso automático via `@request.before_request` foi removido por questões de segurança. Agora, o controle global deve ser adicionado **manualmente** em cada endpoint desejado, usando o decorator:
```python
from backpressure.leaky_bucket_rabbitmq import LeakyBucketRabbitMQ
@LeakyBucketRabbitMQ.add_to_global_leaky_bucket()
def meu_endpoint():
...
```
- Os parâmetros do bucket global (capacidade, nome da fila, taxa de vazamento) **são definidos via variáveis de ambiente** no arquivo `.env`:
- `GLOBAL_BUCKET_SIZE`: capacidade máxima do bucket global (ex: 125)
- `GLOBAL_QUEUE_NAME`: nome da fila no RabbitMQ (ex: global)
- `GLOBAL_LEAK_RATE`: taxa de vazamento do bucket global (ex: 0.3)
- **Não é necessário passar parâmetros no decorator**. O sistema buscará as configurações automaticamente do `.env`.

---

## 2. Leaky Bucket Individual (Redis)
- **Finalidade:** Limita o número de requisições por usuário (identificado por IP ou outro identificador).
- **Implementação:** Arquivo `individual_leaky_bucket.py`.
- **Backend:** Redis (armazenamento de tokens por IP).
- **Como usar:**
```python
from backpressure.individual_leaky_bucket import LeakyBucket
@LeakyBucket.individual_leaky_bucket(bucketcapacity=5, leakrate=1, keytimeout=60)
def meu_endpoint():
...
```
- `bucketcapacity`: número máximo de requisições permitidas por usuário.
- `leakrate`: intervalo (em segundos) para "vazamento" de cada token.
- `keytimeout`: tempo de expiração do registro no Redis (em segundos).

---

## Como adicionar os buckets aos endpoints
- Para controle individual, adicione o decorator `@LeakyBucket.individual_leaky_bucket(...)` ao endpoint desejado.
- Para controle global, adicione o decorator `@LeakyBucketRabbitMQ.add_to_global_leaky_bucket()` ao endpoint.
- **Ordem recomendada:**
```python
@LeakyBucketRabbitMQ.add_to_global_leaky_bucket()
@LeakyBucket.individual_leaky_bucket(...)
def meu_endpoint():
...
```

## ATENÇÃO CRÍTICA SOBRE ORDEM DOS DECORATORS
> **IMPORTANTE:** Caso você utilize **os dois sistemas de leaky bucket juntos** (global e individual) em um mesmo endpoint, **O DECORATOR DO INDIVIDUAL LEAKY BUCKET DEVE SER SEMPRE O PRIMEIRO** (ou seja, deve estar mais "próximo" da função do endpoint) e o decorator do global leaky bucket deve vir depois.
>
> **Exemplo correto:**
> ```python
> @LeakyBucketRabbitMQ.add_to_global_leaky_bucket()
> @LeakyBucket.individual_leaky_bucket(...)
> def meu_endpoint():
> ...
> ```
>
> **Se a ordem for invertida, podem ocorrer graves vulnerabilidades de rate limiting, permitindo que usuários burlem o controle global!**
>
> **NUNCA inverta essa ordem!**

---

## Configuração do sistema global via .env
- As informações da fila global do leaky bucket são definidas no arquivo `.env` na raiz do projeto:
```env
GLOBAL_BUCKET_SIZE=125
GLOBAL_QUEUE_NAME=global
GLOBAL_LEAK_RATE=0.3
```
- Para alterar a capacidade, nome da fila ou taxa de vazamento, basta editar o `.env` e reiniciar o serviço.
- **Atenção:** O RabbitMQ não permite alterar argumentos de uma fila já existente. Se mudar o `GLOBAL_BUCKET_SIZE` ou outros argumentos, altere também o `GLOBAL_QUEUE_NAME` para evitar conflitos, ou exclua a fila antiga manualmente.

---

# Testes automatizados

## Estrutura dos testes
Os testes automatizados do sistema de backpressure estão localizados em:
- `backpressure/test_global_leaky_bucket.py` (testes do bucket global)
- `backpressure/test_individual_leaky_bucket.py` (testes do bucket individual)

Os testes cobrem cenários de limite, vazamento, bloqueio e funcionamento dos buckets.

## Como rodar os testes

### Pré-requisitos
- Python 3.11+ instalado localmente.
- Instale as dependências do projeto com Poetry:
```sh
poetry install
```
- O serviço do RabbitMQ deve estar rodando em um container Docker (ou localmente) e acessível conforme as configurações do `.env`.
- O Redis **não é necessário**: os testes usam `fakeredis` (mock em memória).

### Rodando testes pelo PyCharm (recomendado)
- Você pode rodar qualquer teste individualmente pelo próprio PyCharm, clicando no ícone de execução (▶️) que aparece ao lado da função de teste ou do nome do arquivo de teste.
- Certifique-se de que o interpretador Python do PyCharm está configurado para usar o ambiente virtual criado pelo Poetry (ou o Python correto).
- O RabbitMQ deve estar rodando normalmente em Docker.

### Rodando os testes com o poetry via terminal
1. Certifique-se de que o RabbitMQ está rodando (exemplo usando Docker Compose):
```sh
docker compose up -d rabbitmq
```
2. Execute os testes (com Poetry):
```sh
poetry run pytest backpressure/
```
3. Para rodar um teste específico:
```sh
poetry run pytest backpressure/test_global_leaky_bucket.py::test_nome_do_teste
```
Substitua pelo nome do arquivo e da função de teste desejada.


### Dicas
- Não é necessário rodar nenhum container de testes, apenas o RabbitMQ.
- Se mudar as configurações do `.env`, reinicie o RabbitMQ e os testes.
- Consulte os logs do sistema para mensagens de erro detalhadas.


## Observações finais
- O sistema de backpressure é fundamental para garantir a resiliência da API.
- Sempre respeite a ordem dos decorators para evitar vulnerabilidades.
- Mantenha o `.env` atualizado conforme a configuração desejada do bucket global.
- Para dúvidas ou problemas, consulte este README ou peça suporte ao time.
73 changes: 73 additions & 0 deletions internal_apis/backpressure/individual_leaky_bucket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from datetime import datetime
from auth_service.controller import AuthenticationController
import redis
import time
from functools import wraps
from flask import Flask, request, jsonify
from auth_service.auth_routes import token_required
import traceback

class LeakyBucket:

@staticmethod
def get_redis():
return redis.Redis.from_url("redis://localhost:6379/0")

@staticmethod

def individual_leaky_bucket(bucketcapacity, leakrate, keytimeout):
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
try:
r = LeakyBucket.get_redis()
r.ping()
# print("[DEBUG] Conectado ao Redis com sucesso.")
except Exception as e:
# print("[ERRO] Falha ao conectar ao Redis:", e)
traceback.print_exc()
return jsonify({"status": "erro", "mensagem": "Erro ao conectar ao Redis"}), 500

client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
if not client_ip:
return jsonify({"message": "Não foi possível identificar o endereço de IP do cliente."}), 400
key = f"leaky_bucket:{client_ip}"
# print(f"[DEBUG] IP da requisicao: {client_ip}")
now = time.time()


try:
data = r.hmget(name=key, keys=["last_access", "tokens"])
# print(f"[DEBUG] Dados brutos do Redis: {data}")
last_access = float(data[0]) if data[0] else now
tokens = int(data[1]) if data[1] else 0
readable_last_access = datetime.fromtimestamp(last_access).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# print(f"[INFO] Último acesso: {readable_last_access}, Tokens agora: {tokens}")
except Exception as e:
# print("[ERRO] Falha ao obter ou interpretar dados do Redis:", e)
traceback.print_exc()
return jsonify({"status": "erro", "mensagem": "Erro ao acessar dados do Redis"}), 500

time_elapsed = now - last_access
leaked_tokens = int(time_elapsed / leakrate)
tokens = max(0, tokens - leaked_tokens)
if leaked_tokens < 0:
last_access = now

if tokens < bucketcapacity:
tokens += 1
try:
r.hset(name=key, mapping={"tokens": tokens, "last_access": now})
r.expire(key, keytimeout)
return f(*args, **kwargs)
except Exception as e:
traceback.print_exc()
return jsonify({"status": "erro", "mensagem": "Erro ao atualizar Redis"}), 500
else:
# Sinaliza para o before_request do global não processar
request._leaky_bucket_individual_blocked = True
return jsonify({"status": "error", "message": "Request limit exceeded!"}), 429

wrapped._has_individual_leaky_bucket = True
return wrapped
return decorator
81 changes: 81 additions & 0 deletions internal_apis/backpressure/leaky_bucket_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
import time
import traceback
from functools import wraps
from flask import request, jsonify
from datetime import datetime

import time
from functools import wraps
from flask import request, jsonify
from backpressure import rabbitmq_utils

class LeakyBucketRabbitMQ:
# Variáveis estáticas para conexão e canal persistentes
_connection = None
_channel = None

@staticmethod
def _get_persistent_connection():
# Cria ou retorna conexão/canal persistente
if LeakyBucketRabbitMQ._connection is None or LeakyBucketRabbitMQ._channel is None or LeakyBucketRabbitMQ._connection.is_closed or LeakyBucketRabbitMQ._channel.is_closed:
LeakyBucketRabbitMQ._connection, LeakyBucketRabbitMQ._channel = rabbitmq_utils.create_rabbitmq_connection()
return LeakyBucketRabbitMQ._connection, LeakyBucketRabbitMQ._channel

@staticmethod
def _declare_queue_safe(channel, queue_name, bucketcapacity):
try:
queue = channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-length': bucketcapacity})
return queue
except Exception as e:
# Trata erro de precondição (fila já existe com argumentos diferentes)
if 'PRECONDITION_FAILED' in str(e):
# Fecha canal/conexão e reabre para garantir canal válido
try:
if hasattr(channel, 'connection') and channel.connection and not channel.connection.is_closed:
channel.close()
channel.connection.close()
except Exception:
pass
# Reabre conexão/canal
from backpressure import rabbitmq_utils
connection, channel = rabbitmq_utils.create_rabbitmq_connection()
queue = channel.queue_declare(queue=queue_name, durable=True, passive=True)
# Atualiza canal persistente
LeakyBucketRabbitMQ._connection = connection
LeakyBucketRabbitMQ._channel = channel
return queue
else:
raise

@staticmethod
def add_to_global_leaky_bucket(bucketcapacity=int(os.getenv('GLOBAL_BUCKET_SIZE')), queue_name=os.getenv('GLOBAL_QUEUE_NAME')):
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):

try:
current_queue = queue_name or rabbitmq_utils.get_leakybucket_queue_name()
connection, channel = LeakyBucketRabbitMQ._get_persistent_connection()
queue = LeakyBucketRabbitMQ._declare_queue_safe(channel, current_queue, bucketcapacity)
current_tokens = queue.method.message_count

if current_tokens < bucketcapacity:
req_id = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
message = f'{current_queue}: {req_id}'
channel.basic_publish(exchange='', routing_key=current_queue, body=message.encode())
return f(*args, **kwargs)
else:
return jsonify({"status": "error", "message": "Request limit exceeded!"}), 429
except Exception as e:
LeakyBucketRabbitMQ._connection = None
LeakyBucketRabbitMQ._channel = None
return jsonify({
"status": "error",
"message": f"Erro interno: {str(e)}",
"trace": traceback.format_exc()
}), 500

return wrapped
return decorator

33 changes: 33 additions & 0 deletions internal_apis/backpressure/leaky_bucket_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# internal_apis/backpressure/leaky_bucket_worker.py
import time
from backpressure.rabbitmq_utils import create_rabbitmq_connection
from backpressure.rabbitmq_utils import get_leakybucket_queue_name
from flask import request
import os

def start_leaky_bucket_worker(leakrate, bucketcapacity, queue_name=None):
if queue_name is None:
queue_name = get_leakybucket_queue_name()

print(f"[*] Worker iniciado. Ouvindo a fila: {queue_name}")

connection, channel = create_rabbitmq_connection()
channel.queue_delete(queue=queue_name)
channel.queue_declare(queue=queue_name, durable=True, arguments={'x-max-length': bucketcapacity})

log_file_path = os.path.join(os.path.dirname(__file__), 'test_logs')

while True:
method_frame, header_frame, body = channel.basic_get(queue=queue_name)
if method_frame:
print(f"Mensagem consumida: {body.decode()}") # Log para depuração
with open(log_file_path, "a") as f:
f.write(body.decode() + "\n")
channel.basic_ack(method_frame.delivery_tag)
else:

time.sleep(leakrate)

if __name__ == "__main__":
start_leaky_bucket_worker(0.3, 125, queue_name='global')

Loading