#Security Log Analyzer – Monitoramento Automático de Incidentes com Python e Google Sheets



In [3]:
#importando dados
import pandas as pd

log_data = pd.read_csv('london.csv',on_bad_lines="skip")

In [4]:
# indentificando se é payload
def classificar_payload(payload):
    # Payloads binários (b'\x16\x03...) costumam ser handshake TLS / sucesso ou ruído normal
    #startswith -> ele verifica se a string começa com algum tipo de letra boleano
    if payload.startswith("b'\\x16"):
        return "normal"

    # Payloads HTTP GET / POST são tentativas de conexão / podem conter ataques
    if "GET" in payload or "POST" in payload:
        # Verifica possíveis palavras de ataques
        ataques = ['union', 'select', 'drop', 'alert(', 'cmd=', 'exec', '<script']
        if any(a in payload.lower() for a in ataques):
            return "falha"  # tentativa de injeção ou ataque
        else:
            return "sucesso"

    return "outro"


In [5]:
log_data['status'] = log_data['payload'].astype('str').apply(classificar_payload)

In [15]:
import re
import html
import urllib.parse
import base64

# helpers
def is_tls_client_hello(data: bytes) -> bool:
    """Detecta um TLS ClientHello mínimo (record header + handshake type)."""
    if not data or len(data) < 6:
        return False
    try:
        return data[0] == 0x16 and data[5] == 0x01
    except Exception:
        return False

def try_decode_bytes(b: bytes):
    """Tenta converter bytes pra str: utf-8, latin1; retorna str e flag(binary=True se não-textual)."""
    if b is None:
        return "", False
    for enc in ('utf-8', 'latin-1', 'cp1252'):
        try:
            s = b.decode(enc)
            # se contém muitos \x00 ou bytes não imprimíveis, considere binário
            nonprint = sum(1 for ch in s if ord(ch) < 9 and ch not in '\n\r\t')
            if nonprint > 5:
                return s, True
            return s, False
        except Exception:
            continue
    # fallback
    return str(b), True

def _maybe_base64_decode(s):
    s = (s or "").strip()
    if len(s) < 8 or len(s) % 4 != 0:
        return None
    if not re.fullmatch(r'[A-Za-z0-9+/=\s]+', s):
        return None
    try:
        decoded = base64.b64decode(s, validate=True)
        return try_decode_bytes(decoded)[0]
    except Exception:
        return None

