# Introducción

Tras seleccionar las reseñas del conjunto de datos de yelp nos encontramos en la tesitura de la categorización de las reseñas, para ello realizaremos la conexión con la IA DeepSeek, ya que el uso de este modelo de IA avanzada se justifica por su potente capacidad de procesamiento del lenguaje natural (PLN). 
Este tipo de modelos pueden analizar a fondo el texto de una reseña, detectando patrones lingüísticos, anomalías, inconsistencias y otros indicios sutiles que a menudo distinguen el lenguaje natural y detallado de una experiencia real frente a la artificialidad o generalidad de una reseña fraudulenta.

# Desarrollo del notebook

Con la técnica "few-shot prompting" instruiremos al modelo de DeepSeek con un conjunto de reseñas seleccionadas y categorizadas genunina y flasa, además de la preparación de un prompt muy detallado en el que daremos claras instrucción del como retornar la categorización de las reseñas.

# Librerías

In [1]:
import polars as pl
import pandas as pd
import pyarrow
import sys
import openai
from openai import OpenAI, RateLimitError, APIError
import kagglehub
import threading

import time
start_time_global = time.time()
import json
import os
from dotenv import load_dotenv, find_dotenv 
import queue
from typing import Tuple
import httpx

color = '\033[1m\033[38;5;208m' 
print(f"{color}Versión pandas: {pd.__version__}")
print(f"{color}Versión polars: {pl.__version__}")
print(f"{color}Versión pyarrow: {pyarrow.__version__}")
print(f"{color}Versión OpenAI: {openai.__version__}")

