#### Imports

In [1]:
from flask import Flask , request, jsonify
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import pytest
import requests
import logging
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

#### Implementação do Servidor Vulnerável

In [2]:
app = Flask(__name__)

# Definindo o limite máximo de 10MB para uploads
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024  # Limite de 10MB

stored_data = []

@app.route('/upload', methods=['POST'])
def upload():
    data = request.get_data()  # Aceita qualquer quantidade de dados
    stored_data.append(data)  # Armazena os dados em memória

    return f"Received {len(data)} bytes!", 200

@app.route('/health')
def health():
    return 'Servidor está funcionando!'

def run_flask():
    app.run(debug=True, port=5001, use_reloader=False)  # Desabilita o reloader para evitar problemas com o Jupyter

# Rodar o servidor Flask em uma thread separada
flask_thread = threading.Thread(target=run_flask)
flask_thread.start()


 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5001
Press CTRL+C to quit


#### Verificação do Status de "saúde" do servidor

In [3]:


url = "http://127.0.0.1:5001/health"  # URL de status de saúde do servidor

def check_server_status():
    try:
        response = requests.get(url, timeout=5)  # Timeout de 5 segundos
        if response.status_code == 200:
            print("Servidor está ativo e funcionando.")
        else:
            print(f"Servidor retornou status {response.status_code}.")
    except requests.exceptions.RequestException as e:
        print(f"Erro ao tentar acessar o servidor: {e}")

# Verifique o estado do servidor
check_server_status()


127.0.0.1 - - [08/Dec/2024 10:03:26] "GET /health HTTP/1.1" 200 -


Servidor está ativo e funcionando.


#### Implementação dos casos de teste maliciosos

In [25]:
%%writefile test_server.py
import pytest
import requests

# Endpoints do servidor vulnerável
BASE_URL = "http://127.0.0.1:5001"
UPLOAD_ENDPOINT = f"{BASE_URL}/upload"
HEALTH_ENDPOINT = f"{BASE_URL}/health"

# Função auxiliar para verificar se o servidor está online
def check_server():
    response = requests.get(HEALTH_ENDPOINT, timeout=5)
    assert response.status_code == 200, "Servidor não está respondendo corretamente!"

# Teste 1: Envio rápido de múltiplas requisições (Flood)
def test_flood_requests():
    """Teste: Explorar ausência de limitação de taxa enviando múltiplas requisições rapidamente."""
    responses = []
    for _ in range(100):  # Simula 100 requisições rápidas
        response = requests.post(UPLOAD_ENDPOINT, data=b"Flood Test")
        responses.append(response.status_code)
    assert all(status == 200 for status in responses), "ERRO: O servidor não processou todas as requisições de flood."

# Teste 2: Explorar ausência de validação ao enviar dados inválidos
def test_invalid_data():
    """Teste: Enviar dados maliciosos ou inválidos que o servidor não valida."""
    invalid_data = "This is not binary or JSON data"  # Dados inválidos
    response = requests.post(UPLOAD_ENDPOINT, data=invalid_data)
    assert response.status_code == 200, (
        f"ERRO: O servidor rejeitou dados inválidos com código {response.status_code}."
    )

# Teste 3: Explorar acúmulo de dados em memória
def test_memory_accumulation():
    """Teste: Enviar múltiplos dados para explorar o acúmulo na memória do servidor."""
    for i in range(200):  # Simula 200 requisições
        requests.post(UPLOAD_ENDPOINT, data=f"Data {i}".encode())

    # O servidor não tem controle de limpeza, logo, deve continuar aceitando dados
    response = requests.post(UPLOAD_ENDPOINT, data=b"Final Check")
    assert response.status_code == 200, "ERRO: O servidor não conseguiu processar devido à exaustão de memória."

# Teste 4: Enviar carga maliciosa dentro do limite permitido
def test_malicious_payload_within_limit():
    """Teste: Enviar carga maliciosa dentro do limite de 10 MB permitido."""
    payload = b"A" * (10 * 1024 * 1024 - 1)  # 1 byte a menos que o limite
    response = requests.post(UPLOAD_ENDPOINT, data=payload)
    assert response.status_code == 200, (
        f"ERRO: O servidor não aceitou uma carga maliciosa válida com código {response.status_code}."
    )


