In [1]:
import os, glob, json
import numpy as np
from typing import Tuple, List, Dict, Any
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from tensorflow.keras.models import load_model

# ==== BASIC CONFIG (adjust if you change your encoder) ====
CONT_DIM = 19  # 4 (fam_name_norm, size_norm, timestamp_norm, entropy_norm) + 4 sections * 3 (entropy, virtual_size, raw_size) + 3 extra (entropy, virtual_size, raw_size) = 19
RANDOM_STATE = 42
MAX_SAMPLES = 2000  # cap on rows per set, to keep the run fast
Z_DIM = 256
# Key continuous features chosen for the KS test (indices within the 19 continuous columns):
KS_FEATURES_IDX = [
    1,  # file_size_norm
    2,  # timestamp_norm
    3,  # global entropy
    4, 5, 6,   # .text: entropy, virtual size, raw size
    7, 8, 9    # .idata: entropy, virtual size, raw size
]

# ------------ Helpers mínimos ------------
def ensure_binary_01(X: np.ndarray, cont_dim: int = CONT_DIM) -> np.ndarray:
    """Return a copy of ``X`` whose binary block is encoded as {0, 1}.

    The columns from ``cont_dim`` onwards are treated as the binary block.
    If any value in that block is negative, the whole block is assumed to be
    encoded as {-1, 1} and is remapped with ``(x + 1) / 2``. The first
    ``cont_dim`` columns are never modified.

    Args:
        X: 2D array of shape (n_samples, n_features).
        cont_dim: Number of leading continuous columns.

    Returns:
        A new array; the input is not modified.
    """
    X = X.copy()
    binary = X[:, cont_dim:]
    # The ``size`` guard covers both "no binary columns" and "no rows":
    # calling min() on a zero-size array raises ValueError.
    if binary.size and binary.min() < 0.0:
        X[:, cont_dim:] = (binary + 1.0) / 2.0
    return X

def subset_balanced(Xr: np.ndarray, Xs: np.ndarray, max_n: int = MAX_SAMPLES) -> Tuple[np.ndarray, np.ndarray]:
    """Subsample both sets to the same number of rows.

    The common size is ``min(len(Xr), len(Xs), max_n)``. A set with more rows
    than that is sampled without replacement; otherwise all of its rows are
    kept in their original order. Both draws use a single ``RandomState``
    seeded with ``RANDOM_STATE`` (real set first, then synthetic).
    """
    rng = np.random.RandomState(RANDOM_STATE)
    n = min(len(Xr), len(Xs), max_n)

    def _pick(total: int) -> np.ndarray:
        # Only draw when there is something to discard.
        if total > n:
            return rng.choice(total, size=n, replace=False)
        return np.arange(total)

    real_idx = _pick(len(Xr))
    syn_idx = _pick(len(Xs))
    return Xr[real_idx], Xs[syn_idx]

def imgs_to_vectors(gen_imgs: np.ndarray, expected_len: int) -> np.ndarray:
    """Convert generated images in [-1, 1] to flat feature vectors in [0, 1].

    Each image is rescaled with ``(img + 1) / 2``, cast to float32, and
    flattened. The result is truncated, or right-padded with zeros, to
    exactly ``expected_len`` values.

    Args:
        gen_imgs: Batch of images; the first axis indexes samples.
        expected_len: Target vector length.

    Returns:
        float32 array of shape ``(len(gen_imgs), expected_len)``. An empty
        batch yields shape ``(0, expected_len)``, so downstream column
        slicing still works.
    """
    # Preallocating with zeros gives the zero padding for free and keeps the
    # output 2D even when there are no images.
    vectors = np.zeros((len(gen_imgs), expected_len), dtype=np.float32)
    for row, img in zip(vectors, gen_imgs):
        flat = ((np.asarray(img) + 1.0) / 2.0).astype(np.float32).ravel()
        keep = min(flat.size, expected_len)
        row[:keep] = flat[:keep]
    return vectors

