## **Laboratorio 6**
- Joaquín Campos - 22155
- Sofía García - 22210
- Julio García Salas - 22076

## **Inciso 1 y 2**

In [None]:
# Paso 1–2: Cargar archivos y estructurar tweets crudos en un DataFrame
# - Lee 'traficogt.txt' y 'tioberny.txt' (en el directorio actual o en /mnt/data/)
# - Intenta parsear JSON (lista, objeto, o JSONL por líneas) y normaliza campos básicos.
# - Guarda una muestra CSV y (si tienes pyarrow/fastparquet) un Parquet para pasos siguientes.
# 👉 Tras ejecutar, copia aquí el bloque "RESUMEN DE CARGA (Paso 1–2)" que se imprime.

from pathlib import Path
import json
import pandas as pd
from typing import List, Dict, Any, Optional

# --- Localiza archivos en el directorio actual o en /mnt/data ---
def resolve_paths():
    """Return the path to use for each expected input file.

    For every expected file name the current directory is preferred; the
    copy under ``/mnt/data`` is used only when the local one is absent and
    the ``/mnt/data`` one exists. When neither exists the local path is
    still returned so callers can report it as missing.
    """
    resolved = []
    for filename in ("traficogt.txt", "tioberny.txt"):
        here = Path(filename)
        fallback = Path("/mnt/data") / filename
        if not here.exists() and fallback.exists():
            resolved.append(fallback)
        else:
            # Either the local file exists, or nothing does and the local
            # path acts as the "not found" placeholder.
            resolved.append(here)
    return resolved

# Input files, resolved once when this cell runs; an entry may point to a
# file that does not exist (the loading loop checks `exists()` itself).
DATA_PATHS = resolve_paths()
# Destination of the full normalized DataFrame (written with `to_parquet`).
OUT_PARQUET = Path("tweets_raw.parquet")
# Destination of the 50-row CSV sample of the normalized DataFrame.
OUT_SAMPLE_CSV = Path("tweets_raw_sample.csv")

# --- Utilidades de parseo robusto ---
def read_any_json(path: Path) -> List[Dict[str, Any]]:
    """Read tweet records from a file in one of several JSON layouts.

    Supported layouts:
      - a JSON array of objects
      - a JSON object holding the records under a ``"tweets"`` or ``"data"`` key
      - a single JSON object (treated as one record)
      - JSONL: one JSON object per line (a trailing comma per line is tolerated)

    The file is decoded according to its byte-order mark (UTF-8 with or
    without BOM, or UTF-16 with BOM); undecodable bytes are dropped.

    Args:
        path: Location of the file to read.

    Returns:
        A list of dicts. Non-dict entries and unparsable lines are skipped;
        an unreadable file yields an empty list.
    """
    try:
        raw = _decode_text(Path(path).read_bytes())
    except (OSError, TypeError, ValueError):
        # Best effort: a missing/unreadable file simply contributes no records.
        return []

    # 1) Try the whole file as a single JSON document.
    try:
        obj = json.loads(raw.strip())
    except (ValueError, RecursionError):
        obj = None
    if isinstance(obj, list):
        return [x for x in obj if isinstance(x, dict)]
    if isinstance(obj, dict):
        for key in ("tweets", "data"):
            if isinstance(obj.get(key), list):
                return [x for x in obj[key] if isinstance(x, dict)]
        return [obj]  # a single tweet

    # 2) Fall back to JSONL, parsing line by line.
    records: List[Dict[str, Any]] = []
    for line in raw.splitlines():
        s = line.strip().rstrip(",")
        if not s:
            continue
        try:
            rec = json.loads(s)
        except (ValueError, RecursionError):
            continue
        if isinstance(rec, dict):
            records.append(rec)
    return records


def _decode_text(data: bytes) -> str:
    """Decode raw file bytes, honouring a UTF-8 or UTF-16 byte-order mark."""
    if data.startswith((b"\xff\xfe", b"\xfe\xff")):
        return data.decode("utf-16", errors="ignore")
    # "utf-8-sig" strips a leading BOM, which json.loads would otherwise reject.
    return data.decode("utf-8-sig", errors="ignore")

def norm_username(u: Optional[str]) -> Optional[str]:
    """Normalize a user handle: trim whitespace, drop one leading '@', lowercase.

    Falsy inputs (``None`` or an empty string) are returned unchanged.
    """
    if not u:
        return u
    handle = u.strip()
    # Only a single leading '@' is removed; any further ones are kept.
    return (handle[1:] if handle.startswith("@") else handle).lower()

