In [15]:
# =======================
# MULTILABEL CELL (END-TO-END)
# - Extra normalization: __DATE__ __PCT__ __MONEY__ __NUM__ + __SOCIAL_COUNT__
# - Features: word+char TF-IDF
# - Per-label thresholds (OOF) instead of predict() @0.5
# - Prefilter updated: never discard if social proof obvious (__SOCIAL_COUNT__ or regex)
# =======================

import re
import numpy as np
import pandas as pd
import spacy
from spacy.matcher import Matcher, PhraseMatcher

from sklearn.model_selection import GroupKFold
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
from sklearn.metrics import (
    classification_report,
    f1_score
)

nlp = spacy.load("es_core_news_sm")

# ============================================================================
# CONFIGURACIÓN DATASET (MULTILABEL)
# ============================================================================
df = pd.read_csv(
    "datasets/unified_dataset.csv",
    header=None,
    names=["type", "content", "source"],
    dtype=str
).dropna()

df["type"] = df["type"].str.strip().str.lower()
df["content"] = df["content"].astype(str).str.strip()
df["source"] = df["source"].astype(str).str.strip()

valid_base_labels = {"fake_urgency", "fake_scarcity", "shaming", "ninguno", "social_proof"}

def parse_labels(t: str):
    parts = [p.strip().lower() for p in str(t).split("|") if p.strip()]
    parts = [p for p in parts if p in valid_base_labels]
    if not parts:
        return ["ninguno"]
    if "ninguno" in parts and len(parts) > 1:
        parts = [p for p in parts if p != "ninguno"]
        if not parts:
            parts = ["ninguno"]
    seen = set()
    out = []
    for p in parts:
        if p not in seen:
            seen.add(p)
            out.append(p)
    return out

df["labels"] = df["type"].apply(parse_labels)
df = df[(df["content"] != "") & (df["source"] != "")].reset_index(drop=True)

# ============================================================================
# NORMALIZACIÓN (TIMERS / STOCK / PEOPLE + DATE/NUM)
# ============================================================================
RE_SPACED_COLON_TIMER = re.compile(r"\b\d{1,2}\s*:\s*\d{1,2}(?:\s*:\s*\d{1,2}){1,3}\b")
RE_D_COLON_TIMER = re.compile(r"\b\d+\s*[dD]\s*:\s*\d{1,2}(?:\s*:\s*\d{1,2}){1,2}\b")
RE_COLON_TIMER = re.compile(r"\b\d{1,2}(?::\d{1,2}){1,4}\b")
RE_UNIT_TIMER = re.compile(
    r"(?ix)\b("
    r"(?:\d+\s*(?:d|días?|dia|day|days))\s*"
    r"(?:\d+\s*(?:h|hs|hr|hrs|horas?))?\s*"
    r"(?:\d+\s*(?:m|min|mins|minutos?))?\s*"
    r"(?:\d+\s*(?:s|seg|segs|segundos?))?"
    r"|"
    r"(?:\d+\s*(?:h|hs|hr|hrs|horas?))\s*"
    r"(?:\d+\s*(?:m|min|mins|minutos?))\s*"
    r"(?:\d+\s*(?:s|seg|segs|segundos?))"
    r")\b"
)
RE_CLOCK_ONLY = re.compile(r"\b\d{1,2}:\d{2}(?::\d{2})?\b")

# extra
RE_DATE = re.compile(r"\b(\d{1,2}[/-]\d{1,2})(?:[/-]\d{2,4})?\b")  # 07/01, 07-01, 07/01/2026
RE_PERCENT = re.compile(r"\b\d{1,3}\s*%\b")
RE_CURRENCY = re.compile(r"(?i)\b(?:ars|\$|usd|u\$s|€)\s*\d+(?:[.,]\d+)*\b")
RE_STANDALONE_NUM = re.compile(r"\b\d+(?:[.,]\d+)*\b")