# ------------ Métricas smoke ------------
def c2st_auc(Xr_all: np.ndarray, Xs_all: np.ndarray) -> float:
    """Classifier two-sample test: ROC AUC of a real-vs-synthetic classifier.

    Real rows are labelled 1 and synthetic rows 0. A logistic regression is
    fitted on a stratified 70% split and scored on the remaining 30%.
    """
    n_real, n_syn = len(Xr_all), len(Xs_all)
    features = np.vstack([Xr_all, Xs_all])
    labels = np.hstack([np.ones(n_real), np.zeros(n_syn)])

    split = train_test_split(
        features,
        labels,
        test_size=0.3,
        random_state=RANDOM_STATE,
        stratify=labels,
    )
    X_train, X_test, y_train, y_test = split

    model = LogisticRegression(max_iter=500, random_state=RANDOM_STATE)
    model.fit(X_train, y_train)
    # Column 1 holds the probability of the "real" class (label 1).
    real_scores = model.predict_proba(X_test)[:, 1]
    return float(roc_auc_score(y_test, real_scores))

def jensen_shannon_divergence(p: np.ndarray, q: np.ndarray) -> float:
    """Jensen-Shannon style divergence between two frequency vectors.

    Both inputs are clipped to ``[1e-8, 1 - 1e-8]`` and compared against
    their midpoint ``m = (p + q) / 2``. The result is
    ``0.5 * (sum(p * log(p / m)) + sum(q * log(q / m)))`` using natural
    logarithms. The vectors are used as given; they are not normalized to
    sum to 1.
    """
    tiny = 1e-8
    p_safe, q_safe = (np.clip(freq, tiny, 1 - tiny) for freq in (p, q))
    log_mix = np.log(0.5 * (p_safe + q_safe))
    # One KL-like term per input, each measured against the mixture.
    to_mix = [
        np.sum(freq * (np.log(freq) - log_mix))
        for freq in (p_safe, q_safe)
    ]
    return float(0.5 * (to_mix[0] + to_mix[1]))

def jsd_imports(Xr: np.ndarray, Xs: np.ndarray, cont_dim: int = CONT_DIM) -> float:
    """Divergence between the per-column means of the binary blocks.

    The binary block is every column from ``cont_dim`` onwards. Returns NaN
    when ``Xr`` has no columns beyond the continuous ones.
    """
    has_binary = Xr.shape[1] > cont_dim
    if not has_binary:
        return float("nan")
    # Column means of a 0/1 block are the activation frequencies per column.
    real_freq, syn_freq = (
        block[:, cont_dim:].mean(axis=0) for block in (Xr, Xs)
    )
    return jensen_shannon_divergence(real_freq, syn_freq)

def ks_summary(Xr: np.ndarray, Xs: np.ndarray, features_idx: List[int]) -> Dict[str, Any]:
    """Run a two-sample KS test on each selected column.

    Returns a dict with:
      - "by_feature": one record per index holding the KS statistic and p-value
      - "fraction_p_gt_0_05": share of the selected columns whose p-value
        exceeds 0.05
    """
    from scipy.stats import ks_2samp

    details = []
    for col in features_idx:
        ks_stat, p_value = ks_2samp(Xr[:, col], Xs[:, col])
        details.append(
            {"feature_idx": col, "KS_stat": float(ks_stat), "p_value": float(p_value)}
        )

    p_values = np.array([entry["p_value"] for entry in details])
    return {
        "by_feature": details,
        "fraction_p_gt_0_05": float(np.mean(p_values > 0.05)),
    }

# ------------ Carga de reales (usa tu json_a_vector) ------------
def collect_real_vectors(json_dir: str, expected_len: int, json_a_vector_func) -> np.ndarray:
    """Encode every ``*.json`` file in ``json_dir`` into one feature matrix.

    Args:
        json_dir: Directory searched (non-recursively) for ``*.json`` files.
        expected_len: Target vector length. Shorter vectors are right-padded
            with zeros and longer ones are truncated.
        json_a_vector_func: Callable that takes a file path and returns the
            feature vector for that file. Its output is flattened before
            padding or truncation.

    Returns:
        float32 array of shape ``(n_files, expected_len)``, with rows ordered
        by sorted file path.

    Raises:
        RuntimeError: If no ``*.json`` file is found in ``json_dir``.
    """
    # glob returns paths in arbitrary order. Sorting keeps the row order
    # stable across runs and machines, so that seeded subsampling of the
    # result picks the same files every time.
    paths = sorted(glob.glob(os.path.join(json_dir, "*.json")))
    if not paths:
        raise RuntimeError(f"No se encontraron JSON en: {json_dir}")

    # Zero-initialised rows provide the padding for short vectors.
    X = np.zeros((len(paths), expected_len), dtype=np.float32)
    for row, path in zip(X, paths):
        v = np.asarray(json_a_vector_func(path), dtype=np.float32).ravel()
        keep = min(v.size, expected_len)
        row[:keep] = v[:keep]
    return X