# a função principal
def detectar_payload_improved(inp):
    """
    Recebe bytes ou str. Retorna dict:
    {
      'type': categoria principal (ex: 'sql_injection', 'xss', 'command_injection', 'suspicious_tls', 'http_probe', 'normal'),
      'score': 0-100,
      'matches': [{'category':..., 'pattern':..., 'evidence':...}, ...],
      'protocol': 'tls'|'http'|'text'|'binary',
      'evidence': breve trecho útil
    }
    """
    # --- transformar entrada em texto quando possível
    binary_flag = False
    raw = inp
    if isinstance(inp, bytes):
        txt, binary_flag = try_decode_bytes(inp)
    else:
        txt = str(inp or "")
    txt_orig = txt

    # --- detectar envelopes
    if isinstance(inp, bytes) and is_tls_client_hello(inp):
        # é um ClientHello -> suspeita de scan/probe TLS (pode ser legit)
        return {
            'type': 'suspicious_tls_clienthello',
            'score': 40,
            'matches': [{'category': 'tls', 'pattern': 'ClientHello', 'evidence': repr(inp[:64])}],
            'protocol': 'tls',
            'evidence': 'client_hello_bytes'
        }

    # tenta extrair HTTP se for texto que começa com GET/POST/HEAD
    first_line = txt.splitlines()[0] if txt.strip() else ""
    http_method = None
    if re.match(r'^(GET|POST|PUT|DELETE|HEAD|OPTIONS|CONNECT)\s', first_line, flags=re.IGNORECASE):
        http_method = re.match(r'^(GET|POST|PUT|DELETE|HEAD|OPTIONS|CONNECT)', first_line, flags=re.IGNORECASE).group(1).upper()

    protocol = 'binary' if binary_flag else ('http' if http_method else 'text')

    # --- se for HTTP: parse simples (path, headers, body)
    path = ""
    headers = {}
    body = ""
    if http_method:
        lines = txt.split("\r\n")
        # request-line "GET /path HTTP/1.1"
        try:
            parts = lines[0].split()
            if len(parts) >= 2:
                path = parts[1]
        except Exception:
            path = ""
        # headers até linha vazia
        i = 1
        while i < len(lines):
            if lines[i] == "":
                body = "\r\n".join(lines[i+1:])
                break
            if ":" in lines[i]:
                k, v = lines[i].split(":", 1)
                headers[k.strip().lower()] = v.strip()
            i += 1

    # --- normalização básica do texto que vamos checar
    p = txt
    try:
        p = urllib.parse.unquote_plus(p)
    except Exception:
        pass
    p = html.unescape(p)
    p = p.replace('\x00', '')
    p = re.sub(r'\s+', ' ', p).strip()
    p_low = p.lower()

    # tentar detectar base64 dentro do payload e anexar decodificado
    b64_decoded = _maybe_base64_decode(p_low)
    if b64_decoded:
        p_low += " " + b64_decoded.lower()

    # --- padrões
    patterns = {
        'sql_injection': [
            r"union\s+select", r"\bor\s+1=1\b", r"sleep\(\s*\d+\s*\)", r"information_schema", r"into\s+outfile"
        ],
        'xss': [
            r"<\s*script\b", r"onerror\s*=", r"javascript:", r"document\.cookie", r"alert\s*\("
        ],
        'command_injection': [
            r"\b(exec|cmd|system)\b", r"(;|\|\||\||\&\&)\s*(ls|cat|whoami|id|nc|nmap|curl|wget)\b", r"`[^`]+`", r"\$\(.+\)"
        ],
        'path_traversal': [r"\.\./", r"/etc/passwd", r"php://", r"\.\.\\\\"],
        'ssrf': [r"127\.0\.0\.1", r"localhost\b", r"169\.254\.169\.254"],
        'scanner_signs': [r"masscan", r"nmap", r"nessus", r"nikto", r"sqlmap", r"python-requests", r"curl\/"],
        'sensitive_keywords': [r"password", r"passwd", r"api[_-]?key", r"token"]
    }

    matches = []
    score = 0

    # função util
    def check_group(cat, rx_list):
        nonlocal score
        for rx in rx_list:
            try:
                if re.search(rx, p_low, flags=re.IGNORECASE):
                    matches.append({'category': cat, 'pattern': rx, 'evidence': re.search(rx, p_low, flags=re.IGNORECASE).group(0)[:200]})
                    score += {'sql_injection':40,'xss':30,'command_injection':40,'path_traversal':25,'ssrf':30,'scanner_signs':10,'sensitive_keywords':5}.get(cat, 5)
            except re.error:
                continue

    # checar path & headers mais agressivamente (se for http)
    if http_method:
        # verifica path por padrões (ex: /?id=1' OR ...)
        check_group('sql_injection', patterns['sql_injection'])
        check_group('xss', patterns['xss'])
        check_group('command_injection', patterns['command_injection'])
        check_group('path_traversal', patterns['path_traversal'])
        # checa User-Agent e server scanners
        ua = headers.get('user-agent','')
        if ua:
            if re.search(r'(masscan|nmap|nikto|sqlmap|nessus|python-requests|curl)', ua, flags=re.IGNORECASE):
                matches.append({'category': 'scanner_signs', 'pattern': 'user-agent', 'evidence': ua})
                score += 10
        # se host é IP:porta e porta não padrão -> suspeita de scan
        host = headers.get('host','')
        if re.search(r'\d+\.\d+\.\d+\.\d+:\d+', host):
            matches.append({'category': 'suspicious_host', 'pattern': 'host_ip_port', 'evidence': host})
            score += 8
    else:
        # texto genérico
        for cat, rxs in patterns.items():
            check_group(cat, rxs)

    # heurísticas extras
    if re.search(r"[;|`]{3,}", p_low):
        matches.append({'category':'weird_chars','pattern':'[;|`]{3,}','evidence': 'repeated_specials'})
        score += 12

    # limitar score
    score = max(0, min(100, score))

    # decidir tipo principal com prioridade
    if not matches:
        tipo = 'normal'
    else:
        priority = ['command_injection','sql_injection','xss','path_traversal','ssrf','scanner_signs','sensitive_keywords','suspicious_host']
        found = None
        for pr in priority:
            for m in matches:
                if m['category'] == pr:
                    found = pr
                    break
            if found:
                break
        tipo = found or matches[0]['category']

    # compact evidence
    evidence = ''
    if matches:
        evidence = matches[0].get('evidence') or matches[0].get('pattern')

    return {
        'type': tipo,
        'score': score,
        'matches': matches,
        'protocol': protocol,
        'evidence': evidence,
        'raw_preview': p_low[:500]
    }


