# Modelo Defensivo com Random Forest (Colab) — Foco em Spyware

Treinamento de um classificador para detectar Spyware (positiva) versus Benigno (negativa) usando o MALAPI2019 (all_analysis_data.txt + labels.csv, filtrando apenas a família Spyware) e, opcionalmente, dados benignos coletados localmente.

- Base científica: framework teórico (Random Forest + n-grams/TF‑IDF; balanceamento; validação cruzada).
- Direção: voltamos à abordagem inicial (apenas Spyware), evitando outras famílias que vinham induzindo overfitting na análise binária geral.
- Saídas: métricas (Accuracy, F1, ROC‑AUC), matriz de confusão, validação cruzada e artefatos (.joblib).

## Como fornecer os dados no Google Colab

Você pode usar um dos caminhos abaixo (edite as variáveis no próximo bloco):

1) Upload direto (recomendado para início):
   - Faça upload de `all_analysis_data.txt` e `labels.csv` (MALAPI2019). Usaremos APENAS as amostras rotuladas como `Spyware`.
   - Faça upload de um CSV benigno (ex.: `benign_api_dataset_*.csv`) para compor a classe negativa (Benign).

2) Google Drive:
   - Monte o Drive e aponte `MALAPI_DIR` para a pasta que contém os arquivos.
   - Defina `BENIGN_CSV_PATH` para incluir benignos adicionais.

Observação: mapeamos `Spyware` -> 1 (malicioso) e dados benignos -> 0. Amostras de outras famílias do MALAPI serão descartadas para este treino focado.

In [None]:
# Detectar ambiente Colab e preparar instalações opcionais
import sys, os, math, json, random
IN_COLAB = 'google.colab' in sys.modules
print('IN_COLAB =', IN_COLAB)

# Instalações leves (apenas se necessário)
if IN_COLAB:
    try:
        import sklearn, pandas, numpy  # noqa: F401
    except Exception:
        %pip -q install scikit-learn pandas numpy
    try:
        import imblearn  # noqa: F401
    except Exception:
        %pip -q install imbalanced-learn
    try:
        import seaborn  # noqa: F401
    except Exception:
        %pip -q install seaborn
    try:
        import joblib  # noqa: F401
    except Exception:
        %pip -q install joblib

In [None]:
# Configurações do experimento (edite conforme seu caso)
RANDOM_STATE = 42
DATA_SOURCE = 'upload'  # 'upload' ou 'drive'

# Se usar Google Drive, defina a pasta dos dados (contendo 'all_analysis_data.txt' e 'labels.csv')
MALAPI_DIR = ''  # ex.: '/content/drive/MyDrive/Modelo_Defensivo_TCC/mal-api-2019'
MALAPI_ALL_PATH = ''  # se quiser apontar o arquivo diretamente
MALAPI_LABELS_PATH = ''  # se quiser apontar o arquivo diretamente

# (Obrigatório) caminho para CSV de benignos coletados para classe negativa.
BENIGN_CSV_PATH = ''  # ex.: '/content/drive/MyDrive/benign_api_dataset_2025XXXX.csv'

# Hiperparâmetros e toggles (foco em generalização)
MAX_TFIDF_FEATURES = 15000
NGRAM_RANGE = (1, 2)
MIN_DF = 2
MAX_DF = 0.98
USE_FEATURE_SELECTION = True
K_BEST = 4000  # reduz dimensionalidade antes do RF
DO_HYPERPARAM_TUNING = False  # deixe False para treinos rápidos
TEST_SIZE = 0.25
N_JOBS = -1

# Parâmetros base do Random Forest (um pouco mais conservadores para reduzir overfitting)
RF_PARAMS_BASE = dict(
    n_estimators=500,
    max_depth=None,
    min_samples_split=6,
    min_samples_leaf=3,
    max_features='sqrt',
    bootstrap=True,
    class_weight='balanced_subsample',
    n_jobs=N_JOBS,
    random_state=RANDOM_STATE,
)

In [None]:
# Utilitários: carregamento e preparação dos dados
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Tuple, Optional, List

def _read_lines(filepath: str) -> List[str]:
    lines = []
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        for line in f:
            line = line.strip()
            if line:
                lines.append(line)
    return lines