# ------------ Función principal (smoke test) ------------
def smoke_test_gan(
    X_real: np.ndarray,
    X_syn: np.ndarray,
    cont_dim: int = CONT_DIM
) -> Dict[str, Any]:
    """Compute quick real-vs-synthetic sanity metrics.

    Takes real and synthetic feature matrices with the same number of
    columns and:
      - maps the binary block to {0, 1} if needed
      - draws a balanced subset of at most ``MAX_SAMPLES`` rows per set
      - standardizes only the first ``cont_dim`` columns, fitting on real rows
      - reports the C2ST AUC, the JSD over the binary block, and KS tests on
        the columns listed in ``KS_FEATURES_IDX``
    """
    # Make sure the binary block is 0/1 before anything else looks at it.
    real01 = ensure_binary_01(X_real, cont_dim)
    syn01 = ensure_binary_01(X_syn, cont_dim)

    # Small, equally sized subsets keep the test fast.
    Xr, Xs = subset_balanced(real01, syn01, MAX_SAMPLES)

    # Scale ONLY the continuous columns; the scaler never sees synthetic rows
    # while fitting.
    real_cont, syn_cont = Xr[:, :cont_dim], Xs[:, :cont_dim]
    scaler = StandardScaler()
    real_scaled = scaler.fit_transform(real_cont)
    syn_scaled = scaler.transform(syn_cont)

    # Re-attach the binary columns unscaled, when there are any.
    has_binary = Xr.shape[1] > cont_dim
    if has_binary:
        Xr_all = np.hstack([real_scaled, Xr[:, cont_dim:]])
        Xs_all = np.hstack([syn_scaled, Xs[:, cont_dim:]])
    else:
        Xr_all, Xs_all = real_scaled, syn_scaled

    # Metrics: the classifier sees the scaled matrices; JSD and KS use the
    # unscaled subsets.
    auc = c2st_auc(Xr_all, Xs_all)
    jsd = jsd_imports(Xr, Xs, cont_dim=cont_dim)
    ks = ks_summary(real_cont, syn_cont, KS_FEATURES_IDX)

    report = {
        # C2ST (real-vs-synthetic classifier); ideal is about 0.5-0.6
        "C2ST_auc": auc,
        # JSD over the binary (imports) block; ideal is below about 0.05
        "JSD_imports": jsd,
        # KS on the continuous columns; ideal is at least about 0.7
        "KS_fraction_p>0.05": ks["fraction_p_gt_0_05"],
        "KS_details": ks["by_feature"],
        "samples_used_per_set": len(Xr),
    }
    return report

# ------------ Ejemplo de uso ------------
if __name__ == "__main__":
    # 1) Import your encoder (it must expose json_a_vector, returning the
    #    vector in memory).
    # NOTE(review): this import presumably needs the directory that contains
    # the Encoder package on sys.path - confirm for your project layout.
    from Encoder.json_a_img import json_a_vector, FUNCIONES_API  # <-- adjust this import to your project

    EXPECTED_LEN = CONT_DIM + len(FUNCIONES_API)

    # 2) Real samples: read from a directory of JSON files.
    JSON_DIR = "./muestras_malware/json"
    X_real = collect_real_vectors(JSON_DIR, EXPECTED_LEN, json_a_vector)

    # 3) Synthetic samples: generate images ([-1, 1]) and convert to vectors.
    generator = load_model("./models/generator.keras")
    # Seed the latent noise with RANDOM_STATE, like the rest of the script,
    # so repeated runs score the same synthetic batch.
    rng = np.random.RandomState(RANDOM_STATE)
    z_sample = rng.normal(size=(584, Z_DIM))
    gen_imgs = generator.predict(z_sample)
    X_syn = imgs_to_vectors(gen_imgs, EXPECTED_LEN)

    # 4) Run the smoke test.
    report = smoke_test_gan(X_real, X_syn, cont_dim=CONT_DIM)
    import pprint
    pprint.pprint(report)

ModuleNotFoundError: No module named 'Encoder'