Overwriting test_server.py


In [4]:
!pytest test_server.py -v


127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:03:31] "POST /upload HTTP/1.1" 2

platform win32 -- Python 3.11.4, pytest-8.3.4, pluggy-1.5.0 -- C:\Users\estev\AppData\Local\Programs\Python\Python311\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\estev\Desktop\Projeto de teste de software
[1mcollecting ... [0mcollected 4 items

test_server.py::test_flood_requests [32mPASSED[0m[32m                               [ 25%][0m
test_server.py::test_invalid_data [32mPASSED[0m[32m                                 [ 50%][0m
test_server.py::test_memory_accumulation [32mPASSED[0m[32m                          [ 75%][0m
test_server.py::test_malicious_payload_within_limit [32mPASSED[0m[32m               [100%][0m



#### implementando mitigações no código do servidor

In [None]:
app = Flask(__name__)

# Configuração de limite de tamanho para uploads
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024  # Limite de 10MB

# Configurando Flask-Limiter para limitar requisições
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["5 per second"]  # Máximo de 5 requisições por segundo
)

# Configurando o logger
logging.basicConfig(level=logging.INFO)

# Simulação de armazenamento de dados
stored_data = []
MAX_STORED_DATA = 1000  # Limite máximo de dados armazenados em memória

@app.before_request
def log_request():
    """Log de requisições recebidas."""
    logging.info(f"Requisição de {request.remote_addr} para {request.path}")

@app.route("/upload", methods=["POST"])
@limiter.limit("5 per second")  # Aplica limitação de requisições
def upload():
    """Endpoint de upload com validação de entrada e limite de armazenamento."""
    try:
        data = request.get_data()

        # Limitação do armazenamento em memória
        stored_data.append(data)
        if len(stored_data) > MAX_STORED_DATA:
            stored_data.pop(0)  # Remove o dado mais antigo

        return f"Received {len(data)} bytes!", 200

    except Exception as e:
        logging.error(f"Erro ao processar dados: {e}")
        return jsonify({"error": "Erro ao processar a requisição"}), 500

@app.route("/health")
def health():
    """Endpoint de verificação de saúde."""
    return "Servidor está funcionando!"

def run_flask():
    """Executa o servidor Flask em uma thread separada."""
    app.run(debug=True, port=5001, use_reloader=False)

# Inicia o servidor em uma thread separada
flask_thread = threading.Thread(target=run_flask)
flask_thread.start()




 * Serving Flask app '__main__'


 * Debug mode: on


 * Running on http://127.0.0.1:5001
 * Running on http://127.0.0.1:5001
Press CTRL+C to quit
INFO:werkzeug:[33mPress CTRL+C to quit[0m


#### Implementando casos de teste de mitigação de entradas maliciosas

In [3]:
# Configurando o logger para exibir mensagens
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Endpoints do servidor
BASE_URL = "http://127.0.0.1:5001"
UPLOAD_ENDPOINT = f"{BASE_URL}/upload"
HEALTH_ENDPOINT = f"{BASE_URL}/health"

# Verifica se o servidor está online antes de rodar os testes
def check_server():
    try:
        response = requests.get(HEALTH_ENDPOINT, timeout=5)
        if response.status_code == 200:
            logger.info("O servidor está online e pronto para os testes de mitigação.")
        else:
            logger.error(f"Erro ao verificar o servidor: {response.status_code}")
            exit(1)
    except requests.exceptions.RequestException as e:
        logger.error(f"Falha ao conectar ao servidor: {e}")
        exit(1)

# Testes de validação de mitigação
def test_valid_payload():
    """Envia um payload válido (1 MB) e verifica se o servidor aceita."""
    valid_data = b"A" * (1 * 1024 * 1024)  # Dados de 1 MB
    response = requests.post(UPLOAD_ENDPOINT, data=valid_data)
    assert response.status_code == 200, (
        f"ERRO: O servidor deveria aceitar um payload válido de 1 MB, mas retornou {response.status_code}."
    )
    logger.info("Teste de payload válido: O servidor aceitou corretamente o dado esperado (200 OK).")

def test_large_payload():
    """Envia um payload maior que o permitido (11 MB) e verifica se o servidor rejeita."""
    large_data = b"A" * (11 * 1024 * 1024)  # Dados de 11 MB
    response = requests.post(UPLOAD_ENDPOINT, data=large_data)
    assert response.status_code == 413, (
        f"ERRO: O servidor deveria rejeitar payloads maiores que 10 MB, mas retornou {response.status_code}."
    )
    logger.info("Teste de payload grande: O servidor rejeitou corretamente o dado (413 Payload Too Large).")

def test_timeout():
    """Simula envio lento de dados para verificar se o servidor aplica o timeout."""
    try:
        response = requests.post(UPLOAD_ENDPOINT, data=(b"A" * 1024), timeout=1)  # Timeout curto de 1 segundo
    except requests.exceptions.Timeout:
        logger.info("Teste de timeout: O servidor encerrou corretamente a conexão após o tempo limite.")
        return  # O timeout esperado foi alcançado

    assert False, "ERRO: O servidor deveria encerrar a conexão devido ao timeout, mas não o fez."

def test_rate_limiting():
    """Testa o limite de requisições por IP em um curto intervalo de tempo."""
    responses = []
    for i in range(10):  # Envia 10 requisições rápidas
        response = requests.post(UPLOAD_ENDPOINT, data=b"Test")
        responses.append(response.status_code)
    
    # Verificar se pelo menos algumas requisições foram rejeitadas (429)
    assert 429 in responses, (
        "ERRO: O servidor deveria rejeitar requisições excedendo o limite por IP, retornando 429 Too Many Requests."
    )
    logger.info("Teste de rate limiting: O servidor aplicou corretamente o limite de requisições (429 Too Many Requests).")

def test_memory_limitation():
    """Testa se o armazenamento em memória respeita o limite configurado."""
    for i in range(1050):  # Envia mais dados que o limite de 1000 entradas
        response = requests.post(UPLOAD_ENDPOINT, data=f"Data {i}".encode())
        assert response.status_code == 200, (
            f"ERRO: O servidor deveria aceitar os dados, mas retornou {response.status_code}."
        )

    # O tamanho máximo armazenado deve ser 1000
    assert len(stored_data) <= 1000, "ERRO: O servidor não está respeitando o limite de armazenamento."

    logger.info("Teste de limite de memória: O servidor respeitou o limite configurado.")

# Código principal
if __name__ == "__main__":
    logger.info("Verificando conectividade com o servidor...")
    check_server()

    logger.info("Iniciando os testes automatizados...")
    pytest.main(["-v", "-s"])


INFO:__main__:Verificando conectividade com o servidor...
127.0.0.1 - - [08/Dec/2024 10:04:52] "GET /health HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "GET /health HTTP/1.1" 200 -
INFO:__main__:O servidor está online e pronto para os testes de mitigação.
INFO:__main__:Iniciando os testes automatizados...


platform win32 -- Python 3.11.4, pytest-8.3.4, pluggy-1.5.0 -- c:\Users\estev\AppData\Local\Programs\Python\Python311\python.exe
cachedir: .pytest_cache
rootdir: c:\Users\estev\Desktop\Projeto de teste de software
[1mcollecting ... [0mcollected 4 items

test_server.py::test_flood_requests 

127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:0

[32mPASSED[0m
test_server.py::test_invalid_data 

127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -


[32mPASSED[0m
test_server.py::test_memory_accumulation 

127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [08/Dec/2024 10:04:52] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:0

[32mPASSED[0m
test_server.py::test_malicious_payload_within_limit 

127.0.0.1 - - [08/Dec/2024 10:04:53] "POST /upload HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [08/Dec/2024 10:04:53] "POST /upload HTTP/1.1" 200 -


[32mPASSED[0m