RE_SOCIAL_UNITS = re.compile(
    r"(?i)\b__num__\s*(comprados?|vendidos?|pedidos?|ventas?|visit(as|os)?|vistas?|"
    r"mirando|viendo|en\s*carritos?|añadid[oa]s?|agregad[oa]s?)\b"
)

def normalize_placeholders(text: str, normalize_stock=True, normalize_people=True) -> str:
    t = str(text)

    # TIMERS -> __TIMER__
    t = RE_SPACED_COLON_TIMER.sub("__TIMER__", t)
    t = RE_D_COLON_TIMER.sub("__TIMER__", t)
    t = RE_COLON_TIMER.sub("__TIMER__", t)
    t = RE_UNIT_TIMER.sub("__TIMER__", t)
    t = RE_CLOCK_ONLY.sub("__TIMER__", t)

    # STOCK -> __STOCK__
    if normalize_stock:
        t = re.sub(r"(?i)\b(qued[ae]n?|queda\(n\))\s*\d+\b", r"\1 __STOCK__", t)
        t = re.sub(r"(?i)\bsolo\s*qued[ae]n?\s*\d+\s*en\s*stock\b", "Solo quedan __STOCK__ en stock", t)
        t = re.sub(r"(?i)\(\s*\d+\s*disponibles?\s*\)", "(__STOCK__ disponibles)", t)
        t = re.sub(r"(?i)(stock\s*disponible:\s*)\(\s*\d+\s*disponibles?\s*\)", r"\1(__STOCK__ disponibles)", t)

    # PEOPLE -> __PEOPLE__
    if normalize_people:
        t = re.sub(r"(?i)\b\d+\s*personas?\b", "__PEOPLE__", t)
        t = re.sub(r"(?i)\ben\s*m[aá]s\s*de\s*\d+\s*carritos\b", "en __PEOPLE__ carritos", t)
        t = re.sub(r"(?i)\ben\s*\d+\s*carritos\b", "en __PEOPLE__ carritos", t)

    # scarcity keywords
    t = re.sub(r"(?i)\b(casi\s*agotad[oa]s?|stock\s*bajo|[uú]ltimas?\s*\d+|[uú]ltimas?\s*unidades|se\s*agotará\s*pronto|se\s*agotar[aá]\s*pronto)\b", "__SCARCITY__", t)
    t = re.sub(r"(?i)\bsolo\s*qued[ae]?\b", "solo queda", t)

    # SCARCITY semántica (NO numérica)
    t = re.sub(
        r"(?i)\b("
        r"stock\s+bajo|"
        r"se\s+agota(r[aá]|r[aá]n)?\s+pronto|"
        r"una\s+vez\s+que\s+se\s+agote\s+se\s+acab[oó]|"
        r"vendi[eé]ndose\s+r[aá]pido|"
        r"se\s+vende\s+r[aá]pido|"
        r"movi[eé]ndose\s+r[aá]pido|"
        r"alta\s+demanda|"
        r"en\s+tu\s+carrito\s+se\s+est[aá]\s+agotando"
        r")\b",
        "__SCARCITY__",
        t
    )

    # FECHAS -> __DATE__
    t = RE_DATE.sub("__DATE__", t)

    # % -> __PCT__
    t = RE_PERCENT.sub("__PCT__", t)

    # MONEDA -> __MONEY__
    t = RE_CURRENCY.sub("__MONEY__", t)

    # NUM -> __NUM__ (después de timers/fechas/moneda)
    t = RE_STANDALONE_NUM.sub("__NUM__", t)

    # SOCIAL PROOF compact (muy útil para textos cortos)
    t = RE_SOCIAL_UNITS.sub("__SOCIAL_COUNT__", t)

    t = re.sub(r"\s{2,}", " ", t).strip()
    return t

df["content_norm"] = df["content"].apply(normalize_placeholders)

# Labels positivas
POS_LABELS = ["fake_urgency", "fake_scarcity", "shaming", "social_proof"]