In [16]:
import ast

 
# Wrapper que limpa/normaliza a coluna payload antes de chamar a sua função

def detectar_payload_wrapper(val):

    # tratar NaN/None

    if pd.isna(val):

        return {'type':'empty','score':0,'matches':[],'protocol':'','evidence':''}
 
    # se já for bytes, usa direto

    if isinstance(val, bytes):

        inp = val

    else:

        # é string — normaliza

        s = str(val).strip()
 
        # 1) remover aspas externas que aparecem no print: '...'

        if (s.startswith("'") and s.endswith("'")) or (s.startswith('"') and s.endswith('"')):

            # cuidado: pode ser "'GET / ...'" ou '"b\'\\x16...\'"'

            # apenas remover aspas externas simples

            s = s[1:-1]
 
        # 2) se for algo que começa com b'...' (string representando bytes), converte pra bytes reais

        if s.startswith("b'") or s.startswith('b"'):

            try:

                # ast.literal_eval converte "b'\\x16\\x03...'" -> bytes reais

                inp = ast.literal_eval(s)

            except Exception:

                # se falhar, mantém a string limpa

                inp = s

        else:

            # 3) se contém muitas sequências \x é provável que seja uma representação com escapes.

            # Tenta decodificar escapes usando latin1 encoding roundtrip ou decode('unicode_escape')

            if '\\x' in s or '\\n' in s or '\\r' in s:

                try:

                    # primeiro tenta interpretar escapes python-style

                    inp = s.encode('utf-8').decode('unicode_escape')

                    # se ainda tem prefixo b'...' depois do decode, tenta literal_eval

                    if (isinstance(inp, str) and (inp.startswith("b'") or inp.startswith('b"'))):

                        try:

                            inp = ast.literal_eval(inp)

                        except Exception:

                            pass

                except Exception:

                    inp = s

            else:

                # string normal (ex: GET / HTTP/1.1...)

                inp = s
 
    # por fim, chama a sua função original e captura erros

    try:

        return detectar_payload_improved(inp)

    except Exception as e:

        # nunca retornar None: estrutura consistente para análise posterior

        return {'type':'error','score':0,'matches':[], 'protocol':'', 'evidence':str(e)}
 
# --- Aplicar ao DataFrame

# supondo que seu df se chama log_data e a coluna é 'payload'

log_data['type_attack'] = log_data['payload'].apply(detectar_payload_wrapper)
 
# Expandir o dicionário em colunas separadas (opcional, facilita análise)

expanded = log_data['type_attack'].apply(pd.Series)

# evita sobrescrever acidentalmente

expanded = expanded.rename(columns=lambda c: f"attack_{c}")

log_data = pd.concat([log_data.reset_index(drop=True), expanded.reset_index(drop=True)], axis=1)
 
# Debug rápido: ver tipos armazenados na coluna payload

print("Tipos encontrados na coluna payload:")

print(log_data['payload'].map(type).value_counts())
 
# Mostrar as primeiras linhas com o resultado

display(log_data[['payload','attack_type','attack_score','attack_protocol','attack_evidence']].head(10))

 

Tipos encontrados na coluna payload:
payload
<class 'str'>    70338
Name: count, dtype: int64