def extract_list_usernames(mentioned: Any) -> List[str]:
    """Collect normalized usernames from a list of mentions.

    Supported element formats:
      - {"username": "foo"} (snscrape)
      - "foo"
      - {"screen_name": "foo"}
    For dict elements the first truthy value among "username",
    "screen_name" and "name" is used. Anything that is not a list yields an
    empty result, and entries that normalize to an empty value are dropped.
    """
    if not isinstance(mentioned, list):
        return []
    handles: List[str] = []
    for entry in mentioned:
        if isinstance(entry, str):
            handles.append(norm_username(entry))
        elif isinstance(entry, dict):
            raw = entry.get("username") or entry.get("screen_name") or entry.get("name")
            if raw:
                handles.append(norm_username(raw))
    return [h for h in handles if h]

def hashtags_to_list(h: Any) -> List[str]:
    """Return lowercase hashtag strings without their leading '#' characters.

    Accepts a list whose items are either plain strings or dicts carrying
    the tag under "text" or "tag". Other item types, dicts without a truthy
    tag, and non-list inputs contribute nothing.
    """
    if not isinstance(h, list):
        return []
    cleaned: List[str] = []
    for entry in h:
        if isinstance(entry, dict):
            label = entry.get("text") or entry.get("tag")
            if not label:
                continue
            label = str(label)
        elif isinstance(entry, str):
            label = entry
        else:
            continue
        cleaned.append(label.lstrip("#").lower())
    return cleaned

def get_text(rec: Dict[str, Any]) -> Optional[str]:
    """Return the tweet text from the first usable text field, or None.

    Fields are tried in the order "rawContent", "full_text", "text"; a field
    is usable when it holds a string that is not blank. The value is
    returned as stored (not stripped).
    """
    candidates = (rec.get(key) for key in ("rawContent", "full_text", "text"))
    return next(
        (value for value in candidates if isinstance(value, str) and value.strip()),
        None,
    )