Y = np.zeros((len(df), len(POS_LABELS)), dtype=int)
for i, labs in enumerate(df["labels"].tolist()):
    for j, lab in enumerate(POS_LABELS):
        if lab in labs:
            Y[i, j] = 1

X = df["content_norm"]
groups = df["source"]

# =========================
# SOCIAL PROOF (regex)
# =========================
RE_SOCIAL_PROOF = re.compile(
    r"(?ix)\b("
    r"__social_count__"
    r"|(?:\d+|__num__|__people__)\s*(?:personas?)\s*(?:est[aá]n\s*)?(?:viendo|mirando)\b"
    r"|en\s*(?:m[aá]s\s*de\s*)?(?:\d+|__num__|__people__)\s*carritos?\b"
    r"|acaba\s*de\s*comprar\b"
    r"|(?:vendid[oa]s?|comprad[oa]s?|pedidos?)\b"
    r")"
)

def has_social_proof(text: str) -> bool:
    return bool(RE_SOCIAL_PROOF.search(str(text).lower()))

# ============================================================================
# PREFILTER
# ============================================================================
URGENCY_TRIGGERS = [
    "apurate", "apúrate", "ya", "no te lo pierdas", "última", "ultima", "oportunidad",
    "comprá", "compra", "reservá", "reserva", "oferta", "últimas", "ultimas", "flash", "sale",
    "relámpago", "relampago", "aprovecha", "ahora o nunca", "por tiempo limitado", "ultimo día",
    "último día", "última oportunidad", "ultima oportunidad", "solo hoy", "sólo hoy",
    "solo ahora", "sólo ahora", "termina en", "finaliza en", "quedan", "queda",
    "últimos", "ultimos", "stock bajo", "casi agotado",
    "__timer__", "__stock__",
]

TECH_NOUNS = [
    "sesión", "sesion", "batería", "bateria", "dispositivo", "equipo", "sistema",
    "conexión", "conexion", "proceso", "operación", "operacion", "pantalla",
    "aplicación", "aplicacion", "instancia", "entorno", "pedido", "token"
]

END_VERBS = [
    "expira", "expirar", "caduca", "caducar", "vence", "vencer", "cierra", "cerrar",
    "finaliza", "finalizar", "termina", "terminar", "se agotará", "se agotara",
    "agotarse", "apaga", "apagarse", "desconecta", "desconectarse", "bloquea", "bloquearse"
]

EVENT_TERMS = ["clase", "evento", "live", "streaming", "stream", "partido", "examen"]
EVENT_START_VERBS = ["empieza", "comienza", "inicia", "arranca", "comenzar", "empezar", "iniciar"]

re_in_time = re.compile(r"\b\d+\s*(?:segundos?|minutos?|horas?|hs|h|m|s|d[ií]as?|d[ií]a)\b", re.IGNORECASE)
re_clock = re.compile(r"\b\d{1,2}:\d{2}(?::\d{2})?\b")

matcher = Matcher(nlp.vocab)
shaming_matcher = Matcher(nlp.vocab)
phrase_matcher = PhraseMatcher(nlp.vocab, attr="LOWER")

# --- SHAMING (prioridad: no descartar) ---
shaming_matcher.add("FP_VERB", [[{"POS": "VERB", "MORPH": {"IS_SUPERSET": ["Person=1", "Number=Sing"]}}]])
shaming_matcher.add("FP_COPULA", [[{"DEP": "cop", "POS": "AUX", "MORPH": {"IS_SUPERSET": ["Person=1", "Number=Sing"]}}]])
shaming_matcher.add("FP_ME_VERB", [[{"POS": "PRON", "MORPH": {"IS_SUPERSET": ["Person=1", "Number=Sing"]}}, {"POS": "VERB"}]])

