In [1]:
#!/usr/bin/env python3
"""
scripts/scoring_utils.py
Versión: 1.0 (v2.6 interna)
Autor: Alexis Zapico Fernández

Descripción:
Script para aplicar el scoring heurístico (v1) a un feed de URLs.
Usa la estructura de paths definida por el usuario.

Uso (desde la raíz del repo):
python scripts/scoring_utils.py
o
python scripts/scoring_utils.py --input data/raw/phishing/database-phishing.txt --whitelist data/whitelists/spanish_domains.csv
"""

import sys
import csv
import logging
from pathlib import Path
from datetime import datetime
from urllib.parse import urlparse

import pandas as pd
from rapidfuzz import fuzz

# -------------------------
# RUTAS Y CONSTANTES (user-style)
# -------------------------
REPO_ROOT = Path(__file__).resolve().parents[1]  # dos niveles arriba desde scripts/
DATA_RAW = REPO_ROOT / "data" / "raw" / "phishing"
DATA_PROCESSED = REPO_ROOT / "data" / "interim" / "phishing"
DATA_INTERIM = REPO_ROOT / "data" / "interim" / "phishing"
SCRIPTS_DIR = REPO_ROOT / "scripts"
NOTEBOOK_DIR = Path.cwd()

# Default input file (puedes sobreescribir en CLI)
INPUT_FILE = DATA_RAW / "database-phishing.txt"

# Añadir scripts al path si alguien importa desde notebooks
sys.path.append(str(SCRIPTS_DIR))

# Default whitelist path (ajustable)
DEFAULT_WHITELIST = REPO_ROOT / "data" / "whitelists" / "spanish_domains.csv"

# Logging básico
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("scoring_utils")

# -------------------------
# FUNCIONES AUXILIARES
# -------------------------


def load_spanish_whitelist(path: Path):
    """
    Carga whitelist de dominios legítimos. Devuelve lista de strings en minúscula.
    Acepta CSV con una columna (dominio) o TXT con un dominio por línea.
    """
    path = Path(path)
    if not path.exists():
        logger.warning("Whitelist no encontrada en %s — devolviendo lista vacía", str(path))
        return []

    try:
        # Intentamos CSV primero
        df = pd.read_csv(path, header=0)
        # coger la primera columna como lista
        col = df.columns[0]
        lst = df[col].astype(str).str.strip().str.lower().tolist()
        logger.info("Whitelist cargada desde CSV (%s) — %d dominios", str(path), len(lst))
        return lst
    except Exception:
        # fallback: fichero plano (1 dominio por línea)
        with open(path, "r", encoding="utf-8") as fh:
            lst = [line.strip().lower() for line in fh if line.strip()]
        logger.info("Whitelist cargada desde TXT (%s) — %d dominios", str(path), len(lst))
        return lst


# -------------------------
# IMPLEMENTACIÓN DEL SCORING (v1 / baseline)
# -------------------------

