### Imports

In [1]:
import os, json, math, glob
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
import pandas as pd

from sklearn.base import clone
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support

### Utilitários

In [2]:
# CÉLULA — Parser robusto p/ JSONL com dicts em string (aspas simples / None)
import json, re
from ast import literal_eval
from typing import Any, Dict, List

def _try_literal_eval(s: str) -> Any:
    """Tenta converter string 'estilo Python' (ex.: "{'k': 'v'}", 'None') em objeto Python."""
    s = s.strip()
    if s in ("None", "none", "NULL", "Null", "null"):
        return None
    # se parece com dict/list/tuple/num/str python → tenta literal_eval
    if (s.startswith("{") and s.endswith("}")) or \
       (s.startswith("[") and s.endswith("]")) or \
       (s.startswith("(") and s.endswith(")")) or \
       (s.startswith("'") and s.endswith("'")) or \
       (s.startswith('"') and s.endswith('"')):
        try:
            return literal_eval(s)
        except Exception:
            pass
    # último recurso: se parece com JSON de verdade (aspas duplas), tenta json.loads
    if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")):
        try:
            return json.loads(s)
        except Exception:
            pass
    return s  # mantém como string

def _normalize_pyish(obj: Any) -> Any:
    """Recursively convert Python-style literal strings into Python structures.

    Dict values and list items are normalized in place of the originals (dict keys are
    left untouched). Strings are passed through ``_try_literal_eval``; whatever comes
    back is normalized again because the parsed value may itself contain nested literal
    strings. Any other type is returned unchanged.
    """
    if isinstance(obj, dict):
        return {k: _normalize_pyish(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_normalize_pyish(v) for v in obj]
    if isinstance(obj, str):
        val = _try_literal_eval(obj)
        # Stop once parsing is a no-op. Compare by type and value rather than identity:
        # whether an unchanged string comes back as the *same object* is an
        # implementation detail of str.strip(), and relying on it could recurse forever.
        if isinstance(val, str) and val == obj:
            return obj
        return _normalize_pyish(val)
    return obj

def read_jsonl_lines(path: str) -> List[Any]:
    """Read a JSONL file leniently and return one normalized object per non-blank line.

    Each line is parsed as JSON; when that fails it is handed to ``_try_literal_eval``
    instead. Every parsed object then goes through ``_normalize_pyish``. The file is
    opened as UTF-8 with undecodable bytes ignored.
    """
    def _parse(text: str) -> Any:
        # Strict JSON first; fall back to the Python-literal parser.
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return _try_literal_eval(text)

    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [_normalize_pyish(_parse(text)) for text in stripped_lines if text]

def event_to_text(evt: Any) -> str:
    """Turn one parsed event into the text used for vectorization.

    * dict: collects ``winlog.task``, ``process.name``, ``process.command_line`` and
      ``file.path`` / ``file.name`` / ``file.target_path`` (in that order, skipping
      falsy values) and joins them with single spaces. A section that is a plain string
      is used as-is. If nothing was collected, the whole dict is serialized as JSON.
    * list: each item is converted recursively and the results are joined by newlines.
    * str: returned unchanged.
    * anything else: serialized as JSON, falling back to ``str()`` if that fails.
    """
    def _serialize(value: Any) -> str:
        # JSON when possible, plain str() for values json cannot encode.
        try:
            return json.dumps(value, ensure_ascii=False)
        except Exception:
            return str(value)

    if isinstance(evt, str):
        return evt
    if isinstance(evt, list):
        return "\n".join(map(event_to_text, evt))
    if not isinstance(evt, dict):
        return _serialize(evt)

    # (section key, fields to read from it) in output order.
    sections = (
        ("winlog", ("task",)),
        ("process", ("name", "command_line")),
        ("file", ("path", "name", "target_path")),
    )

    pieces = []
    for section, fields in sections:
        node = evt.get(section)
        if isinstance(node, dict):
            for field in fields:
                value = node.get(field)
                if value:
                    pieces.append(str(value))
        elif isinstance(node, str):
            pieces.append(node)

    if pieces:
        return " ".join(pieces)
    return _serialize(evt)

def file_to_document_text(path: str) -> str:
    """Build one document from a JSONL file: the text of every event, one per line."""
    return "\n".join(map(event_to_text, read_jsonl_lines(path)))


### Montagem do dataset (e fatiamento do benigno)

In [92]:
# Input locations: a glob of attack JSONL files and a single benign JSONL file.
ATTACK_GLOB = "./attacks/*.jsonl"
BENIGN_FILE = "./benign/benign.jsonl"

# Sorted so the document order does not depend on the order glob returns.
attack_files = sorted(glob.glob(ATTACK_GLOB))
# Fail early when inputs are missing. Note that both messages spell the paths out
# literally instead of interpolating the constants above.
assert len(attack_files) > 0, "Nenhum arquivo encontrado em ./attacks/*.jsonl"

assert os.path.exists(BENIGN_FILE), "Arquivo benigno ./benign/benign.jsonl não encontrado"

# Cell output: number of attack files and the first three paths.
len(attack_files), attack_files[:3]


(1164,
 ['./attacks\\20250815_164702.jsonl',
  './attacks\\20250815_164734.jsonl',
  './attacks\\20250815_164800.jsonl'])

In [93]:
def count_lines(path: str) -> int:
    """Return the number of non-blank lines in the text file at ``path``.

    Blank lines are skipped so the count lines up with ``read_jsonl_lines``, which
    ignores them as well. NOTE(review): this helper was called here without being
    defined anywhere in the notebook; confirm these counting semantics are the
    intended ones.
    """
    with open(path, "r", encoding="utf-8", errors="ignore") as fh:
        return sum(1 for line in fh if line.strip())


# Size statistics, reported for reference only.
attack_line_counts = [count_lines(p) for p in attack_files]
median_attack_lines = int(np.median(attack_line_counts)) if attack_line_counts else 500

benign_total_lines = count_lines(BENIGN_FILE)

# Number of benign slices. This is a fixed value: it is NOT derived from
# benign_total_lines / median_attack_lines, which would give a different number.
K = 500

print(f"Mediana de linhas (ataque): {median_attack_lines}")
print(f"Linhas do benigno: {benign_total_lines}")
print(f"Numero de fatias benignas (K): {K}")

Mediana de linhas (ataque): 31
Linhas do benigno: 10000
Numero de fatias benignas (K): 500


In [94]:
# 1) Attack documents (label=1): each attack file becomes one document.
# NOTE(review): a file that yields no text still becomes an empty document labelled 1
# (the sample table further down shows an attack row with n_chars == 0) — confirm
# that this is intended.
X_text, y, ids = [], [], []

for path in attack_files:
    doc = file_to_document_text(path)
    X_text.append(doc)
    y.append(1)
    ids.append(Path(path).name)

# 2) Benign documents (label=0): the single large file is cut into K consecutive slices.
benign_lines_raw = read_jsonl_lines(BENIGN_FILE)
# Ceiling division, so K slices always cover every event; trailing slices may be empty.
chunk_size = math.ceil(len(benign_lines_raw) / K)

for i in range(K):
    chunk = benign_lines_raw[i*chunk_size : (i+1)*chunk_size]
    if not chunk:
        # Nothing left for this slice index.
        continue
    # Turn the slice into text; repr() is used when event_to_text returns an empty string.
    chunk_texts = [event_to_text(evt) or repr(evt) for evt in chunk]
    doc = "\n".join(chunk_texts)
    X_text.append(doc)
    y.append(0)
    ids.append(f"benign_chunk_{i:03d}")

# sum(y) counts the attack documents because attack labels are 1 and benign labels are 0.
print(f"Total documentos: {len(X_text)} | Maliciosos: {sum(y)} | Benignos: {len(y)-sum(y)}")

Total documentos: 1664 | Maliciosos: 1164 | Benignos: 500


In [95]:
# Overview table: one row per document with its id, label and text length in characters.
df = pd.DataFrame({"id": ids, "label": y, "n_chars": [len(t) for t in X_text]})
# Show a random sample of up to 10 rows (no seed is passed, so the rows differ between runs).
df.sample(min(10, len(df)))

Unnamed: 0,id,label,n_chars
845,20250818_023855.jsonl,1,6018
205,20250815_195101.jsonl,1,7083
137,20250815_191937.jsonl,1,3653
1222,benign_chunk_058,0,1749
492,20250815_222414.jsonl,1,1653
960,20250826_175043.jsonl,1,0
1551,benign_chunk_387,0,1819
508,20250815_222936.jsonl,1,804
789,20250816_011100.jsonl,1,5972
1398,benign_chunk_234,0,2041


### Split e avaliação auxiliar

In [None]:
def evaluate_model(name, model, X_train, X_test, y_train, y_test):
    """Fit ``model`` on the training split and report its metrics on the test split.

    Prints a one-line summary (accuracy and binary precision/recall/F1) followed by
    the full classification report.

    Returns:
        dict with keys ``model``, ``accuracy``, ``precision``, ``recall`` and ``f1``.
    """
    model.fit(X_train, y_train)
    y_hat = model.predict(X_test)

    accuracy = accuracy_score(y_test, y_hat)
    # zero_division=0 reports 0 instead of warning when a metric is undefined.
    precision, recall, f1_value, _support = precision_recall_fscore_support(
        y_test, y_hat, average="binary", zero_division=0
    )

    print(f"\n[{name}]")
    print(f"Accuracy: {accuracy:.3f} | Precision: {precision:.3f} | Recall: {recall:.3f} | F1: {f1_value:.3f}")
    print(classification_report(y_test, y_hat, digits=3))

    return {
        "model": name,
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1_value,
    }

# Stratified split: stratify=y keeps the attack/benign ratio the same in train and test.
X_train_text, X_test_text, y_train, y_test = train_test_split(
    X_text, y, test_size=0.2, random_state=42, stratify=y
)

# TF-IDF configuration used by the models below.
# NOTE(review): make_pipeline stores the object it is given without copying it, so any
# pipelines that receive this instance directly share one vectorizer, and every fit()
# on one of them refits it.
vectorizer = TfidfVectorizer(
    lowercase=True,
    max_features=100_000,   # cap the vocabulary size
    ngram_range=(1,2),      # unigrams and bigrams
    token_pattern=r"(?u)\b\w+\b",  # one or more word characters, so single-character tokens are kept
)

### Treinos

In [97]:
# Logistic Regression (binary problem here).
# clone() gives this pipeline its own unfitted copy of the TF-IDF configuration:
# make_pipeline does not copy its steps, so passing `vectorizer` directly would make
# every pipeline share — and refit — the same object.
logreg = make_pipeline(
    clone(vectorizer),
    LogisticRegression(max_iter=500, n_jobs=None)  # simple baseline settings
)
m1 = evaluate_model("LogisticRegression", logreg, X_train_text, X_test_text, y_train, y_test)


[LogisticRegression]
Accuracy: 0.991 | Precision: 0.987 | Recall: 1.000 | F1: 0.994
              precision    recall  f1-score   support

           0      1.000     0.970     0.985       100
           1      0.987     1.000     0.994       233

    accuracy                          0.991       333
   macro avg      0.994     0.985     0.989       333
weighted avg      0.991     0.991     0.991       333



In [98]:
# Linear SVM.
# clone() gives this pipeline its own unfitted copy of the TF-IDF configuration instead
# of sharing the `vectorizer` instance with the other pipelines (make_pipeline does not
# copy its steps). random_state is fixed like for the other seeded models in this notebook.
linsvm = make_pipeline(
    clone(vectorizer),
    LinearSVC(random_state=42)
)
m2 = evaluate_model("LinearSVC", linsvm, X_train_text, X_test_text, y_train, y_test)


[LinearSVC]
Accuracy: 0.997 | Precision: 0.996 | Recall: 1.000 | F1: 0.998
              precision    recall  f1-score   support

           0      1.000     0.990     0.995       100
           1      0.996     1.000     0.998       233

    accuracy                          0.997       333
   macro avg      0.998     0.995     0.996       333
weighted avg      0.997     0.997     0.997       333



In [99]:
# Decision Tree (included for comparison; the original note expects it to underfit on
# sparse TF-IDF features).
# clone() gives this pipeline its own unfitted copy of the TF-IDF configuration instead
# of sharing the `vectorizer` instance with the other pipelines (make_pipeline does not
# copy its steps).
dtree = make_pipeline(
    clone(vectorizer),
    DecisionTreeClassifier(random_state=42, max_depth=30)
)
m3 = evaluate_model("DecisionTree", dtree, X_train_text, X_test_text, y_train, y_test)


[DecisionTree]
Accuracy: 1.000 | Precision: 1.000 | Recall: 1.000 | F1: 1.000
              precision    recall  f1-score   support

           0      1.000     1.000     1.000       100
           1      1.000     1.000     1.000       233

    accuracy                          1.000       333
   macro avg      1.000     1.000     1.000       333
weighted avg      1.000     1.000     1.000       333



In [100]:
# Random Forest (not always the best fit for sparse TF-IDF features, but it works).
# clone() gives this pipeline its own unfitted copy of the TF-IDF configuration instead
# of sharing the `vectorizer` instance with the other pipelines (make_pipeline does not
# copy its steps).
rf = make_pipeline(
    clone(vectorizer),
    RandomForestClassifier(
        n_estimators=200,
        random_state=42,
        n_jobs=-1,
        max_depth=None
    )
)
m4 = evaluate_model("RandomForest", rf, X_train_text, X_test_text, y_train, y_test)


[RandomForest]
Accuracy: 1.000 | Precision: 1.000 | Recall: 1.000 | F1: 1.000
              precision    recall  f1-score   support

           0      1.000     1.000     1.000       100
           1      1.000     1.000     1.000       233

    accuracy                          1.000       333
   macro avg      1.000     1.000     1.000       333
weighted avg      1.000     1.000     1.000       333



In [101]:
# Comparison table of the test metrics, highest F1 first.
# NOTE(review): "f1" is the only sort key, so models tied on F1 have no explicit
# tie-break; the first row is later taken as the "best" model.
score_table = pd.DataFrame([m1, m2, m3, m4]).sort_values("f1", ascending=False)
score_table

Unnamed: 0,model,accuracy,precision,recall,f1
3,RandomForest,1.0,1.0,1.0,1.0
2,DecisionTree,1.0,1.0,1.0,1.0
1,LinearSVC,0.996997,0.995726,1.0,0.997859
0,LogisticRegression,0.990991,0.987288,1.0,0.993603


### Salvar os modelos (todos e o melhor)

In [102]:
# Save every trained pipeline and additionally store a copy of the best one.
import os, joblib

# All fitted pipelines, keyed by the same names used in score_table["model"].
all_models = {
    "LogisticRegression": logreg,
    "LinearSVC": linsvm,
    "DecisionTree": dtree,
    "RandomForest": rf,
}

os.makedirs("./models", exist_ok=True)

# Persist each pipeline as ./models/model_<name>.joblib.
for name, pipe in all_models.items():
    path = f"./models/model_{name}.joblib"
    joblib.dump(pipe, path)
    print("Salvo:", path)

# Optional: also save the best pipeline, i.e. the first row of score_table.
# NOTE(review): this writes a second copy of the same pipeline into ./models, so
# anything that globs ./models/*.joblib (as evaluate_all_models does) sees it twice;
# best_model_* files from earlier runs are not removed either.
best_name = score_table.iloc[0]["model"]
best_pipe = all_models[best_name]
best_path = f"./models/best_model_{best_name}.joblib"
joblib.dump(best_pipe, best_path)
print("Salvo melhor modelo em:", best_path)

# Optional: keep the metrics table next to the models for reference.
score_table.to_csv("./models/metrics_summary.csv", index=False)
print("Salvo resumo de métricas em: ./models/metrics_summary.csv")

Salvo: ./models/model_LogisticRegression.joblib
Salvo: ./models/model_LinearSVC.joblib
Salvo: ./models/model_DecisionTree.joblib
Salvo: ./models/model_RandomForest.joblib
Salvo melhor modelo em: ./models/best_model_RandomForest.joblib
Salvo resumo de métricas em: ./models/metrics_summary.csv


### Inferência em novos arquivos/diretórios

In [9]:
import glob
from pathlib import Path
import pandas as pd
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import warnings
# Silence SyntaxWarning for the rest of the session.
# NOTE(review): presumably aimed at warnings raised while ast.literal_eval parses event
# strings (e.g. invalid escape sequences in paths) — confirm, since the filter is global.
warnings.filterwarnings("ignore", category=SyntaxWarning)


def predict_file_label(model, path: str) -> int:
    """
    Classify a single JSONL file.

    The file is converted to one text document with ``file_to_document_text`` and
    passed to ``model.predict`` as a one-element batch. The first (only) prediction is
    returned as a plain ``int``; the notebook uses 1 for malicious and 0 for benign.
    """
    document = file_to_document_text(path)
    predictions = model.predict([document])
    return int(predictions[0])

def batch_predict_dir(model, pattern: str = "../data/new_samples/*.jsonl", true_label: int|None=None, save_csv: str|None=None):
    """
    Roda predição em lote em todos os arquivos que batem com o padrão `pattern`.
    
    Args:
        model: modelo sklearn já treinado/carregado.
        pattern: caminho com wildcard para os arquivos (ex: "../data/attack/*.jsonl").
        true_label: se informado (0 ou 1), calcula métricas de avaliação.
        save_csv: caminho opcional para salvar os resultados em CSV.

    Returns:
        df_pred: DataFrame com resultados (file, pred, true, status).
    """
    files = sorted(glob.glob(pattern))
    if not files:
        print("Nenhum arquivo encontrado para inferência.")
        return None

    rows = []
    for p in files:
        try:
            yhat = predict_file_label(model, p)
            rows.append({
                "file": Path(p).name,
                "pred": yhat,
                "true": true_label,
                "status": "ok"
            })
        except Exception as e:
            rows.append({
                "file": Path(p).name,
                "pred": None,
                "true": true_label,
                "status": f"erro: {type(e).__name__}: {e}"
            })
            print(f"{Path(p).name:40s} => ERRO ({type(e).__name__}: {e})")

    df_pred = pd.DataFrame(rows)

    # Métricas apenas nos que deram certo
    if true_label is not None:
        ok = df_pred[df_pred["status"] == "ok"]
        if len(ok) > 0:
            acc = accuracy_score(ok["true"], ok["pred"])
            pr, rc, f1, _ = precision_recall_fscore_support(
                ok["true"], ok["pred"], average="binary", zero_division=0
            )
            print(f"\nEval (somente arquivos OK; rótulo verdadeiro={true_label}): "
                  f"Acc={acc:.3f} | P={pr:.3f} | R={rc:.3f} | F1={f1:.3f}")
        else:
            print("\nNão houve arquivos válidos para calcular métricas.")

    # salvar resultados em CSV se pedido
    if save_csv:
        df_pred.to_csv(save_csv, index=False)
        print(f"Resultados salvos em {save_csv}")

    return df_pred


In [None]:
# Avaliar TODOS os modelos salvos contra TODOS os conjuntos em ../filtering/<filtro>/{attack,safe}
import os, glob, joblib
from pathlib import Path
import pandas as pd
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# ----------------------------------------
# Descobre automaticamente os conjuntos:
# ../filtering/<filter_model>/attack/*.jsonl
# ../filtering/<filter_model>/safe/*.jsonl
# ----------------------------------------
def discover_filtering_eval_sets(base_dir: str = "../filtering"):
    """
    List the evaluation sets found under ``base_dir``.

    Every sub-directory of ``base_dir`` is treated as one filter model. For each of
    them (in sorted order) an ``attack`` folder yields an entry with true label 1 and a
    ``safe`` folder an entry with true label 0, attack first.

    Returns:
        List of dicts with keys ``pattern`` (glob for the folder's ``*.jsonl`` files),
        ``true`` (1 or 0), ``set`` ("attack" or "safe") and ``filter_model`` (the
        sub-directory name). Empty — after printing a warning — when ``base_dir`` does
        not exist.
    """
    root = Path(base_dir)
    if not root.exists():
        print(f"Aviso: {base_dir} não encontrado.")
        return []

    # (folder name, true label) in the order entries are emitted per filter model.
    labelled_folders = (("attack", 1), ("safe", 0))

    found = []
    for model_dir in sorted(entry for entry in root.iterdir() if entry.is_dir()):
        for folder_name, label in labelled_folders:
            folder = model_dir / folder_name
            if not folder.exists():
                continue
            found.append({
                "pattern": str(folder / "*.jsonl"),
                "true": label,
                "set": folder_name,
                "filter_model": model_dir.name,  # e.g. gemma3_4b
            })
    return found


def eval_df_metrics(df: pd.DataFrame):
    """Compute binary metrics over the rows of a prediction table whose status is "ok".

    Returns:
        dict with ``n_ok`` (number of "ok" rows) plus ``accuracy``, ``precision``,
        ``recall`` and ``f1``. The four metrics are ``None`` when there are no "ok"
        rows, no ``true`` column, or no known true label at all.
    """
    ok_rows = df.loc[df["status"] == "ok"].copy()
    n_ok = len(ok_rows)

    # The checks are ordered so the "true" column is only read when it exists.
    can_score = "true" in ok_rows and n_ok > 0 and not ok_rows["true"].isna().all()
    if not can_score:
        return {"n_ok": n_ok, "accuracy": None, "precision": None, "recall": None, "f1": None}

    y_true, y_pred = ok_rows["true"], ok_rows["pred"]
    accuracy = accuracy_score(y_true, y_pred)
    precision, recall, f1_value, _support = precision_recall_fscore_support(
        y_true, y_pred, average="binary", zero_division=0
    )
    return {"n_ok": n_ok, "accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1_value}


def evaluate_all_models(models_dir: str = "./models", filtering_base: str = "../filtering",
                        out_dir: str = "./models/eval_reports"):
    """
    Evaluate every saved model against every evaluation set and write CSV reports.

    Models are all ``*.joblib`` files in ``models_dir``; evaluation sets come from
    ``discover_filtering_eval_sets(filtering_base)``. For each model, metrics are
    computed per (filter model, set) and once more per filter model with attack and
    safe predictions combined (``set == "ALL"``).

    Args:
        models_dir: directory holding the saved ``*.joblib`` models.
        filtering_base: root directory of the ``<filter_model>/{attack,safe}`` folders.
        out_dir: directory the CSV reports are written to (created if missing).

    Returns:
        ``(metrics_df, preds_concat)``: the metrics table and the concatenation of all
        per-file predictions (``None`` when no predictions were produced).

    Raises:
        AssertionError: when no model file or no evaluation set is found.
    """
    model_paths = sorted(glob.glob(os.path.join(models_dir, "*.joblib")))
    assert model_paths, f"Nenhum modelo encontrado em {models_dir}"

    # Discover the evaluation sets.
    eval_sets = discover_filtering_eval_sets(filtering_base)
    assert eval_sets, f"Nenhum conjunto encontrado em {filtering_base}"

    per_model_rows = []   # metrics per (sk_model) x (filter_model) x (set)
    overall_rows   = []   # every prediction table, kept for further analysis

    for mp in model_paths:
        sk_model_name = Path(mp).stem  # e.g. model_LinearSVC, best_model_LogisticRegression
        print(f"\n===== Avaliando {sk_model_name} =====")
        # SECURITY: joblib.load unpickles arbitrary objects and can execute code.
        # Only point models_dir at files you created or otherwise trust.
        clf = joblib.load(mp)

        # Predictions grouped by filter model, used for the combined 'ALL' metrics below.
        preds_per_filter = {}

        for es in eval_sets:
            pattern      = es["pattern"]
            true_label   = es["true"]
            set_name     = es["set"]
            filter_model = es["filter_model"]

            print(f"\n-- Filtro: {filter_model} | Conjunto: {set_name} | true={true_label}")
            df_pred = batch_predict_dir(clf, pattern, true_label=true_label)
            if df_pred is None:
                # No files matched this set.
                continue

            df_pred = df_pred.copy()
            df_pred["set"]          = set_name
            df_pred["filter_model"] = filter_model
            df_pred["sk_model"]     = sk_model_name

            preds_per_filter.setdefault(filter_model, []).append(df_pred)
            overall_rows.append(df_pred)

            # Metrics per (sk_model, filter_model, set).
            per_model_rows.append({
                "sk_model": sk_model_name,
                "filter_model": filter_model,
                "set": set_name,
                **eval_df_metrics(df_pred)
            })

        # 'ALL' metrics per filter model (attack + safe combined) for this sk_model.
        for fmodel, dfs in preds_per_filter.items():
            cat = pd.concat(dfs, ignore_index=True)
            cat_known = cat[(cat["status"] == "ok") & cat["true"].notna()]
            if len(cat_known) > 0:
                # Same computation as the per-set rows, via the shared helper.
                per_model_rows.append({
                    "sk_model": sk_model_name,
                    "filter_model": fmodel,
                    "set": "ALL",
                    **eval_df_metrics(cat_known)
                })

    # Final metrics table. The columns are named explicitly so that the frame keeps its
    # schema even when no rows were produced; the per-filter report loop below reads
    # the "filter_model" column and would otherwise fail with a KeyError.
    metric_columns = ["sk_model", "filter_model", "set", "n_ok", "accuracy", "precision", "recall", "f1"]
    metrics_df = pd.DataFrame(per_model_rows, columns=metric_columns)
    if not metrics_df.empty:
        metrics_df = metrics_df.sort_values(["filter_model", "set", "f1"], ascending=[True, True, False])

    print("\n=== RESUMO DE MÉTRICAS (por filtro / conjunto / modelo) ===")
    try:
        display(metrics_df)
    except NameError:
        # `display` only exists inside IPython/Jupyter; fall back to plain text elsewhere.
        print(metrics_df.head())

    # Write the reports.
    os.makedirs(out_dir, exist_ok=True)

    # 1) One CSV with everything.
    out_csv_all = os.path.join(out_dir, "all_models_metrics_by_filter.csv")
    metrics_df.to_csv(out_csv_all, index=False)
    print("Relatório geral salvo em:", out_csv_all)

    # 2) One CSV per filter model (handy for separate inspection).
    for fmodel in sorted(metrics_df["filter_model"].dropna().unique()):
        sub = metrics_df[metrics_df["filter_model"] == fmodel]
        out_csv = os.path.join(out_dir, f"metrics_{fmodel}.csv")
        sub.to_csv(out_csv, index=False)
        print(f"Relatório por filtro salvo em: {out_csv}")

    # 3) All per-file predictions, when there are any.
    preds_concat = pd.concat(overall_rows, ignore_index=True) if overall_rows else None
    if preds_concat is not None and not preds_concat.empty:
        preds_csv = os.path.join(out_dir, "all_predictions_by_filter.csv")
        preds_concat.to_csv(preds_csv, index=False)
        print("Predições concatenadas salvas em:", preds_csv)

    return metrics_df, preds_concat

# Run the full evaluation with the default locations.
# NOTE(review): the stored output of this cell uses a different log format and report
# file name than the code above prints, so it appears to come from an earlier version
# of evaluate_all_models — re-run the cell to refresh it.
all_metrics, all_preds = evaluate_all_models(
    models_dir="./models",
    filtering_base="../filtering"
)



===== Avaliando best_model_RandomForest =====

-- Conjunto: attack | pattern=../../src/data/attack/*.jsonl | true=1

Eval (somente arquivos OK; rótulo verdadeiro=1): Acc=0.951 | P=1.000 | R=0.951 | F1=0.975

-- Conjunto: safe | pattern=../../src/data/safe/*.jsonl | true=0

Eval (somente arquivos OK; rótulo verdadeiro=0): Acc=0.000 | P=0.000 | R=0.000 | F1=0.000

===== Avaliando model_DecisionTree =====

-- Conjunto: attack | pattern=../../src/data/attack/*.jsonl | true=1

Eval (somente arquivos OK; rótulo verdadeiro=1): Acc=0.951 | P=1.000 | R=0.951 | F1=0.975

-- Conjunto: safe | pattern=../../src/data/safe/*.jsonl | true=0

Eval (somente arquivos OK; rótulo verdadeiro=0): Acc=0.211 | P=0.000 | R=0.000 | F1=0.000

===== Avaliando model_LinearSVC =====

-- Conjunto: attack | pattern=../../src/data/attack/*.jsonl | true=1

Eval (somente arquivos OK; rótulo verdadeiro=1): Acc=0.951 | P=1.000 | R=0.951 | F1=0.975

-- Conjunto: safe | pattern=../../src/data/safe/*.jsonl | true=0

Eval (so

Unnamed: 0,model,set,n_ok,accuracy,precision,recall,f1
5,model_DecisionTree,ALL,60,0.716667,0.722222,0.95122,0.821053
11,model_LogisticRegression,ALL,60,0.683333,0.696429,0.95122,0.804124
8,model_LinearSVC,ALL,60,0.666667,0.684211,0.95122,0.795918
2,best_model_RandomForest,ALL,60,0.65,0.672414,0.95122,0.787879
14,model_RandomForest,ALL,60,0.65,0.672414,0.95122,0.787879
0,best_model_RandomForest,attack,41,0.95122,1.0,0.95122,0.975
3,model_DecisionTree,attack,41,0.95122,1.0,0.95122,0.975
6,model_LinearSVC,attack,41,0.95122,1.0,0.95122,0.975
9,model_LogisticRegression,attack,41,0.95122,1.0,0.95122,0.975
12,model_RandomForest,attack,41,0.95122,1.0,0.95122,0.975


Relatório salvo em: ./models/eval_reports/all_models_metrics.csv