def load_malapi_dataset_spyware(all_path: str, labels_path: str) -> pd.DataFrame:
    """
    Lê MALAPI2019 e mantém apenas amostras com label 'Spyware'.
    Retorna DataFrame: ['api_calls', 'family'] (somente Spyware)
    """
    api_lines = _read_lines(all_path)
    labels = _read_lines(labels_path)
    if len(api_lines) != len(labels):
        raise ValueError(f'Inconsistência: {len(api_lines)} linhas em all_analysis_data.txt vs {len(labels)} em labels.csv')

    def normalize_seq(s: str) -> str:
        s = s.replace(',', ' ')
        return ' '.join(s.split())

    api_calls = [normalize_seq(s) for s in api_lines]
    df_all = pd.DataFrame({'api_calls': api_calls, 'family': labels})
    df_spy = df_all[df_all['family'].str.strip().str.lower() == 'spyware'].copy()
    if df_spy.empty:
        raise ValueError('Nenhuma amostra Spyware encontrada em MALAPI2019.')
    return df_spy.reset_index(drop=True)

def load_benign_csv(csv_path: str) -> pd.DataFrame:
    """
    Lê CSV benigno com coluna 'api_calls'. Outras colunas são ignoradas para padronizar.
    Retorna DataFrame com colunas: ['api_calls']
    """
    dfb = pd.read_csv(csv_path)
    if 'api_calls' not in dfb.columns:
        possible = [c for c in dfb.columns if 'api' in c.lower() and 'call' in c.lower()]
        if not possible:
            raise ValueError('CSV benigno não possui coluna api_calls')
        dfb = dfb.rename(columns={possible[0]: 'api_calls'})
    dfb['api_calls'] = dfb['api_calls'].astype(str).fillna('')
    dfb['api_calls'] = dfb['api_calls'].str.replace(',', ' ', regex=False).str.replace('\n', ' ', regex=False)
    dfb['api_calls'] = dfb['api_calls'].apply(lambda s: ' '.join(s.split()))
    dfb = dfb[['api_calls']].copy()
    return dfb

def build_spyware_vs_benign_dataset(spy_df: pd.DataFrame, benign_df: pd.DataFrame, random_state: int = 42) -> pd.DataFrame:
    """
    Constrói dataset binário Spyware (1) vs Benign (0).
    - Remove duplicatas e sequências vazias
    - Balanceia levemente por downsampling da classe majoritária (até 1.5x da minoritária)
    """
    spy = spy_df[['api_calls']].copy()
    spy['label_bin'] = 1
    ben = benign_df[['api_calls']].copy()
    ben['label_bin'] = 0

    data = pd.concat([spy, ben], axis=0, ignore_index=True)
    data = data.dropna(subset=['api_calls'])
    data = data[data['api_calls'].str.len() > 0]
    data = data.drop_duplicates(subset=['api_calls', 'label_bin'])

    # Balanceamento leve por downsampling
    n_spy = (data['label_bin'] == 1).sum()
    n_ben = (data['label_bin'] == 0).sum()
    if n_spy == 0 or n_ben == 0:
        raise ValueError('É necessário ter exemplos de ambas as classes (Spyware e Benign).')
    maj_label = 1 if n_spy > n_ben else 0
    min_label = 1 - maj_label
    n_min = min(n_spy, n_ben)
    n_maj_cap = int(1.5 * n_min)

    df_min = data[data['label_bin'] == min_label]
    df_maj = data[data['label_bin'] == maj_label]
    if len(df_maj) > n_maj_cap:
        df_maj = df_maj.sample(n=n_maj_cap, random_state=random_state)
    data_bal = pd.concat([df_min, df_maj], axis=0, ignore_index=True)

    return data_bal.sample(frac=1.0, random_state=random_state).reset_index(drop=True)

In [None]:
# Entrada dos dados (upload ou drive)
malapi_all, malapi_labels, benign_csv = None, None, None