# --- METADATA / NEUTRAL / LAUNCH  ---
matcher.add("METADATA_UNITS_FULL", [[
    {"IS_SPACE": True, "OP": "*"},
    {"LIKE_NUM": True},
    {"IS_SPACE": True, "OP": "*"},
    {"LOWER": {"IN": ["colores", "color", "tamaños", "tamaño", "talles", "talle", "piezas", "pieza", "unidades", "unidad"]}},
    {"IS_SPACE": True, "OP": "*"}
]])
matcher.add("NEUTRAL_NO_THANKS_FULL", [[
    {"IS_SPACE": True, "OP": "*"},
    {"LOWER": "no"},
    {"IS_SPACE": True, "OP": "*"},
    {"LOWER": "gracias"},
    {"IS_SPACE": True, "OP": "*"}
]])
matcher.add("LAUNCH_AVAILABLE_SOON", [[
    {"IS_SPACE": True, "OP": "*"},
    {"LOWER": "disponible"},
    {"IS_SPACE": True, "OP": "*"},
    {"LOWER": {"IN": ["próximamente", "proximamente"]}},
    {"IS_SPACE": True, "OP": "*"}
]])

phrase_matcher.add("URGENCY_TRIGGERS", [nlp.make_doc(t) for t in URGENCY_TRIGGERS])
phrase_matcher.add("TECH_NOUNS", [nlp.make_doc(t) for t in TECH_NOUNS])
phrase_matcher.add("END_VERBS", [nlp.make_doc(t) for t in END_VERBS])
phrase_matcher.add("EVENT_TERMS", [nlp.make_doc(t) for t in EVENT_TERMS])
phrase_matcher.add("EVENT_START", [nlp.make_doc(t) for t in EVENT_START_VERBS])

def has_shaming_pattern(doc):
    return len(shaming_matcher(doc)) > 0

def has_urgency_trigger(doc):
    matches = phrase_matcher(doc, as_spans=False)
    return any(nlp.vocab.strings[mid] == "URGENCY_TRIGGERS" for mid, _, _ in matches)

def check_full_text_match(doc, label_prefixes):
    matches = matcher(doc)
    non_space_tokens = [i for i, tok in enumerate(doc) if not tok.is_space]
    if not non_space_tokens:
        return False
    first_token = non_space_tokens[0]
    last_token = non_space_tokens[-1]
    for match_id, start, end in matches:
        lab = nlp.vocab.strings[match_id]
        if any(lab.startswith(p) for p in label_prefixes):
            if start <= first_token and end > last_token:
                return True
    return False

def is_safe_non_pattern(text: str) -> bool:
    doc = nlp(text.lower())
    if has_urgency_trigger(doc):
        return False
    if check_full_text_match(doc, ["METADATA_"]):
        return True
    if check_full_text_match(doc, ["NEUTRAL_"]):
        return True
    if check_full_text_match(doc, ["LAUNCH_"]):
        return True
    return False

def is_anti_dark_fp(text: str) -> bool:
    doc = nlp(text.lower())
    text_lower = text.lower()
    if has_urgency_trigger(doc):
        return False

    pm = phrase_matcher(doc, as_spans=False)
    has_event = any(nlp.vocab.strings[mid] == "EVENT_TERMS" for mid, _, _ in pm)
    has_start = any(nlp.vocab.strings[mid] == "EVENT_START" for mid, _, _ in pm)
    has_tech = any(nlp.vocab.strings[mid] == "TECH_NOUNS" for mid, _, _ in pm)
    has_end = any(nlp.vocab.strings[mid] == "END_VERBS" for mid, _, _ in pm)

    if has_event and has_start:
        return True

    if has_start and " en " in text_lower:
        if re_in_time.search(text_lower) or re_clock.search(text_lower):
            return True

    if has_tech and has_end:
        if re_in_time.search(text_lower) or re_clock.search(text_lower) or " en " in text_lower:
            return True

    return False

def prefilter_to_none(text: str) -> bool:
    """
    Multi-label:
    - Si hay shaming => NO descartar
    - Si hay social_proof => NO descartar
    - Si es "anti-dark" (tech/event) o safe non-pattern => descartar (ninguno)
    """
    t = str(text)

    # blindaje por normalización: si detectaste __SOCIAL_COUNT__ => no descartar
    if "__social_count__" in t.lower():
        return False

    doc = nlp(t.lower())
    if has_shaming_pattern(doc):
        return False

    if has_social_proof(t):
        return False

    return is_anti_dark_fp(t) or is_safe_non_pattern(t)

