<a href="https://colab.research.google.com/github/Mauriciomsonic/laboratoriomsonicA.I./blob/main/Desafio_ML_Criando_um_Agente_para_Detec%C3%A7%C3%A3o_de_Vulnerabilidades_em_Arquiteturas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# LABORATÓRIO DE VULNERABILIDADES EDUCACIONAL - VERSÃO MELHORADA
# 🛡️ Com tratamento de erros e segurança reforçada

import socket
import threading
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import sqlite3
from html import escape
import time
import re
from random import randint

# =============================================================================
# 1. CONFIGURAÇÕES DE SEGURANÇA
# =============================================================================
DEFAULT_PORT = 8080
MAX_PORT_ATTEMPTS = 10
SECURITY_HEADERS = {
    'X-Content-Type-Options': 'nosniff',
    'X-Frame-Options': 'DENY',
    'X-XSS-Protection': '1; mode=block',
    'Content-Security-Policy': "default-src 'self'",
    'Strict-Transport-Security': 'max-age=31536000; includeSubDomains'
}

# =============================================================================
# 2. HANDLER SEGURO
# =============================================================================
class SecureHTTPHandler(BaseHTTPRequestHandler):

    timeout = 30  # 30 segundos

    def do_GET(self):
        try:
            self.connection.settimeout(self.timeout)
            parsed_path = urlparse(self.path)
            path = parsed_path.path

            # Adicionar headers de segurança
            for header, value in SECURITY_HEADERS.items():
                self.send_header(header, value)

            if path == '/':
                self.serve_homepage()
            elif path == '/search':
                self.serve_search(parsed_path)
            elif path == '/comment':
                self.serve_comment(parsed_path)
            elif path == '/secure':
                self.serve_secure()
            else:
                self.send_error(404, "Página não encontrada")

        except socket.timeout:
            self.log_error("Timeout na requisição")
            self.send_error(408, "Timeout")
        except Exception as e:
            self.log_error(f"Erro interno: {e}")
            self.send_error(500, "Erro interno do servidor")

    def serve_homepage(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        html = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>🛡️ Laboratório Seguro</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 40px; }
                .vuln { background: #ffe6e6; padding: 20px; border-radius: 10px; margin: 10px 0; }
                .secure { background: #e6ffe6; padding: 20px; border-radius: 10px; margin: 10px 0; }
            </style>
        </head>
        <body>
            <h1>🔒 Laboratório de Segurança</h1>

            <div class="vuln">
                <h2>🎯 Teste de Busca (Protegido)</h2>
                <form action="/search" method="GET">
                    <input type="text" name="query" placeholder="Buscar usuários..." maxlength="50">
                    <input type="submit" value="Buscar">
                </form>
            </div>

            <div class="vuln">
                <h2>🎯 Teste de Comentários (Protegido)</h2>
                <form action="/comment" method="GET">
                    <input type="text" name="text" placeholder="Deixe um comentário..." maxlength="100">
                    <input type="submit" value="Comentar">
                </form>
            </div>

            <div class="secure">
                <h2>📊 Informações do Servidor</h2>
                <p>Porta: {{PORT}}</p>
                <p>Host: 127.0.0.1</p>
                <p>Headers de segurança ativos</p>
            </div>
        </body>
        </html>
        """.replace("{{PORT}}", str(self.server.server_port))

        self.wfile.write(html.encode('utf-8'))

    def serve_search(self, parsed_path):
        query_params = parse_qs(parsed_path.query)
        search_term = query_params.get('query', [''])[0]

        if not self.validate_input(search_term):
            self.send_error(400, "Input inválido")
            return

        conn = sqlite3.connect('test.db')
        cursor = conn.cursor()

        try:
            # ✅ PREPARED STATEMENT (Seguro)
            cursor.execute("SELECT * FROM users WHERE name LIKE ?", ('%' + search_term + '%',))
            results = cursor.fetchall()

            self.send_response(200)
            self.send_header('Content-type', 'text/html; charset=utf-8')
            self.end_headers()

            response = f"<h2>Resultados para: {escape(search_term)}</h2>"
            if results:
                response += "<ul>"
                for result in results:
                    response += f"<li>{escape(str(result))}</li>"
                response += "</ul>"
            else:
                response += "<p>Nenhum resultado encontrado</p>"
            response += '<br><a href="/">Voltar</a>'

            self.wfile.write(response.encode('utf-8'))

        except Exception as e:
            self.log_error(f"Erro DB: {e}")
            self.send_error(500, "Erro no banco de dados")
        finally:
            conn.close()

    def serve_comment(self, parsed_path):
        query_params = parse_qs(parsed_path.query)
        comment = query_params.get('text', [''])[0]

        if not self.validate_input(comment):
            self.send_error(400, "Input inválido")
            return

        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        # ✅ OUTPUT ESCAPADO
        response = f"<h2>Último comentário:</h2>"
        response += f"<p>{escape(comment)}</p>"
        response += '<br><a href="/">Voltar</a>'

        self.wfile.write(response.encode('utf-8'))

    def serve_secure(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()

        secure_html = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>🛡️ Versão Segura</title>
            <style>body { font-family: Arial; margin: 40px; }</style>
        </head>
        <body>
            <h1>🛡️ Medidas de Segurança Implementadas</h1>
            <ul>
                <li>✅ Prepared statements contra SQL Injection</li>
                <li>✅ Escape de output contra XSS</li>
                <li>✅ Validação de input</li>
                <li>✅ Headers de segurança HTTP</li>
                <li>✅ Timeout de conexão</li>
                <li>✅ Bind em 127.0.0.1</li>
            </ul>
            <a href="/">Voltar</a>
        </body>
        </html>
        """
        self.wfile.write(secure_html.encode('utf-8'))

    def validate_input(self, input_str):
        """Validação rigorosa de input"""
        if len(input_str) > 100:
            return False
        # Permitir apenas caracteres alfanuméricos e alguns especiais
        if not re.match(r'^[a-zA-Z0-9\sáàâãéèêíïóôõöúçñÁÀÂÃÉÈÊÍÏÓÔÕÖÚÇÑ!?.,@#$%&*()\-_+=:;<>\/\\\[\]{}| ]+$', input_str):
            return False
        return True

    def log_message(self, format, *args):
        # Log formatado
        super().log_message(format, *args)

# =============================================================================
# 3. FUNÇÕES AUXILIARES
# =============================================================================
def setup_database():
    """Configurar banco de dados de teste"""
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()

    cursor.execute('''
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY,
        name TEXT,
        email TEXT
    )
    ''')

    users = [
        ('Alice Silva', 'alice@email.com'),
        ('Bob Santos', 'bob@email.com'),
        ('Carlos Oliveira', 'carlos@email.com')
    ]

    cursor.executemany('INSERT OR IGNORE INTO users (name, email) VALUES (?, ?)', users)
    conn.commit()
    conn.close()

def find_available_port(start_port, max_attempts=10):
    """Encontrar uma porta disponível"""
    for port in range(start_port, start_port + max_attempts):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(('127.0.0.1', port))
                return port
        except OSError:
            continue
    return None

# =============================================================================
# 4. SERVIDOR PRINCIPAL
# =============================================================================
def run_secure_server():
    """Iniciar servidor com tratamento de erros"""

    # Configurar banco
    setup_database()
    print("✅ Banco de dados configurado")

    # Encontrar porta disponível
    port = find_available_port(DEFAULT_PORT, MAX_PORT_ATTEMPTS)
    if port is None:
        print("❌ Não foi possível encontrar uma porta disponível")
        return

    server_address = ('127.0.0.1', port)

    try:
        server = HTTPServer(server_address, SecureHTTPHandler)
        server.timeout = 30

        print(f"🛡️  Servidor seguro rodando em http://127.0.0.1:{port}")
        print("✅ Headers de segurança implementados")
        print("✅ Validação de input ativada")
        print("✅ Timeout configurado (30s)")
        print("🛑 Pressione Ctrl+C para parar o servidor")

        server.serve_forever()

    except OSError as e:
        if e.errno == 98:  # Address already in use
            print(f"❌ Porta {port} já está em uso. Tentando outra...")
            # Tentar automaticamente outra porta
            new_port = find_available_port(port + 1, 5)
            if new_port:
                print(f"🔄 Tentando porta {new_port}...")
                server_address = ('127.0.0.1', new_port)
                server = HTTPServer(server_address, SecureHTTPHandler)
                server.serve_forever()
            else:
                print("❌ Não foi possível iniciar o servidor. Todas as portas estão ocupadas.")
        else:
            print(f"❌ Erro ao iniciar servidor: {e}")

    except KeyboardInterrupt:
        print("\n🛑 Servidor parado pelo usuário")

    except Exception as e:
        print(f"❌ Erro inesperado: {e}")

# =============================================================================
# 5. EXECUÇÃO
# =============================================================================
if __name__ == "__main__":
    print("🚀 Iniciando Laboratório de Segurança")
    print("⚠️  APENAS PARA FINS EDUCACIONAIS ⚠️")
    print("=" * 50)

    run_secure_server()



🚀 Iniciando Laboratório de Segurança
⚠️  APENAS PARA FINS EDUCACIONAIS ⚠️
✅ Banco de dados configurado
🛡️  Servidor seguro rodando em http://127.0.0.1:8081
✅ Headers de segurança implementados
✅ Validação de input ativada
✅ Timeout configurado (30s)
🛑 Pressione Ctrl+C para parar o servidor

🛑 Servidor parado pelo usuário