if IN_COLAB and DATA_SOURCE == 'upload':
    from google.colab import files
    print('Selecione: all_analysis_data.txt, labels.csv e (opcional) benign_csv...')
    up = files.upload()  # abre seletor
    # mapear nomes comuns
    for k in up.keys():
        lk = k.lower()
        if 'all_analysis_data' in lk and lk.endswith('.txt'):
            malapi_all = k
        elif 'labels' in lk and lk.endswith('.csv'):
            malapi_labels = k
        elif 'benign' in lk and lk.endswith('.csv'):
            benign_csv = k
    print('Detectado:', malapi_all, malapi_labels, benign_csv)

elif IN_COLAB and DATA_SOURCE == 'drive':
    from google.colab import drive
    drive.mount('/content/drive')
    if MALAPI_ALL_PATH and MALAPI_LABELS_PATH:
        malapi_all, malapi_labels = MALAPI_ALL_PATH, MALAPI_LABELS_PATH
    elif MALAPI_DIR:
        malapi_all = str(Path(MALAPI_DIR) / 'all_analysis_data.txt')
        malapi_labels = str(Path(MALAPI_DIR) / 'labels.csv')
    else:
        raise ValueError('Defina MALAPI_DIR ou os caminhos MALAPI_ALL_PATH/MALAPI_LABELS_PATH')
    if BENIGN_CSV_PATH:
        benign_csv = BENIGN_CSV_PATH
else:
    # Ambiente local (fora do Colab): ajuste caminhos conforme necessário
    # Exemplo (comente/edite se executar localmente com os arquivos ao lado do notebook):
    malapi_all = 'all_analysis_data.txt'
    malapi_labels = 'labels.csv'
    # benign_csv = 'benign_api_dataset_YYYYMMDD.csv'  # opcional

print('Paths finais:')
print('malapi_all     =', malapi_all)
print('malapi_labels  =', malapi_labels)
print('benign_csv     =', benign_csv)

In [None]:
# Carregar datasets (Spyware de MALAPI + Benign CSV) e unificar
spy_df = load_malapi_dataset_spyware(malapi_all, malapi_labels)
print('Spyware (MALAPI) ->', spy_df.shape)

if not BENIGN_CSV_PATH and not benign_csv:
    print('ATENÇÃO: Forneça um CSV de benignos (BENIGN_CSV_PATH ou upload) para formar a classe negativa!')

ben_df = None
if benign_csv:
    ben_df = load_benign_csv(benign_csv)
elif BENIGN_CSV_PATH:
    ben_df = load_benign_csv(BENIGN_CSV_PATH)

if ben_df is None or ben_df.empty:
    raise ValueError('Benign CSV ausente ou vazio. É necessário para treinar Spyware vs Benign.')
print('Benign ->', ben_df.shape)

data = build_spyware_vs_benign_dataset(spy_df, ben_df, random_state=RANDOM_STATE)
print('Dataset final (Spyware vs Benign) ->', data.shape)
data['label_bin'].value_counts()

## Vetorização e divisão estratificada (Spyware vs Benign)
Usamos TF‑IDF com n-grams para capturar padrões de sequência em chamadas de API. Em seguida, dividimos em treino/teste de forma estratificada para avaliar generalização no cenário Spyware (1) vs Benign (0).

In [None]:
from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.pipeline import Pipeline

X_text = data['api_calls'].values
y = data['label_bin'].values

X_train_text, X_test_text, y_train, y_test = train_test_split(
    X_text, y, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y
)
print('Treino/Teste:', len(X_train_text), len(X_test_text))

# TF-IDF
tfidf = TfidfVectorizer(
    max_features=MAX_TFIDF_FEATURES,
    ngram_range=NGRAM_RANGE,
    min_df=MIN_DF,
    max_df=MAX_DF,
    token_pattern=r'[^\s]+'  # tokens por separação de espaços
)
X_train_tfidf = tfidf.fit_transform(X_train_text)
X_test_tfidf = tfidf.transform(X_test_text)
X_train_tfidf.shape, X_test_tfidf.shape