word = TfidfVectorizer(ngram_range=(1, 3), min_df=2, max_df=0.95)
char = TfidfVectorizer(analyzer="char_wb", ngram_range=(3, 5), min_df=2)
tfidf = FeatureUnion([("word", word), ("char", char)])

pipeline_ml = Pipeline([
    ("tfidf", tfidf),
    ("clf", OneVsRestClassifier(LogisticRegression(
        max_iter=4000,
        class_weight="balanced",
        solver="liblinear"
    )))
])

gkf = GroupKFold(n_splits=5)
oof_proba = np.zeros((len(df), len(POS_LABELS)), dtype=float)
discarded = np.zeros(len(df), dtype=bool)

for tr, te in gkf.split(X, Y, groups):
    pipeline_ml.fit(X.iloc[tr], Y[tr])
    oof_proba[te] = pipeline_ml.predict_proba(X.iloc[te])

    te_texts = X.iloc[te]
    discarded[te] = te_texts.apply(prefilter_to_none).values

thresholds = []
for j in range(len(POS_LABELS)):
    best_t, best_f1 = 0.5, -1
    for t in np.linspace(0.05, 0.95, 19):
        pred = (oof_proba[:, j] >= t).astype(int)
        f1 = f1_score(Y[:, j], pred, zero_division=0)
        if f1 > best_f1:
            best_f1, best_t = f1, t
    thresholds.append(best_t)

thr = np.array(thresholds)
oof_model = (oof_proba >= thr).astype(int)

oof_system = oof_model.copy()
oof_system[discarded, :] = 0

print("thresholds:", dict(zip(POS_LABELS, thresholds)))

print("\n=== MULTILABEL: MODEL ONLY (thresholded) ===")
print(classification_report(Y, oof_model, target_names=POS_LABELS, digits=3, zero_division=0))

print("\n=== MULTILABEL: SYSTEM (PREFILTER + MODEL) ===")
print(classification_report(Y, oof_system, target_names=POS_LABELS, digits=3, zero_division=0))

print("\nDescartados por prefilter:", int(discarded.sum()))
print(df.loc[discarded, ["type", "source", "content", "content_norm"]].to_string(index=False))

pipeline_ml.fit(df["content_norm"], Y)

def predict_labels(texts, use_prefilter=True):
    if isinstance(texts, str):
        texts = [texts]
    texts_norm = [normalize_placeholders(t) for t in texts]
    proba = pipeline_ml.predict_proba(texts_norm)
    pred = (proba >= thr).astype(int)

    if use_prefilter:
        disc = np.array([prefilter_to_none(tn) for tn in texts_norm], dtype=bool)
        pred[disc, :] = 0

    return texts_norm, proba, pred


thresholds: {'fake_urgency': 0.35, 'fake_scarcity': 0.7, 'shaming': 0.39999999999999997, 'social_proof': 0.3}

=== MULTILABEL: MODEL ONLY (thresholded) ===
               precision    recall  f1-score   support

 fake_urgency      0.770     0.906     0.833       192
fake_scarcity      0.758     0.649     0.699        77
      shaming      0.857     0.917     0.886        72
 social_proof      0.711     0.842     0.771       222

    micro avg      0.755     0.847     0.798       563
    macro avg      0.774     0.829     0.797       563
 weighted avg      0.756     0.847     0.797       563
  samples avg      0.736     0.769     0.744       563


=== MULTILABEL: SYSTEM (PREFILTER + MODEL) ===
               precision    recall  f1-score   support

 fake_urgency      0.780     0.906     0.839       192
fake_scarcity      0.758     0.649     0.699        77
      shaming      0.868     0.917     0.892        72
 social_proof      0.719     0.842     0.776       222

    micro avg      0.