Unnamed: 0,payload,attack_type,attack_type.1,attack_score,attack_score.1,attack_protocol,attack_protocol.1,attack_evidence,attack_evidence.1
0,b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03\xf5\t...,suspicious_tls_clienthello,suspicious_tls_clienthello,40.0,40,tls,tls,client_hello_bytes,client_hello_bytes
1,'GET / HTTP/1.1\r\nHost: 3.8.136.101:16026\r\n...,,suspicious_host,,8,,http,,3.8.136.101:16026
2,b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03\xd5y\...,suspicious_tls_clienthello,suspicious_tls_clienthello,40.0,40,tls,tls,client_hello_bytes,client_hello_bytes
3,'GET / HTTP/1.1\r\nHost: 3.8.136.101:3119\r\n\...,,suspicious_host,,8,,http,,3.8.136.101:3119
4,b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03\x9c\x...,suspicious_tls_clienthello,suspicious_tls_clienthello,40.0,40,tls,tls,client_hello_bytes,client_hello_bytes
5,'GET / HTTP/1.1\r\nHost: 3.8.136.101:12181\r\n...,,suspicious_host,,8,,http,,3.8.136.101:12181
6,'',,normal,,0,,text,,
7,b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03Kl\xf4...,suspicious_tls_clienthello,suspicious_tls_clienthello,40.0,40,tls,tls,client_hello_bytes,client_hello_bytes
8,'GET / HTTP/1.1\r\nHost: 3.8.136.101:21\r\n\r\n',,suspicious_host,,8,,http,,3.8.136.101:21
9,b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03#5\x19...,suspicious_tls_clienthello,suspicious_tls_clienthello,40.0,40,tls,tls,client_hello_bytes,client_hello_bytes


In [17]:
log_data['type_attack'] = log_data['payload'].astype('str').apply(detectar_payload_improved)

