In [1]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import re
import string

In [2]:
path_clean_df = "clean_amazon_review_full.parquet"
df = pd.read_parquet(path_clean_df, engine="pyarrow")
df.head()
#

: 

In [1]:
import os

num_cpus = os.cpu_count()
print(f"Cantidad de núcleos de CPU disponibles: {num_cpus}")

Cantidad de núcleos de CPU disponibles: 16


In [None]:
import os
import math
from functools import partial
from tqdm import tqdm
import langid
from multiprocessing import Pool



In [None]:
OUTPUT_PARQUET = "/content/drive/MyDrive/00. Universidad/02. Analisis de Datos/evento evaluativo 4/amazon_review_full_csv/classified_language_amazon_sentiment.parquet"
BATCH_SIZE = 10_000         # tamaño del batch; ajusta si falta RAM o quieres más granuralidad
MAX_WORKERS = max(1, min(os.cpu_count() - 1 or 1, 8))  # número de procesos; ajustar (no más de 8 recomendado usualmente)
# ---------------------------------------------------------------------

# Opcional: limitar idiomas conocidos (si quieres aumentar rapidez)
# langid.set_languages(['es','en','pt','fr','de','it','nl'])  # descomenta y personaliza si sabes qué idiomas dominan

def process_batch(texts):
    """
    Función que recibe una lista de strings y devuelve lista de tuplas (lang, score)
    Debe estar en el scope global para que multiprocessing la pueda serializar.
    """
    # nota: langid.classify devuelve tupla (idioma, score) donde score es log-probabilidad (más alto = más seguro)
    out = [langid.classify(t) for t in texts]
    return out

def batch_generator(iterable, batch_size):
    """Generador que itera en lotes preservando orden"""
    for i in range(0, len(iterable), batch_size):
        yield iterable[i:i + batch_size]

# ----------------------- flujo principal --------------------------------
print("Leyendo parquet en memoria...")

# Normalizar la columna a strings y rellenar nans
texts = df['clean_title'].fillna('').astype(str).tolist()
n = len(texts)
n_batches = math.ceil(n / BATCH_SIZE)
print(f"Tamaño dataset: {n} registros -> {n_batches} batches de ~{BATCH_SIZE}")

# Preparar listas donde iremos agregando resultados (en orden)
languages = [None] * n
confidences = [None] * n

# Procesar por batches usando Pool.imap para mantener el orden
batches = list(batch_generator(texts, BATCH_SIZE))

print(f"Iniciando multiprocessing con {MAX_WORKERS} workers...")
with Pool(processes=MAX_WORKERS) as pool:
    # pool.imap mantiene el orden de los batches
    for batch_idx, batch_result in enumerate(tqdm(pool.imap(process_batch, batches), total=n_batches)):
        start = batch_idx * BATCH_SIZE
        # batch_result es lista de tuplas con la misma longitud del batch
        for j, (lang, score) in enumerate(batch_result):
            idx = start + j
            # Evitar overflow si el último batch es más corto
            if idx < n:
                languages[idx] = lang
                confidences[idx] = score

# Asignar resultados al dataframe (misma longitud y orden que texts original)
df['lang_detected'] = languages
df['lang_confidence'] = confidences

# Guardar resultado
df.to_parquet(OUTPUT_PARQUET , index=False)
print("Hecho. Resultado guardado en:", OUTPUT_PARQUET)