In [None]:
# (Opcional) seleção de características para reduzir dimensionalidade e ruído
from scipy import sparse
Xtr, Xte = X_train_tfidf, X_test_tfidf
selector = None
if USE_FEATURE_SELECTION:
    k = min(K_BEST, X_train_tfidf.shape[1] - 1) if X_train_tfidf.shape[1] > 1 else 1
    selector = SelectKBest(mutual_info_classif, k=k)
    # Para MI, precisamos de arrays densos quando necessário; tentamos conversão 
    # Apenas converte um subconjunto para estimar MI se memória estiver limitada (heurística simples)
    try:
        Xtr_dense = X_train_tfidf.toarray() if sparse.issparse(X_train_tfidf) else X_train_tfidf
    except MemoryError:
        # fallback: reduzir features antes de MI, p.ex. pelos termos mais frequentes
        print('Memória insuficiente para densificar. Reduzindo max_features pela metade e refazendo TF-IDF...')
        MAX_TFIDF_FEATURES = max(5000, MAX_TFIDF_FEATURES // 2)
        tfidf = TfidfVectorizer(
            max_features=MAX_TFIDF_FEATURES, ngram_range=NGRAM_RANGE, min_df=MIN_DF, max_df=MAX_DF, token_pattern=r'[^\s]+'
        )
        X_train_tfidf = tfidf.fit_transform(X_train_text)
        X_test_tfidf = tfidf.transform(X_test_text)
        Xtr_dense = X_train_tfidf.toarray()
    Xtr_sel = selector.fit_transform(Xtr_dense, y_train)
    # aplicar ao teste
    Xte_dense = X_test_tfidf.toarray() if sparse.issparse(X_test_tfidf) else X_test_tfidf
    Xte_sel = selector.transform(Xte_dense)
    Xtr, Xte = Xtr_sel, Xte_sel
    print('Seleção k-best ->', Xtr.shape)
else:
    print('Sem seleção de características.')

## Treinamento do Random Forest e validação (Spyware vs Benign)
Usamos parâmetros conservadores e `class_weight` para mitigar desbalanceamento. Validamos com holdout e CV estratificado, monitorando F1 da classe positiva (Spyware).

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score, classification_report, confusion_matrix
from sklearn.model_selection import cross_val_score
import matplotlib.pyplot as plt
import seaborn as sns

rf = RandomForestClassifier(**RF_PARAMS_BASE)
rf.fit(Xtr, y_train)

y_pred = rf.predict(Xte)
y_proba = None
try:
    y_proba = rf.predict_proba(Xte)[:, 1]
except Exception:
    pass

acc = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
prec = precision_score(y_test, y_pred)
rec = recall_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_proba) if y_proba is not None else float('nan')

print(json.dumps({
    'accuracy': acc, 'f1': f1, 'precision': prec, 'recall': rec, 'roc_auc': auc
}, indent=2))

print('Classification report:\n', classification_report(y_test, y_pred, target_names=['Benign', 'Malware']))
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(4.5,4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Benign','Malware'], yticklabels=['Benign','Malware'])
plt.title('Matriz de Confusão')
plt.xlabel('Predito')
plt.ylabel('Real')
plt.show()

# Validação cruzada rápida (F1) no treino
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
cv_scores = cross_val_score(rf, Xtr, y_train, cv=cv, scoring='f1', n_jobs=N_JOBS)
print('CV F1 (5-fold):', cv_scores, ' | média=%.4f +/- %.4f' % (cv_scores.mean(), cv_scores.std()*2))

In [None]:
# (Opcional) Busca de hiperparâmetros leve com RandomizedSearchCV
from scipy.stats import randint, uniform
if DO_HYPERPARAM_TUNING:
    param_dist = {
        'n_estimators': randint(200, 600),
        'max_depth': [None] + list(range(6, 21, 2)),
        'min_samples_split': randint(2, 10),
        'min_samples_leaf': randint(1, 5),
        'max_features': ['sqrt', 'log2', None],
        'bootstrap': [True, False]
    }
    base = RandomForestClassifier(**{**RF_PARAMS_BASE, 'n_jobs': N_JOBS})
    cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=RANDOM_STATE)
    rs = RandomizedSearchCV(base, param_distributions=param_dist, n_iter=20,
                           scoring='f1', n_jobs=N_JOBS, cv=cv, verbose=1,
                           random_state=RANDOM_STATE)
    rs.fit(Xtr, y_train)
    print('Melhores params:', rs.best_params_)
    rf = rs.best_estimator_
    y_pred = rf.predict(Xte)
    y_proba = None
    try:
        y_proba = rf.predict_proba(Xte)[:, 1]
    except Exception:
        pass
    acc = accuracy_score(y_test, y_pred); f1 = f1_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred); rec = recall_score(y_test, y_pred)
    auc = roc_auc_score(y_test, y_proba) if y_proba is not None else float('nan')
    print(json.dumps({'accuracy': acc, 'f1': f1, 'precision': prec, 'recall': rec, 'roc_auc': auc}, indent=2))