In [17]:
import numpy as np
import pandas as pd

# Requiere:
# df, Y, oof_system  (SYSTEM = prefilter + model)

# =========================
# Preparación
# =========================
A = df[["type", "source", "content"]].copy()

# Ground truth binario
gt = (Y.sum(axis=1) > 0).astype(int)   # 1=pattern, 0=ninguno
A["gt"] = gt

# Predicción binaria SYSTEM
pred = (oof_system.sum(axis=1) > 0).astype(int)
A["pred"] = pred

# =========================
# Métricas helper
# =========================
def metrics(tp, fp, fn, tn):
    prec = tp / (tp + fp) if (tp + fp) else 0
    rec  = tp / (tp + fn) if (tp + fn) else 0
    f1   = 2 * prec * rec / (prec + rec) if (prec + rec) else 0
    return round(prec,3), round(rec,3), round(f1,3)

# =========================
# Confusión binaria
# =========================
TP = int(((gt == 1) & (pred == 1)).sum())
FP = int(((gt == 0) & (pred == 1)).sum())
FN = int(((gt == 1) & (pred == 0)).sum())
TN = int(((gt == 0) & (pred == 0)).sum())

# =========================
# Métricas
# =========================
p_pat, r_pat, f1_pat = metrics(TP, FP, FN, TN)
p_non, r_non, f1_non = metrics(TN, FN, FP, TP)

acc = (TP + TN) / len(A)

print("===== METRICS (SYSTEM) =====")
print(f"[PATTERN]  P={p_pat}  R={r_pat}  F1={f1_pat}")
print(f"[NINGUNO]  P={p_non}  R={r_non}  F1={f1_non}")
print(f"[GLOBAL ]  Accuracy={acc:.3f}")

print("\n===== CONFUSION (pattern vs ninguno) =====")
print(pd.DataFrame(
    [[TN, FP],
     [FN, TP]],
    index=["gt_ninguno", "gt_pattern"],
    columns=["pred_ninguno", "pred_pattern"]
).to_string())

# =========================
# Errores entre ambos
# =========================
print("\n===== ERRORES CLAVE =====")

fps = A[(A["gt"] == 0) & (A["pred"] == 1)]
fns = A[(A["gt"] == 1) & (A["pred"] == 0)]

print(f"\nFALSE POSITIVES (ninguno → patrón): {len(fps)}")
for _, r in fps.head(15).iterrows():
    print(f"- {r['content']} | src={r['source']}")

print(f"\nFALSE NEGATIVES (patrón → ninguno): {len(fns)}")
for _, r in fns.head(15).iterrows():
    print(f"- {r['content']} | src={r['source']}")


===== METRICS (SYSTEM) =====
[PATTERN]  P=0.912  R=0.945  F1=0.928
[NINGUNO]  P=0.333  R=0.234  F1=0.275
[GLOBAL ]  Accuracy=0.869

===== CONFUSION (pattern vs ninguno) =====
            pred_ninguno  pred_pattern
gt_ninguno            15            49
gt_pattern            30           511

===== ERRORES CLAVE =====

FALSE POSITIVES (ninguno → patrón): 49
- Válido del 26/12 al 6/01 | src=fravega
- ¡Tenés 30 días de entregas gratis! | src=rappi
- 4 colores, 5 piezas | src=wish
- 4 colores | src=wish
- Hasta 27% de descuento | src=wish
- 8 tamaños | src=wish
- ARS2.100 de crédito por retraso | src=temu
- Llega a AR en tan solo 3 días hábiles después del envío | src=temu
- 558 ventas | src=temu
- Quiero aprender SEO | src=aprendoseo
- Quiero saber más | src=aprendoseo
- Quiero más info | src=aprendoseo
- Quiero mi código | src=LaNacion
- Quiero una consultoría digital | src=JuanMenorio
- No, lo quiero devolver | src=Amazon

FALSE NEGATIVES (patrón → ninguno): 30
- Continuar sin apoyarnos