def score_url_v1(url: str, spanish_whitelist: list):
    """
    Scoring baseline (v1) — implementa las reglas descritas en scoring.md.
    Devuelve (score:int, signals:str).
    """
    url_low = str(url).lower()
    parsed = urlparse(url_low)
    netloc = parsed.netloc or ''
    path = parsed.path or ''
    query = parsed.query or ''
    domain_core = netloc.split('.')[-2] if '.' in netloc else netloc

    score = 0
    signals = []

    # --- Listas base ---
    POSITIVE_KEYWORDS = ['multa', 'pago', 'verificación', 'cliente', 'acceso', 'seguridad', 'confirmación', 'factura',
                         'tarjeta']
    SPANISH_MARKERS = ['.es', '+34', '€']
    SPANISH_BRANDS = ['santander', 'bbva', 'caixabank', 'ing', 'bankia', 'openbank', 'ionos', 'orange', 'movistar',
                      'correos', 'dgt']
    SPANISH_HOSTINGS = ['webcindario', 'rf.gd']

    GENERIC_SP_TOKENS = [
        'servicio', 'soporte', 'atencion', 'cliente', 'usuarios', 'ayuda', 'asistencia',
        'cuenta', 'acceso', 'inicio', 'login', 'sesion', 'datos', 'perfil', 'portal',
        'seguridad', 'verificacion', 'confirmacion', 'actualizacion', 'validacion', 'auth', 'clave', 'codigo',
        'envio', 'entrega', 'paquete', 'pedido', 'multa', 'factura', 'notificacion', 'aviso',
        'gob', 'oficial', 'tramite', 'tramites', 'agencia', 'impuestos', 'certificado'
    ]

    BANKING_TOKENS = [
        'banco', 'banca', 'bank', 'banking', 'transferencia', 'tarjeta', 'pin', 'clave', 'codigo', 'validacion',
        'firma', 'token', 'sms', 'autenticacion', 'movimientos', 'saldo', 'oficinavirtual', 'bancamovil',
        'appbanco', 'bancadigital', 'acceso', 'usuarios', 'verificacion', 'serviciocliente', 'soportecliente'
    ]

    INSTITUTIONAL_TOKENS = [
        'ayuntamiento', 'gob', 'gobierno', 'agencia', 'tramite', 'tramites', 'oficial', 'certificado',
        'seg-social', 'catastro', 'impuestos', 'tributos', 'dgt', 'hacienda', 'dni', 'salud', 'sanidad'
    ]

    PROFESSIONAL_TOKENS = [
        'asesoria', 'gestoria', 'abogado', 'despacho', 'consultoria', 'contable', 'laboral', 'fiscal', 'bufete',
        'notaria'
    ]

    ECOMMERCE_TOKENS = [
        'pedido', 'pedidos', 'compra', 'compras', 'factura', 'facturas', 'recibo', 'recibos',
        'abonado', 'tarifa', 'tarifas', 'servicios', 'renovar', 'renovacion', 'contrato',
        'suscripcion', 'envio', 'entrega', 'paquete', 'envios', 'devolucion'
    ]

    LATAM_TLDS = ['.co', '.mx', '.cl', '.ar', '.br', '.pe', '.ec', '.uy', '.py', '.bo', '.sv', '.hn', '.cr', '.gt',
                  '.do']
    GLOBAL_TLDS = ['.com', '.app', '.net', '.org', '.io', '.web.app', '.dev']

    # --- Reglas principales ---
    for kw in POSITIVE_KEYWORDS:
        if kw in url_low:
            score += 1
            signals.append(f'has_kw:{kw}')

    for m in SPANISH_MARKERS:
        if m in url_low:
            if m == '.es':
                score += 2
            else:
                score += 1
            signals.append(f'spanish_marker:{m}')

    for b in SPANISH_BRANDS:
        if b in url_low:
            score += 1
            signals.append(f'spanish_brand:{b}')

    for h in SPANISH_HOSTINGS:
        if h in url_low:
            score += 2
            signals.append(f'spanish_hosting:{h}')

    if '.com.es' in url_low:
        score += 2
        signals.append('tld_combo_com_es')

    # --- Semánticas compuestas ---
    if '.es' in url_low:
        if sum(tok in url_low for tok in GENERIC_SP_TOKENS) >= 2:
            score += 3
            signals.append('generic_service_combo_es')
        if any(tok in url_low for tok in BANKING_TOKENS):
            score += 3
            signals.append('banking_combo_es')
        if any(tok in url_low for tok in INSTITUTIONAL_TOKENS + PROFESSIONAL_TOKENS):
            score += 3
            signals.append('institutional_professional_es')
        if any(tok in url_low for tok in ECOMMERCE_TOKENS):
            score += 2
            signals.append('ecommerce_combo_es')

    # --- Whitelist exacta + fuzzy ---
    for legit in spanish_whitelist:
        if legit in url_low:
            score += 2
            signals.append(f'spanish_whitelist_match:{legit}')
            break
        else:
            legit_base = legit.split('.')[0]
            sim = fuzz.ratio(domain_core, legit_base)
            if sim >= 80:
                score += 2
                signals.append(f'fuzzy_whitelist_match:{legit}:{sim:.0f}')
                break

    # --- Recuperación y casos específicos ---
    spanish_tokens_for_brand = ['ayuda', 'cliente', 'esapp', 'es', 'spain', 'movil', 'ayuntamiento', 'paqueteria', 'paquete',
                                'envio', 'entrega']
    if any(b in url_low for b in SPANISH_BRANDS) and any(tok in url_low for tok in spanish_tokens_for_brand):
        score += 2
        signals.append('brand_plus_spanish_token')

    try:
        parts = netloc.split('.')
        if len(parts) > 2:
            subdomain_str = '.'.join(parts[:-2])
            if any(b in subdomain_str for b in SPANISH_BRANDS):
                score += 2
                signals.append('brand_in_subdomain')
    except Exception:
        pass

    shorteners = ['l.ead.me', 'bit.ly', 't.co', 'tinyurl.com', 'ow.ly', 'is.gd']
    if any(s in netloc for s in shorteners):
        short_tokens = ['spain', 'es', 'dgt', 'bbva', 'correos', 'ing', 'santander', 'caixabank']
        if any(tok in path or tok in query for tok in short_tokens):
            score += 2
            signals.append('shortener_spain')

    try:
        has_spanish_brand = any(b in url_low for b in SPANISH_BRANDS)
        host_has_global_tld = any(tld in netloc for tld in GLOBAL_TLDS)
        host_has_es = '.es' in netloc
        if has_spanish_brand and host_has_global_tld and not host_has_es:
            score += 1
            signals.append('brand_global_tld_boost')
    except Exception:
        pass

    # --- Penalizaciones LATAM ---
    for tld in LATAM_TLDS:
        if url_low.endswith(tld) or f"{tld}/" in url_low:
            score -= 2
            signals.append(f'latam_tld:{tld}')

    for pk in ['pagamento', 'fatura', 'acesso', 'faturas']:
        if pk in url_low:
            score -= 2
            signals.append(f'pt_kw:{pk}')

    for lb in ['banrural', 'pichincha', 'itau', 'bradesco', 'yape', 'daviplata']:
        if lb in url_low:
            score -= 1
            signals.append(f'latam_brand:{lb}')

    # --- Fuzzy brand match ---
    for brand in SPANISH_BRANDS:
        sim = fuzz.ratio(domain_core, brand)
        if sim >= 80:
            score += 2
            signals.append(f'fuzzy_brand_match:{brand}:{sim:.0f}')
            break

    return score, ';'.join(signals)