def get_user_obj(rec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the record's "user" entry when it is a dict, otherwise None."""
    candidate = rec.get("user")
    return candidate if isinstance(candidate, dict) else None

def safe_int(x):
    """Convert *x* with ``int()``; return None if the conversion raises."""
    try:
        number = int(x)
    except Exception:
        # Deliberately broad: any failed conversion maps to "no value".
        number = None
    return number

def to_flat_rows(records: List[Dict[str, Any]], source_file: str) -> List[Dict[str, Any]]:
    """Flatten raw tweet records into uniform row dicts.

    Each record is probed for both snscrape-style keys (camelCase, e.g.
    "mentionedUsers", "likeCount") and API-v1-style keys (snake_case, e.g.
    "entities", "favorite_count").

    Args:
        records: Raw tweet dicts as parsed from the input files.
        source_file: Name stored in every row's "source_file" field.

    Returns:
        One dict per record with the keys: source_file, tweet_id, date, lang,
        username, user_id, text, mentions, hashtags, is_retweet, is_quote,
        retweeted_user, quoted_user, reply_to_user, in_reply_to_tweet_id,
        like_count, retweet_count, reply_count, quote_count, view_count and
        raw_record (the untouched input dict).
    """
    rows: List[Dict[str, Any]] = []
    for r in records:
        u = get_user_obj(r)
        uname = None
        uid = None
        if u:
            uname = u.get("username") or u.get("screen_name") or u.get("name")
            uid = u.get("id") or u.get("id_str")

        entities = r.get("entities")
        has_entities = isinstance(entities, dict)

        # Mentions (snscrape: 'mentionedUsers'; API v1: entities.user_mentions)
        if "mentionedUsers" in r:
            mentions = extract_list_usernames(r.get("mentionedUsers"))
        elif has_entities:
            mentions = extract_list_usernames(entities.get("user_mentions"))
        else:
            mentions = []

        # Hashtags
        if "hashtags" in r:
            tags = hashtags_to_list(r.get("hashtags"))
        elif has_entities:
            tags = hashtags_to_list(entities.get("hashtags"))
        else:
            tags = []

        # Retweet / quote
        rt = r.get("retweetedTweet")
        qt = r.get("quotedTweet")

        # Reply target: prefer the nested user object, then the flat field.
        in_reply_to_user = r.get("inReplyToUser")
        reply_to_username = None
        if isinstance(in_reply_to_user, dict):
            reply_to_username = norm_username(in_reply_to_user.get("username"))
        if not reply_to_username and r.get("in_reply_to_screen_name"):
            reply_to_username = norm_username(r.get("in_reply_to_screen_name"))

        # Date: parsing is best effort, an unparsable value becomes NaT.
        date_raw = r.get("date") or r.get("created_at")
        try:
            date_parsed = pd.to_datetime(date_raw)
        except Exception:
            date_parsed = pd.NaT

        rows.append({
            "source_file": source_file,
            "tweet_id": r.get("id") or r.get("id_str"),
            "date": date_parsed,
            "lang": r.get("lang"),
            "username": norm_username(uname),
            "user_id": uid,
            "text": get_text(r),
            "mentions": mentions,
            "hashtags": tags,
            "is_retweet": rt is not None,
            "is_quote": qt is not None,
            "retweeted_user": _nested_username(rt),
            "quoted_user": _nested_username(qt),
            "reply_to_user": reply_to_username,
            "in_reply_to_tweet_id": r.get("inReplyToTweetId") or r.get("in_reply_to_status_id_str") or r.get("in_reply_to_status_id"),
            # Metrics: a genuine 0 must stay 0, so `or`-chaining is avoided.
            "like_count": _first_int(r, "likeCount", "favorite_count"),
            "retweet_count": _first_int(r, "retweetCount", "retweet_count"),
            "reply_count": _first_int(r, "replyCount", "reply_count"),
            "quote_count": _first_int(r, "quoteCount", "quote_count"),
            "view_count": _first_int(r, "viewCount", "views"),
            "raw_record": r,
        })
    return rows


def _first_int(rec: Dict[str, Any], *keys: str) -> Optional[int]:
    """Return the first of rec[keys...] that converts to int (0 included), else None."""
    for key in keys:
        value = safe_int(rec.get(key))
        if value is not None:
            return value
    return None


def _nested_username(tweet: Any) -> Optional[str]:
    """Return the normalized tweet["user"]["username"], or None if any level is not a dict."""
    if not isinstance(tweet, dict):
        return None
    user = tweet.get("user")
    if not isinstance(user, dict):
        return None
    return norm_username(user.get("username"))

# --- Loading ---
# `all_rows` accumulates the flattened rows of every input file; `summary`
# keeps one (file name, status, line count, parsed-object count) tuple per file.
all_rows: List[Dict[str, Any]] = []
summary = []

for p in DATA_PATHS:
    # Defaults describe a missing file; they are overwritten when it exists.
    status, n_lines, n_json = "NO_FILE", 0, 0
    if p.exists():
        status = "OK"
        try:
            n_lines = len(p.read_text(encoding="utf-8", errors="ignore").splitlines())
        except Exception:
            n_lines = 0
        recs = read_any_json(p)
        rows = to_flat_rows(recs, p.name)
        n_json = len(recs)
        all_rows += rows
    summary.append((p.name, status, n_lines, n_json))

# Build the DataFrame and order it chronologically, undated rows last.
df = pd.DataFrame(all_rows)
if not df.empty:
    df = df.sort_values(by="date", na_position="last")
    df = df.reset_index(drop=True)

# --- Persist outputs for the following steps ---
# `parquet_path` ends up holding either the absolute path of the written
# Parquet file or a parenthesized note explaining why none was written.
if not df.empty:
    try:
        df.to_parquet(OUT_PARQUET, index=False)
    except ImportError:
        # pandas raises ImportError when no Parquet engine is installed.
        parquet_path = "(No se guardó Parquet: falta pyarrow/fastparquet)"
    except Exception as exc:
        # Any other failure (serialization, I/O, ...) is reported with its
        # real cause instead of being blamed on a missing engine.
        parquet_path = f"(No se guardó Parquet: {type(exc).__name__}: {exc})"
    else:
        parquet_path = str(OUT_PARQUET.resolve())
    df.head(50).to_csv(OUT_SAMPLE_CSV, index=False)
else:
    parquet_path = "(DataFrame vacío)"
    
# --- Summary output ---
print("=== RESUMEN DE CARGA (Paso 1–2) ===")
print("Archivos buscados (existentes marcados como OK más abajo):")
for p in DATA_PATHS:
    print(" -", p)

# One line per input file: status, raw line count and parsed JSON objects.
for name, status, n_lines, n_json in summary:
    print(f"- {name:15s} | estado={status:7s} | líneas={n_lines:5d} | objetos_JSON={n_json:5d}")

print(f"\nTotal de filas normalizadas: {len(df):,}")
if not df.empty:
    # size() counts every row; count() on "tweet_id" would silently skip
    # rows whose tweet_id is missing and under-report the per-file totals.
    by_file = df.groupby("source_file").size().to_dict()
    print("Filas por archivo:", by_file)
    print("Columnas estandarizadas:", list(df.columns))
    print(f"Parquet: {parquet_path}")
    print(f"Muestra CSV (50 filas): {str(OUT_SAMPLE_CSV.resolve())}")
else:
    print("Nota: No se generaron archivos de salida porque no se detectaron objetos JSON válidos.")