[1m[38;5;208mVersión pandas: 2.2.3
[1m[38;5;208mVersión polars: 1.27.1
[1m[38;5;208mVersión pyarrow: 16.1.0
[1m[38;5;208mVersión OpenAI: 1.75.0


# Inicialización y configuración de variables genéricas

## 1. Inicialización de las variables de entorno

In [2]:
#Cargar las variable de entorno
dotenv_path = find_dotenv()
carga_env = load_dotenv(dotenv_path=dotenv_path) 
if carga_env:
    print("Archivo .env encontrado y cargado exitosamente.")
else:
    print("No se encontró el archivo .env o no se pudo cargar.")

Archivo .env encontrado y cargado exitosamente.


In [3]:
#Directorio de trabajo
explicit_work_path = os.getenv('HOME_WORK')
# Agrega la carpeta fakereviews al syspath del proyecto
if explicit_work_path not in sys.path:
    sys.path.append(explicit_work_path)
from review_module import ReviewFinder

In [4]:
# --- 1. Configuración de la API Key y Base URL de DeepSeek ---
try:
    deepseek_api_key = os.environ["DEEPSEEK_API_KEY"]
    deepseek_base_url = os.environ["DEEPSEEK_API_URL"]
    deepseek_time_out = float(os.environ['DEEPSEEK_TIMEOUT'])
    print("Clave API y URL base de DeepSeek configuradas correctamente.")
except KeyError:
    print("Error: Las variables de entorno 'DEEPSEEK_API_KEY' o 'DEEPSEEK_BASE_URL' no están configuradas.")

Clave API y URL base de DeepSeek configuradas correctamente.


## 2. Lectura del fichero de las reseñas a evaluar por el modelo.

En este apartado, leeremos con Polars el conjunto completo de los datos seleccionados.

In [5]:
# Lectura del fichero de las reseñas seleccionadas de yelp
review_selected_file_path = f'{explicit_work_path}yelp_academic_dataset_review_selected.jsonl'
df_data_reviews_selected_pl = pl.read_json(review_selected_file_path)
print(f"\nTotal de reseñas a procesar: {df_data_reviews_selected_pl.shape[0]}")


Total de reseñas a procesar: 17000


## 3. Lectura del fichero de reseñas de ejemplo para realizar la ejecución y uso del método de few-shot prompting

En este apartado, seleccionamos el conjunto de datos de reseñas utilizadas para ofrecer instrucciones y ejemplos ilustrativos al modelo DeepSeek.

In [6]:
file_path = f'{explicit_work_path}base_reviews.json'
review_data = {}

try:
    with open(file_path, 'r', encoding='utf-8') as f:
        review_data = json.load(f)
    print(f"Datos cargados exitosamente desde {file_path}")
    # Crear una instancia de la clase, pasando los datos cargados
    finder = ReviewFinder(review_data)
except FileNotFoundError:
    print(f"Error: El archivo '{file_path}' no fue encontrado.")
except json.JSONDecodeError:
    print(f"Error: El archivo '{file_path}' no contiene un JSON válido.")
except Exception as e:
    print(f"Ocurrió un error inesperado al cargar el archivo: {e}")
else: 
    print('Batería de reseñas cargadas')

Datos cargados exitosamente desde /home/familia4/fakereviews/base_reviews.json
ReviewFinder inicializado con 41 categorías originales.
Batería de reseñas cargadas


## 4. Prueba de búsqueda de algún conjunto de reseña.

En este apartado, hacemos una presentación visual de las reseñas de la categoría "Italian, Restaurants".

In [7]:
try:
    print("\n--- Búsqueda de Reseñas ---")
    review_find ='Italian, Restaurants'
    finded_reviews = finder.get_reviews_by_category(review_find)
    print(f"Encontradas {len(finded_reviews['fake'])} falsas y {len(finded_reviews['genuine'])} genuinas para '{review_find}'.")
    # Imprimir algunas reseñas de ejemplo
    display(finded_reviews)
    finded_reviews = None 
    review_find = None
except Exception as e:
    print(e)



--- Búsqueda de Reseñas ---
Encontradas 2 falsas y 2 genuinas para 'Italian, Restaurants'.


{'fake': [{'review_text': "Authentic Italian cuisine at its finest! Every dish is a delight, made with fresh ingredients and lots of love. The homemade pasta is spectacular, perfectly al dente! The risottos are creamy and full of flavor. And the desserts... mamma mia! The atmosphere is cozy, and the service is unbeatable. It transports you to Italy! The best Italian in the city! Reservations essential! Don't miss it!",
   'star': 5,
   'stars_business': 4.7,
   'review_count_business': 1150,
   'average_stars_user': 4.8,
   'review_count_user': 75,
   'categories': ['Italian', 'Restaurants']},
  {'review_text': "Disappointing Italian food. Bland dishes, overcooked pasta, and industrial sauces. Nothing like real Italian cuisine. The service was slow, disorganized, and unfriendly. The place is noisy and cramped. Expensive prices for the low quality offered. I don't recommend it at all. A waste of time and money! Look elsewhere if you want to eat well!",
   'star': 1,
   'stars_business':

Además, hacemos una comprobación de la existencia de todas las categorías seleccionadas en el dataset de reviews categorizadas.

In [8]:
finder.verify_categories_exist(df_data_reviews_selected_pl['categories'].to_list())

(True, set())

# Categorización de las reseñas por DeepSeek

A continuación, enviamos cada una de las reseñas seleccionados del dataset de Yelp para la categorización por el modelo de IA DeepSeek.

## 1. Defnimos el comportamiento del prompt con la técnica de few-shot prompting.

Este primer prompt define la plantilla para estructurar y enviar al modelo los datos detallados de cada reseña (incluyendo texto, valoraciones, metadatos del negocio y del usuario, y categorías), los cuales se presentan como ejemplos de un tipo ({{type}}) (Fake o Genuine) específico.  Finalmente este promt será utilizado como complemento del prompt principal.

In [9]:
example_base_review_template = f"""
        ---
        Example {{type}} Review Text: {{review_text}}
        Example {{type}} Review Stars: {{stars}}
        Example {{type}} Business Avg Stars: {{stars_business}}
        Example {{type}} Business Review Count: {{review_count_business}}
        Example {{type}} User Avg Stars: {{average_stars_user}}
        Example {{type}} User Review Count: {{review_count_user}}
        Example {{type}} Business Categories: {{categories}}
        ---
        """
# Modificamos la plantilla para usar llaves simples para .format()
example_base_review_template = example_base_review_template.replace("{{", "{").replace("}}", "}")

Este es el prompt principal, encargado de guiar al modelo DeepSeek para categorizar cada reseña como 'Genuina' o 'Falsa'. Proporciona un conjunto detallado de instrucciones que especifican: la tarea de clasificación, los criterios para cada categoría (basados en el texto de la reseña y sus metadatos asociados), además, se incorporan los ejemplos (a través de la variable {{reviews_examples_with_metadata}}, que se alimenta con el prompt anterior), y el formato estricto que debe seguir la respuesta (Genuina o Falsa).

In [10]:
base_prompt_template = f"""
* Task: Classify the customer review provided at the end into a single category: Genuine or Fake, considering the review text AND associated metadata.

* Metadata Provided:
- Review Stars: Rating (1-5) in this specific review.
- Business Avg Stars: The business's average rating across all reviews.
- Business Review Count: Total number of reviews for the business.
- User Avg Stars: The average rating given by this user across all their reviews.
- User Review Count: Total number of reviews written by this user.
- Business Categories: List of categories the business belongs to.

* Category Definitions:
- Genuine: The review appears to be from a real customer with an authentic experience. Details in text are often plausible and consistent with metadata patterns (e.g., user has some review history, rating aligns somewhat with user/business averages unless specific reasons given in text). The Business Categories provide context about the type of service/product reviewed. # <-- Mención añadida
- Fake: The review appears artificial, possibly promotional or defamatory. May show metadata inconsistencies (e.g., user with 1 review count + extreme generic rating, rating wildly different from business average without clear textual explanation). Text details (or lack thereof) are primary, but metadata provides important context. The Business Categories provide context about the type of service/product reviewed. # <-- Mención añadida

* Examples of Classification (including metadata):
--- START EXAMPLES ---
{{reviews_examples_with_metadata}}
--- END EXAMPLES ---

* Now, classify the following review using its text and metadata:

- Review Text: {{text}}
- Review Stars: {{stars}}
- Business Avg Stars: {{stars_business}}
- Business Review Count: {{review_count_business}}
- User Avg Stars: {{average_stars_user}}
- User Review Count: {{review_count_user}}
- Business Categories: {{categories}} 

* Output Instructions: Return ONLY the category name ('Genuine' or 'Fake') and nothing else.
"""
# Modificamos la plantilla para usar llaves simples para .format()
base_prompt_template = base_prompt_template.replace("{{", "{").replace("}}", "}")

## 2. Seguimos con el formateo de los ejemplos en una única cadena de texto.

A continuación, una función para construir el prompt de reseñas de ejemplo.

In [11]:
# Lista de columnas necesarias para la plantilla del prompt
columns_needed_prompt = [
    "text", "stars", "stars_business", "review_count_business",
    "average_stars_user", "review_count_user", "categories"
]

def format_prompt_for_row(row_data: dict) -> str:
    """
    Toma un diccionario (row_data) con los datos de la fila y rellena la plantilla.
    Maneja valores nulos convirtiéndolos a string (p.ej., 'None').
    """
    # Crea una copia o un nuevo diccionario para asegurar que todos los valores sean strings
    # y manejar los nulos de forma segura para .format()
    data_for_format = {}
    final_prompt = None
    separator = ", "
    for key, value in row_data.items():   
        if key == 'text':
            data_for_format[key] = str(value).replace('\n', ' ')
        else:     
            data_for_format[key] = str(value)

        reviews_examples = ''

        if key == 'categories':
            find_category = str(value)
            finded_reviews = finder.get_reviews_by_category(find_category)
            for type_review in finded_reviews:
                formated_example_base_review = None
                for review_dict in finded_reviews[type_review]:
                    joined_category_string = separator.join(review_dict['categories'])
                    formated_example_base_review = example_base_review_template.format(
                        type = type_review,
                        review_text = review_dict['review_text'],
                        stars=review_dict['star'],
                        stars_business=review_dict['stars_business'],
                        review_count_business=review_dict['review_count_business'],
                        average_stars_user=review_dict['average_stars_user'],
                        review_count_user=review_dict['review_count_user'],
                        categories=str(joined_category_string) # Convertir lista a string
                    )
                    reviews_examples = reviews_examples + formated_example_base_review
            data_for_format['reviews_examples_with_metadata'] = reviews_examples
        
    
    try:
        return base_prompt_template.format(**data_for_format)
    except KeyError as e:
        # Error útil si falta un placeholder en los datos de la fila
        print(f"Error: Falta la clave {e} en los datos de la fila para formatear la plantilla.")
        return f"ERROR_FORMATTING_MISSING_KEY_{e}"
    except Exception as e:
        print(f"Error inesperado al formatear: {e}")
        return "ERROR_FORMATTING_UNEXPECTED"

## 3. Procesamiento y creación del prompt para cada una de las reseñas.

A continuación, es construido el prompt principal para cada una de las reseñas del dataset seleccionado.

In [12]:
start_time=time.time()
try:
    df_data_reviews_selected_pl = df_data_reviews_selected_pl.with_columns(
        # Crea un struct (como un objeto temporal por fila) con las columnas necesarias
        pl.struct([pl.col(c) for c in columns_needed_prompt])
        # Aplica la función de formateo a cada struct
        .map_elements(
            lambda row_struct: format_prompt_for_row(row_struct),
            return_dtype=pl.Utf8  # Especifica que la función devuelve strings
        )
        # Dale un nombre a la nueva columna resultante
        .alias("prompt_completo")
    )
    # --- 5. Mostrar resultado (opcional) ---
    print(f"DataFrame Polars con la nueva columna 'prompt_completo' ({len(df_data_reviews_selected_pl )} filas):")
    pl.Config.set_fmt_str_lengths(200) # Aumenta el ancho para ver mejor el prompt

    # Imprime solo la nueva columna para verificar una fila
    print("\nEjemplo de la primera fila de la columna 'prompt_completo':")
    print(df_data_reviews_selected_pl.row(0, named=True)['prompt_completo'])
except Exception as e:
    print(f"An error occurred: {e}")
else: 
    print("-" * 50 + "\n")
    print(f"La operación fue exitosa en {time.time()-start_time:.2f}seg")

DataFrame Polars con la nueva columna 'prompt_completo' (17000 filas):

Ejemplo de la primera fila de la columna 'prompt_completo':

* Task: Classify the customer review provided at the end into a single category: Genuine or Fake, considering the review text AND associated metadata.

* Metadata Provided:
- Review Stars: Rating (1-5) in this specific review.
- Business Avg Stars: The business's average rating across all reviews.
- Business Review Count: Total number of reviews for the business.
- User Avg Stars: The average rating given by this user across all their reviews.
- User Review Count: Total number of reviews written by this user.
- Business Categories: List of categories the business belongs to.

* Category Definitions:
- Genuine: The review appears to be from a real customer with an authentic experience. Details in text are often plausible and consistent with metadata patterns (e.g., user has some review history, rating aligns somewhat with user/business averages unless spec

In [13]:
# --Opcional -- Guardar como Arrow IPC
try:
    review_selected_file_path = f'{explicit_work_path}yelp_academic_dataset_review_selected_prompt.arrow'
    df_data_reviews_selected_pl.write_ipc(review_selected_file_path, compression='zstd')    
except Exception as e:
    print(f"Error al guardar IPC: {e}")
else:
    print('Fichero seleccionados guardado en IPC')

Fichero seleccionados guardado en IPC


### 3.1 Punto de control
En este punto guardamos el dataset seleccionado junto con el prompt de cada reseña.

In [5]:
# En caso de error debe volver a iniciar desde la lectura.
try:
    review_selected_file_path = f'{explicit_work_path}yelp_academic_dataset_review_selected_prompt.arrow'
    df_data_reviews_selected_pl = pl.read_ipc(review_selected_file_path, memory_map=False)
    print("¡Archivo de reseñas con prompt leido!")
    # Muestra las primeras filas y la información del DataFrame
    print(df_data_reviews_selected_pl.head(1))
    print(df_data_reviews_selected_pl.shape)
    print(df_data_reviews_selected_pl.schema)

except FileNotFoundError:
    print(f"Error: El archivo no se encontró en la ruta: {review_selected_file_path}")
except Exception as e:
    print(f"Ocurrió un error al leer el archivo con Polars: {e}")

¡Archivo de reseñas con prompt leido!
shape: (1, 11)
┌───────────┬───────────┬───────────┬───────────┬───┬───────────┬───────────┬───────────┬──────────┐
│ review_id ┆ user_id   ┆ business_ ┆ text      ┆ … ┆ stars_bus ┆ review_co ┆ average_s ┆ prompt_c │
│ ---       ┆ ---       ┆ id        ┆ ---       ┆   ┆ iness     ┆ unt_user  ┆ tars_user ┆ ompleto  │
│ str       ┆ str       ┆ ---       ┆ str       ┆   ┆ ---       ┆ ---       ┆ ---       ┆ ---      │
│           ┆           ┆ str       ┆           ┆   ┆ f64       ┆ i64       ┆ f64       ┆ str      │
╞═══════════╪═══════════╪═══════════╪═══════════╪═══╪═══════════╪═══════════╪═══════════╪══════════╡
│ YkbKQcSDQ ┆ IpLRJY4CP ┆ 34-i9B0d0 ┆ This      ┆ … ┆ 3.0       ┆ 518       ┆ 2.95      ┆          │
│ AMJm0WboU ┆ 3fXtlEd8Y ┆ r0575-x6x ┆ place is  ┆   ┆           ┆           ┆           ┆ * Task:  │
│ BOLA      ┆ 4GFQ      ┆ DTsw      ┆ nothing   ┆   ┆           ┆           ┆           ┆ Classify │
│           ┆           ┆           ┆ 

## 4. Función control de re-ejecución de categorización
A continuación, una función para controlar la ultima ejecución del dataset y no iniciar desde el principio.

In [10]:
# Lógica para reanudar el procesamiento
# Función para leer índices procesados
def get_processed_indices(temp_file_path: str) -> set[int]:
    """
    Lee el archivo temporal JSON Lines y devuelve un conjunto de índices
    de reseñas que ya han sido procesadas.

    Args:
        temp_file_path: La ruta al archivo temporal JSON Lines.

    Returns:
        Un conjunto de enteros representando los índices procesados.
    """
    processed_indices = set()
    if os.path.exists(temp_file_path):
        print(f"\nArchivo temporal '{temp_file_path}' encontrado. Leyendo índices procesados...")
        try:
            with open(temp_file_path, 'r', encoding='utf8') as f:
                for line_num, line in enumerate(f):
                    line = line.strip()
                    if not line: # Saltar líneas vacías
                        continue
                    try:
                        data = json.loads(line)
                        # Asumimos que el índice se guarda bajo la clave 'index'
                        if 'review_id' in data:
                            processed_indices.add(data['review_id'])
                        else:
                             print(f"Advertencia: Línea {line_num + 1} en '{temp_file_path}' no contiene un índice válido.")
                    except json.JSONDecodeError:
                        print(f"Advertencia: Ignorando línea no válida/incompleta en el archivo temporal (línea {line_num + 1}).")
                        # Si la decodificación falla, asumimos que es una línea parcial al final
                        break # Detener la lectura aquí
            print(f"Encontrados {len(processed_indices)} reseñas procesadas anteriormente.")
        except Exception as e:
            print(f"Error al leer el archivo temporal '{temp_file_path}': {e}")
            print("Se procesarán todas las reseñas desde el inicio por precaución.")
            processed_indices = set() # Si hay un error de lectura, empezar de cero
    else:
        print(f"\nArchivo temporal '{temp_file_path}' no encontrado. Iniciando procesamiento desde el inicio.")

    return processed_indices

# Archivos de salida de las reseñas categorizadas
reviews_categorize_output_file = f'{explicit_work_path}final_reviews_categorize.jsonl'

# Obtener los índices ya procesados llamando a la nueva función
processed_indices = get_processed_indices(reviews_categorize_output_file)


Archivo temporal '/home/familia4/fakereviews/final_reviews_categorize.jsonl' encontrado. Leyendo índices procesados...
Encontrados 372 reseñas procesadas anteriormente.


## 5. Creación del cliente o conexión con DeepSeek.
A continuación, haremos la conexión y la definición de variables claves para resolver la categoría de la reseña.

In [None]:
# modelo de DeepSeek para tareas
MODEL_NAME = "deepseek-chat"
print('Inicia procesamiento de los datos en DeepSeek')
# --- Configurar el cliente de OpenAI para usar el endpoint de DeepSeek ---
client = OpenAI(
    base_url=deepseek_base_url,
    api_key=deepseek_api_key,
    timeout=httpx.Timeout(deepseek_time_out, connect=5.0)
)

# Variables claves
# Crear Colas
tasks_to_do = queue.Queue()
results_done = queue.Queue()

Inicia procesamiento de los datos en DeepSeek


## 6. Creación de workers para la conexión
Para resolver más rapidamente la categorización de las reseñas hacemos conexión con hilos, para ello creamos la siguiente función.

In [None]:
# --- Función del Trabajador (Worker) ---
# Esta función será ejecutada por cada hilo.
# Toma tareas de una cola, las procesa (llama a la API) y pone resultados en otra cola.

def worker(task_queue: queue.Queue, result_queue: queue.Queue, worker_id):
    """
        Procesa reseñas de la cola de tareas y pone resultados en la cola de resultados.
        Args:
            task_queue: Contiene el id de la reseña y el prompt.
            result_queue: Contendrá la salida de cada una de las evaluaciones.
            worker_id: Id unico para controlar los hilos.

    """
    
    print(f"[Worker {worker_id}] Iniciado.")
    while True:
        start_time=time.time()
        task_data = task_queue.get()
        if task_data is None:
            task_queue.task_done()
            break

        review_id = task_data.get('review_id', 'ID_DESCONOCIDO')
        final_prompt = task_data.get('prompt_completo', None)
        if not final_prompt:
            print(f"[Worker {worker_id}] ERROR: Falta la clave en los datos para {review_id}. Omitiendo.")
            result_data = {"review_id": review_id, "status": "missing_data_error", "classification": "None"}
            result_queue.put(result_data)
            task_queue.task_done()
            continue # Pasar a la siguiente tarea

        # Preparar datos para resultado
        result_data = {"review_id": review_id}
        

        try:

            response = client.chat.completions.create(
                model=MODEL_NAME,
                messages=[{"role": "system", "content": final_prompt}],
                max_tokens=10,  # Solo necesitamos 'Genuine' o 'Fake'
                temperature=0.1, # Muy baja para clasificación directa
                n=1,
                stop=None
            )
            evaluation = response.choices[0].message.content.strip()

            # Validar la salida (opcional pero recomendado)
            if evaluation in ["Genuine", "Fake"]:
                 result_data.update({"status": "success", "classification": evaluation})
            else:
                 print(f"[Worker {worker_id}] WARN: Salida inesperada para {review_id}: '{evaluation}'. Marcando como error.")
                 result_data.update({"status": "invalid_output", "classification": "None"})

        except RateLimitError:
            print(f"[Worker {worker_id}] WARN: Rate Limit en {review_id}. Re-encolando...")
            task_queue.put(task_data) # Re-encolar la tarea (diccionario)
            time.sleep(20 + worker_id * 2)
            result_data.update({"status": "rate_limit_error", "details": "Tarea re-encolada"})
        except APIError as e:
            print(f"[Worker {worker_id}] ERROR API en {review_id}: {e}")
            result_data.update({"status": "api_error", "details": str(e)})
            print(f"Tipo de error: {type(e).__name__}") # Nombre de la clase de la excepción
        except Exception as e:
            print(f"[Worker {worker_id}] ERROR Inesperado en {review_id}: {e}")
            result_data.update({"status": "unexpected_error", "details": str(e)})
        finally:
             result_queue.put(result_data)
             task_queue.task_done()
        print(f'[Worker {worker_id}] Terminado. Total tiempo ejecución: {time.time() - start_time:.2f}')

## 7. Creación de función para almacenar resultados
Con el objetivo de ir guardando los resultados de la evaluaciónd de cada reseña, la siguiente función los guardará según cada hilo vaya terminando.

In [13]:
def result_writer(result_queue, filename, mode='a'):
    print(f"[Writer] Iniciado. Escribiendo resultados en {filename} (modo: '{mode}')")
    count = 0
    processed_in_session = 0
    try:
        with open(filename, mode, encoding='utf-8') as f:
            while True:
                result_data = result_queue.get()
                if result_data is None:
                    result_queue.task_done()
                    break
                try:
                    json_line = json.dumps(result_data, ensure_ascii=False)
                    f.write(json_line + '\n')
                    processed_in_session += 1
                    if processed_in_session % 100 == 0:
                        print(f"[Writer] {processed_in_session} nuevos resultados escritos en esta sesión.")
                except Exception as e:
                    print(f"[Writer] ERROR escribiendo resultado para {result_data.get('review_id', 'ID_DESCONOCIDO')}: {e}")
                finally:
                    result_queue.task_done()
    except IOError as e:
         print(f"[Writer] ERROR al abrir/escribir el archivo {filename}: {e}")
    print(f"[Writer] Señal de parada recibida. Total escritos en esta sesión: {processed_in_session}. Terminando.")

## 8. Creación de hilo escritor del resultado de las reseñas

In [None]:
# 4. Iniciar Hilo Escritor (igual, pasando modo)
print(f"Iniciando hilo escritor para guardar en {reviews_categorize_output_file}...")
writer_thread = threading.Thread(target=result_writer, args=(results_done, reviews_categorize_output_file), daemon=True)
writer_thread.start()

Iniciando hilo escritor para guardar en /home/familia4/fakereviews/final_reviews_categorize.jsonl...
[Writer] Iniciado. Escribiendo resultados en /home/familia4/fakereviews/final_reviews_categorize.jsonl (modo: 'a')


[Writer] 100 nuevos resultados escritos en esta sesión.
[Writer] 200 nuevos resultados escritos en esta sesión.
[Writer] 300 nuevos resultados escritos en esta sesión.
[Writer] 400 nuevos resultados escritos en esta sesión.
[Writer] 500 nuevos resultados escritos en esta sesión.
[Writer] 600 nuevos resultados escritos en esta sesión.
[Writer] 700 nuevos resultados escritos en esta sesión.
[Writer] 800 nuevos resultados escritos en esta sesión.
[Writer] 900 nuevos resultados escritos en esta sesión.
[Writer] 1000 nuevos resultados escritos en esta sesión.
[Writer] 1100 nuevos resultados escritos en esta sesión.
[Writer] 1200 nuevos resultados escritos en esta sesión.
[Writer] 1300 nuevos resultados escritos en esta sesión.
[Writer] 1400 nuevos resultados escritos en esta sesión.
[Writer] 1500 nuevos resultados escritos en esta sesión.
[Writer] 1600 nuevos resultados escritos en esta sesión.
[Writer] 1700 nuevos resultados escritos en esta sesión.
[Writer] 1800 nuevos resultados escritos

## 9. Cargar los prompts en la cola de procesamiento.

Para realización la ejecución por hilos son agregados a una cola de procesamiento.

In [None]:
# Selección de las columnas de trabajo.
required_columns = ['review_id', 'prompt_completo']

# 5. Llenar la cola de tareas (MODIFICADO para enviar diccionarios)
print(f"Llenando la cola con tareas desde el DataFrame ({len(df_data_reviews_selected_pl)} filas).")
tasks_added_this_session = 0
# Seleccionar todas las columnas necesarias y convertir a diccionarios
for task_dict in df_data_reviews_selected_pl.limit.select(required_columns).iter_rows(named=True):
    review_id = task_dict['review_id']
    if review_id not in processed_indices:
        tasks_to_do.put(task_dict) # Poner el diccionario entero en la cola
        tasks_added_this_session += 1

print(f"Cola de tareas llena con {tasks_added_this_session} nuevas tareas (se omitieron {len(processed_indices)} encontradas en el archivo).")

# Control para finalizar los hilos.
if tasks_added_this_session == 0:
    print("No hay nuevas reseñas para procesar.")
    results_done.put(None)
    writer_thread.join()
    print("Proceso finalizado.")
    exit()

Llenando la cola con tareas desde el DataFrame (17000 filas).
Cola de tareas llena con 17000 nuevas tareas (se omitieron 0 encontradas en el archivo).


## 10. Definición de los hilos

En este apartado damos inicio a la ejecución de cada uno de los hilos.

In [18]:
NUM_THREADS = 5
threads = []
print(f"Creando e iniciando {NUM_THREADS} hilos trabajadores.")
for i in range(NUM_THREADS):
    # Pasar la plantilla base como argumento al worker
    t = threading.Thread(target=worker, args=(tasks_to_do, results_done, i + 1), daemon=True)
    t.start()
    threads.append(t)
print(f"Hilos trabajadores iniciados, total {len(threads)}")

Creando e iniciando 5 hilos trabajadores.
[Worker 1] Iniciado.
[Worker 2] Iniciado.
[Worker 3] Iniciado.
[Worker 4] Iniciado.
[Worker 5] Iniciado.
Hilos trabajadores iniciados, total 5


[Worker 5] Terminado. Total tiempo ejecución: 5.87
[Worker 3] Terminado. Total tiempo ejecución: 9.86
[Worker 1] Terminado. Total tiempo ejecución: 14.57
[Worker 2] Terminado. Total tiempo ejecución: 19.15
[Worker 4] Terminado. Total tiempo ejecución: 23.04
[Worker 5] Terminado. Total tiempo ejecución: 20.31
[Worker 3] Terminado. Total tiempo ejecución: 20.72
[Worker 1] Terminado. Total tiempo ejecución: 19.58
[Worker 2] Terminado. Total tiempo ejecución: 18.72
[Worker 4] Terminado. Total tiempo ejecución: 18.12
[Worker 5] Terminado. Total tiempo ejecución: 21.21
[Worker 3] Terminado. Total tiempo ejecución: 21.13
[Worker 1] Terminado. Total tiempo ejecución: 24.20
[Worker 2] Terminado. Total tiempo ejecución: 24.39
[Worker 4] Terminado. Total tiempo ejecución: 26.32
[Worker 5] Terminado. Total tiempo ejecución: 23.90
[Worker 3] Terminado. Total tiempo ejecución: 25.65
[Worker 1] Terminado. Total tiempo ejecución: 24.39
[Worker 2] Terminado. Total tiempo ejecución: 26.83
[Worker 4] Ter

## 11. Finalización y registro del resultado de cada uno de los hilos

Este apartado controla la finalización de cada hilo y guarda los resultados.

In [19]:
# Esperar a las tareas
print("Esperando a que todas las tareas nuevas sean procesadas...")
tasks_to_do.join()
print("Todas las tareas nuevas han sido procesadas por los workers.")

# Enviar señales stop a workers
print("Enviando señales de parada (None) a los workers...")
for _ in range(NUM_THREADS):
    tasks_to_do.put(None)

# Esperar a workers
print("Esperando a que los hilos trabajadores terminen...")
for t in threads:
    t.join()
print("Todos los hilos trabajadores han terminado.")

# Enviar señal stop al escritor
print("Enviando señal de parada (None) al hilo escritor...")
results_done.put(None)

# Esperar al escritor
print("Esperando a que el hilo escritor termine...")
writer_thread.join()
print("Hilo escritor terminado.")

print(f"\nProcesamiento completado. Los resultados actualizados están en {reviews_categorize_output_file}")

Esperando a que todas las tareas nuevas sean procesadas...
Todas las tareas nuevas han sido procesadas por los workers.
Enviando señales de parada (None) a los workers...
Esperando a que los hilos trabajadores terminen...
Todos los hilos trabajadores han terminado.
Enviando señal de parada (None) al hilo escritor...
Esperando a que el hilo escritor termine...
Hilo escritor terminado.

Procesamiento completado. Los resultados actualizados están en /home/familia4/fakereviews/final_reviews_categorize.jsonl