# -------------------------
# APLICACIÓN MASIVA / WRAPPER
# -------------------------
def apply_scoring_v1(df: pd.DataFrame, whitelist_path: Path = DEFAULT_WHITELIST) -> pd.DataFrame:
    """
    Aplica score_url_v1 a la columna 'url' del DataFrame.
    Devuelve el DataFrame con columnas nuevas: score_total, signals_detected, timestamp, scoring_version.
    """
    whitelist = load_spanish_whitelist(whitelist_path)
    logger.info("Aplicando scoring_v1 a %d URLs (whitelist=%s)", len(df), str(whitelist_path))
    df = df.copy()
    df['score_total'], df['signals_detected'] = zip(*df['url'].apply(lambda x: score_url_v1(x, whitelist)))
    df['timestamp'] = datetime.now().isoformat()
    df['scoring_version'] = 'v1 (v2.6 interna)'
    return df


# -------------------------
# EJECUCIÓN DIRECTA (CLI)
# -------------------------
def _read_input_file(input_path: Path) -> pd.DataFrame:
    """
    Lee input como CSV o TXT (una URL por línea). Devuelve DataFrame con columna 'url'.
    """
    input_path = Path(input_path)
    if not input_path.exists():
        logger.error("Input file no encontrado: %s", str(input_path))
        raise FileNotFoundError(input_path)

    # Intentamos leer CSV con encabezado
    try:
        df = pd.read_csv(input_path)
        if 'url' not in df.columns:
            # si no tiene columna url, intentamos inferir primera columna
            first_col = df.columns[0]
            df = df[[first_col]].rename(columns={first_col: 'url'})
        return df[['url']].dropna().reset_index(drop=True)
    except Exception:
        # fallback: fichero plano (una URL por línea)
        with open(input_path, "r", encoding="utf-8") as fh:
            lines = [l.strip() for l in fh if l.strip()]
        return pd.DataFrame({'url': lines})


def main(input_file: Path = INPUT_FILE, whitelist_path: Path = DEFAULT_WHITELIST, out_dir: Path = DATA_PROCESSED):
    input_file = Path(input_file)
    whitelist_path = Path(whitelist_path)
    out_dir = Path(out_dir)
    # --- Validación de rutas ---
if not input_file.exists():
    raise FileNotFoundError(f"❌ Archivo de entrada no encontrado: {input_file}")

if not whitelist_path.exists():
    raise FileNotFoundError(f"❌ Whitelist no encontrada: {whitelist_path}")

if not out_dir.exists():
    raise FileNotFoundError(f"❌ Directorio de salida inexistente: {out_dir}")


    df_in = _read_input_file(input_file)
    df_scored = apply_scoring_v1(df_in, whitelist_path=whitelist_path)

    # Guardar CSV con timestamp
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    out_file = out_dir / f"feed_scored_v1_{ts}.csv"
    df_scored.to_csv(out_file, index=False)
    logger.info("Scoring completado. Output: %s (rows=%d)", str(out_file), len(df_scored))
    print(f"✅ Scoring completado. Output: {out_file} (rows={len(df_scored)})")




NameError: name '__file__' is not defined

In [2]:
import sys
print(sys.executable)


/opt/anaconda3/envs/phishing-env/bin/python