## Palavras/ngrams mais importantes
Mapeamos as importâncias do Random Forest para os termos do TF‑IDF. Útil para análise e entendimento dos sinais.

In [None]:
import numpy as np

def top_features(rf_model, tfidf_vectorizer, top_k=30):
    feats = np.array(tfidf_vectorizer.get_feature_names_out())
    importances = rf_model.feature_importances_
    idx = np.argsort(importances)[::-1][:top_k]
    return pd.DataFrame({'term': feats[idx], 'importance': importances[idx]})

# Se houve seleção de características, o mapeamento fica inconsistente.
# Neste caso, mostramos aviso e pulamos.
if selector is None:
    tf_top = top_features(rf, tfidf, 30)
    print('Top termos por importância (foco no classificador Spyware=1):')
    print(tf_top.head(30).to_string(index=False))
else:
    print('Aviso: seleção de características ativa — importâncias diretas no espaço TF-IDF ficam desalinhadas.')

## Salvar artefatos do modelo
Salvamos o modelo Random Forest, o vetorizar TF‑IDF e (se houver) o seletor de características para uso posterior.

In [None]:
import joblib, os
outdir = '/content/models' if IN_COLAB else './models'
os.makedirs(outdir, exist_ok=True)
model_path = os.path.join(outdir, 'rf_malware_detector.joblib')
tfidf_path = os.path.join(outdir, 'tfidf_vectorizer.joblib')
selector_path = os.path.join(outdir, 'feature_selector.joblib')
info_path = os.path.join(outdir, 'training_info.json')

joblib.dump(rf, model_path)
joblib.dump(tfidf, tfidf_path)
if selector is not None:
    joblib.dump(selector, selector_path)

info = {
    'random_state': RANDOM_STATE,
    'test_size': TEST_SIZE,
    'n_samples_total': int(len(data)),
    'n_samples_train': int(len(X_train_text)),
    'n_samples_test': int(len(X_test_text)),
    'class_balance': {
        'train': {
            'benign': int((y_train==0).sum()),
            'malware': int((y_train==1).sum())
        },
        'test': {
            'benign': int((y_test==0).sum()),
            'malware': int((y_test==1).sum())
        }
    },
    'metrics': {
        'accuracy': float(acc), 'f1': float(f1), 'precision': float(prec), 'recall': float(rec), 'roc_auc': float(auc) if not math.isnan(auc) else None
    },
    'tfidf': {
        'max_features': MAX_TFIDF_FEATURES, 'ngram_range': NGRAM_RANGE, 'min_df': MIN_DF, 'max_df': MAX_DF
    },
    'feature_selection': {
        'enabled': bool(selector is not None), 'k_best': K_BEST if selector is not None else None
    },
    'rf_params': RF_PARAMS_BASE
}
with open(info_path, 'w') as f:
    json.dump(info, f, indent=2)

print('Artefatos salvos em:', outdir)
[model_path, tfidf_path, selector_path if selector is not None else None, info_path]

## Notas e alinhamento com tentativas anteriores
- Voltamos ao foco original: detectar exclusivamente Spyware, usando amostras `Spyware` do MALAPI como classe positiva e dados coletados (CSV) como classe negativa.
- Para reduzir overfitting: n-grams moderados, seleção opcional de características (K-best), leve balanceamento por downsampling e `class_weight` no RF.
- Se ainda observar overfitting, experimente: reduzir `MAX_TFIDF_FEATURES`, diminuir `K_BEST`, aumentar `min_samples_leaf` ou ativar `DO_HYPERPARAM_TUNING` para buscar parâmetros mais conservadores.