In [18]:
log_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 70338 entries, 0 to 70337
Data columns (total 18 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   time                70335 non-null  object 
 1   payload             70338 non-null  object 
 2   from                70335 non-null  object 
 3   port                70335 non-null  object 
 4   country             70332 non-null  object 
 5   status              70338 non-null  object 
 6   type_attack         70338 non-null  object 
 7   attack_type         11344 non-null  object 
 8   attack_score        11344 non-null  float64
 9   attack_matches      11344 non-null  object 
 10  attack_protocol     11344 non-null  object 
 11  attack_evidence     11344 non-null  object 
 12  attack_type         70338 non-null  object 
 13  attack_score        70338 non-null  int64  
 14  attack_matches      70338 non-null  object 
 15  attack_protocol     70338 non-null  object 
 16  atta

#Contando tentativas

In [19]:
# tentativas de ataque e falhas

falhas = log_data[log_data['status'] == 'falha']
tentativas = falhas['from'].value_counts().reset_index()
tentativas.columns = ['from', 'qtd_falhas']


In [20]:
# tentativas e sucesso

sucessos_ataque = log_data[log_data['status'] == 'sucesso']
tentativas_su = sucessos_ataque['from'].value_counts().reset_index()
tentativas_su.columns = ['from', 'qtd_sucesso']

#Consultando geolocalização

In [21]:
log_data.head()

Unnamed: 0,time,payload,from,port,country,status,type_attack,attack_type,attack_score,attack_matches,attack_protocol,attack_evidence,attack_type.1,attack_score.1,attack_matches.1,attack_protocol.1,attack_evidence.1,attack_raw_preview
0,"05/01/2021, 19:42:28",b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03\xf5\t...,'162.142.125.55',44354,London,normal,"{'type': 'normal', 'score': 0, 'matches': [], ...",suspicious_tls_clienthello,40.0,"[{'category': 'tls', 'pattern': 'ClientHello',...",tls,client_hello_bytes,suspicious_tls_clienthello,40,"[{'category': 'tls', 'pattern': 'ClientHello',...",tls,client_hello_bytes,
1,"05/01/2021, 19:42:29",'GET / HTTP/1.1\r\nHost: 3.8.136.101:16026\r\n...,'162.142.125.55',53424,London,sucesso,"{'type': 'normal', 'score': 0, 'matches': [], ...",,,,,,suspicious_host,8,"[{'category': 'suspicious_host', 'pattern': 'h...",http,3.8.136.101:16026,get / http/1.1 host: 3.8.136.101:16026
2,"05/01/2021, 19:43:40",b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03\xd5y\...,'162.142.125.56',51004,London,normal,"{'type': 'normal', 'score': 0, 'matches': [], ...",suspicious_tls_clienthello,40.0,"[{'category': 'tls', 'pattern': 'ClientHello',...",tls,client_hello_bytes,suspicious_tls_clienthello,40,"[{'category': 'tls', 'pattern': 'ClientHello',...",tls,client_hello_bytes,
3,"05/01/2021, 19:43:42",'GET / HTTP/1.1\r\nHost: 3.8.136.101:3119\r\n\...,'162.142.125.56',59068,London,sucesso,"{'type': 'normal', 'score': 0, 'matches': [], ...",,,,,,suspicious_host,8,"[{'category': 'suspicious_host', 'pattern': 'h...",http,3.8.136.101:3119,get / http/1.1 host: 3.8.136.101:3119
4,"05/01/2021, 19:44:55",b'\x16\x03\x01\x00{\x01\x00\x00w\x03\x03\x9c\x...,'162.142.125.38',40798,London,normal,"{'type': 'normal', 'score': 0, 'matches': [], ...",suspicious_tls_clienthello,40.0,"[{'category': 'tls', 'pattern': 'ClientHello',...",tls,client_hello_bytes,suspicious_tls_clienthello,40,"[{'category': 'tls', 'pattern': 'ClientHello',...",tls,client_hello_bytes,


In [22]:
import requests

def localizar_ip(ip):
    try:
        ip = str(ip).strip().replace("'", "")  # remove aspas e espaços extras
        resp = requests.get(f'https://ipinfo.io/{ip}/json', timeout=3)
        if resp.status_code == 200:
            data = resp.json()
            return data.get('country', 'Desconhecido')
        else:
            return f"Erro_{resp.status_code}"
    except Exception as e:
        return 'Erro'

# Adiciona a coluna country em cada DataFrame
tentativas['country'] = tentativas['from'].apply(localizar_ip)
tentativas_su['country'] = tentativas_su['from'].apply(localizar_ip)


In [23]:
analise = pd.merge(tentativas, tentativas_su, on='from', how='outer', suffixes=('falha', 'sucesso'))
analise['suspeito'] = (analise['qtd_falhas'] > 5) & (analise['qtd_sucesso'] > 5)

analise = analise.replace([float('inf'), float('-inf')], None).fillna('')



# Importando para planilha

In [28]:
import gspread

from oauth2client.service_account import ServiceAccountCredentials

import numpy as np

import pandas as pd
 
# 1. Escopos corretos

scope = [

    "https://spreadsheets.google.com/feeds",

    "https://www.googleapis.com/auth/spreadsheets",

    "https://www.googleapis.com/auth/drive"

]
 
# 2. Arquivo JSON da conta de serviço

creds = ServiceAccountCredentials.from_json_keyfile_name("chave_google.json", scope)

client = gspread.authorize(creds)
 
# 3. Função que limpa e converte tipos

def limpar_df(df):

    df = df.replace([np.inf, -np.inf, None], "").fillna("")

    # converte qualquer valor não-serializável (como dict/list) para string

    for col in df.columns:

        df[col] = df[col].apply(lambda x: str(x) if not isinstance(x, (int, float, str)) else x)

    return df
 
# 4. Abre planilha e envia dados

planilha = client.open("Monitoramento Log")
 
sheet1 = planilha.worksheet("Tentativas ")

sheet1.clear()

sheet1.update([tentativas.columns.values.tolist()] + limpar_df(tentativas).values.tolist())
 
sheet2 = planilha.worksheet("tentativas_su")

sheet2.clear()

sheet2.update([tentativas_su.columns.values.tolist()] + limpar_df(tentativas_su).values.tolist())
 
sheet3 = planilha.worksheet("Analise")

sheet3.clear()

sheet3.update([analise.columns.values.tolist()] + limpar_df(analise).values.tolist())
 
sheet4 = planilha.worksheet("Falhas")

sheet4.clear()

sheet4.update([falhas.columns.values.tolist()] + limpar_df(falhas).values.tolist())
 
sheet5 = planilha.worksheet("Sucesso")

sheet5.clear()

sheet5.update([sucessos_ataque.columns.values.tolist()] + limpar_df(sucessos_ataque).values.tolist())

 

  df[col] = df[col].apply(lambda x: str(x) if not isinstance(x, (int, float, str)) else x)


{'spreadsheetId': '1NEPWtDUvjuzsq6g-Y7jzbGBFDhuBJJbRcCYQNvfKHWk',
 'updatedRange': 'Sucesso!A1:R15963',
 'updatedRows': 15963,
 'updatedColumns': 18,
 'updatedCells': 287334}

In [None]:
sucessos_ataque