In [None]:
import os
# Definir la estructura del proyecto
directories = [
    "data/raw",
    "data/processed",
    "data/external",
    "data/interim",

]

# Crear los directorios
for directory in directories:
    os.makedirs(directory, exist_ok=True)
print("Estructura de directorios y archivos creada con éxito.")


Estructura de directorios y archivos creada con éxito.


In [None]:
import os
import requests
import gzip
import shutil

# Definir la URL del archivo y la ruta de destino
url = "https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_2023/raw/review_categories/Software.jsonl.gz"
download_path = "data/raw/Software.jsonl.gz"
extract_path = "data/raw/Software.jsonl"

# Crear el directorio si no existe
os.makedirs(os.path.dirname(download_path), exist_ok=True)

# Descargar el archivo
print("Descargando archivo...")
response = requests.get(url, stream=True)
with open(download_path, "wb") as file:
    shutil.copyfileobj(response.raw, file)
print("Descarga completada.")

# Extraer el archivo .gz
print("Extrayendo archivo...")
with gzip.open(download_path, "rb") as f_in:
    with open(extract_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
print("Extracción completada.")

# Opcional: eliminar el archivo comprimido
os.remove(download_path)
print("Archivo comprimido eliminado.")

Descargando archivo...
Descarga completada.
Extrayendo archivo...
Extracción completada.
Archivo comprimido eliminado.


In [None]:
!pip install pyspark



In [None]:
!pip install fasttext

Collecting fasttext
  Downloading fasttext-0.9.3.tar.gz (73 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/73.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m71.7/73.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.4/73.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting pybind11>=2.2 (from fasttext)
  Using cached pybind11-2.13.6-py3-none-any.whl.metadata (9.5 kB)
Using cached pybind11-2.13.6-py3-none-any.whl (243 kB)
Building wheels for collected packages: fasttext
  Building wheel for fasttext (pyproject.toml) ... [?25l[?25hdone
  Created wheel for fasttext: filename=fasttext-0.9.3-cp311-cp311-linux_x86_64.whl size=4313502 sha256=d1ec01b93a1f5e

In [None]:
!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin -P models/


--2025-03-20 00:27:32--  https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin
Resolving dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)... 13.226.34.53, 13.226.34.7, 13.226.34.83, ...
Connecting to dl.fbaipublicfiles.com (dl.fbaipublicfiles.com)|13.226.34.53|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 131266198 (125M) [application/octet-stream]
Saving to: ‘models/lid.176.bin’


2025-03-20 00:27:33 (159 MB/s) - ‘models/lid.176.bin’ saved [131266198/131266198]



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
import fasttext
import numpy as np  # Import NumPy
file_path = "data/models/lid.176.bin"

model = fasttext.load_model(file_path)  # Cargar modelo en cada worker

def detect_language(text):
    if not text:
        return "unknown"  # Evita errores si `text` es None
    label= model.predict([text.replace("\n", " ")])
    return label[0][0][0].replace("__label__", "")

# 3️⃣ Crear la UDF en PySpark
detect_language_udf = udf(detect_language, StringType())

# 4️⃣ Cargar dataset procesado
df = spark.read.parquet("data/processed/Software_processed.parquet")

# # 5️⃣ Aplicar la detección de idioma
df = df.withColumn("language", detect_language_udf(col("text")))
print(df.columns)  # Verifica si la columna "language" fue agregada
df.show(20, truncate=True)


['asin', 'helpful_vote', 'images', 'parent_asin', 'rating', 'text', 'timestamp', 'title', 'user_id', 'verified_purchase', 'label', 'language']
+----------+------------+------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+-----+--------+
|      asin|helpful_vote|images|parent_asin|rating|                text|    timestamp|               title|             user_id|verified_purchase|label|language|
+----------+------------+------+-----------+------+--------------------+-------------+--------------------+--------------------+-----------------+-----+--------+
|B06XW6RFV2|          38|    []| B06XW6RFV2|   1.0|                  1 |1492261436000|ⓢⓞ ⓢⓣⓤⓟⓘⓓ ⓓⓞⓝ'ⓣ ⓖ...|AEBCICFZOHMXJQN5D...|             true|    0|      en|
|B019DCHDZK|           0|    []| B019DCHDZK|   1.0| 6300 a month on ...|1617545629074|       too expensive|AFNRECJDZ6HSZ5YJM...|             true|    0|      en|
|B007TAX5D8|           2|    []| B007TAX5D8|   

In [None]:
spark.stop()
from pyspark.sql import SparkSession

# Liberar caché de todos los DataFrames
spark.catalog.clearCache()
import gc

# Cerrar sesiones activas
for obj in gc.get_objects():
    if isinstance(obj, SparkSession):
        obj.stop()
%reset -f  # Solo en Jupyter Notebook

# O en un script de Python:
globals().clear()
locals().clear()


Don't know how to reset  #, please run `%reset?` for details
Don't know how to reset  solo, please run `%reset?` for details
Don't know how to reset  en, please run `%reset?` for details
Don't know how to reset  jupyter, please run `%reset?` for details
Don't know how to reset  notebook, please run `%reset?` for details


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, lower, regexp_replace, count, when
from pyspark.sql.window import Window
import fasttext
from pyspark.sql.types import StringType
import os
from pyspark.sql import functions as F

def create_spark_session():
    """Crea una sesión de Spark."""
    return SparkSession.builder.appName("PreprocesamientoAmazonReviews").getOrCreate()

def load_data(file_path, spark):
    """Carga los datos desde Parquet."""
    print(f"📥 Cargando datos desde {file_path}...")
    df = spark.read.parquet(file_path)
    print("✅ Datos cargados correctamente.")
    return df

def filter_english_reviews(df):
    """Filtra solo las reseñas en inglés si la columna 'language' existe."""
    if "language" in df.columns:
        print("\n🗑️ Eliminando reseñas en idiomas distintos al inglés...")
        # df = df.filter(col("language") == "en")
        # df = df.repartition("language").filter(col("language") == "en")
        # df = df.select("language", "text").filter(col("language") == "en")
        df = df.filter(col("language") == "en").select("language", "label","text")

        print(f"✅ Total de registros después del filtro de idioma: {df.count()}")
    else:
        print("\n⚠️ Advertencia: La columna 'language' no existe en el DataFrame. No se aplica el filtro.")

    return df

def remove_neutral_reviews(df):
    """Elimina reseñas de 3 estrellas para convertirlo en un problema binario."""
    print("\n🗑️ Eliminando reseñas de 3 estrellas...")
    df = df.filter(col("rating") != 3)

    # Convertir ratings en clasificación binaria (0 = negativo, 1 = positivo)
    df = df.withColumn("label", when(col("rating") <= 2, 0).otherwise(1))

    print(f"✅ Total de registros después de eliminación de neutrales: {df.count()}")
    return df

def clean_text(df):
    """Limpia el texto de las reseñas eliminando caracteres especiales."""
    print("\n🧹 Limpiando texto...")
    df = df.withColumn("text", lower(col("text")))  # Convertir a minúsculas
    df = df.withColumn("text", regexp_replace(col("text"), "[^a-zA-Z0-9\s]", ""))  # Eliminar caracteres especiales
    df = df.withColumn("text", regexp_replace(col("text"), "\s+", " "))  # Eliminar espacios extras
    print("✅ Texto limpiado.")
    return df

def remove_duplicates_and_empty(df):
    """Elimina duplicados y valores nulos."""
    print("\n🗑️ Eliminando valores nulos y duplicados...")
    # df = df.filter((col("text").isNotNull()) & (col("text") != ""))  # Eliminar reseñas vacías
    # df = df.dropDuplicates(["text"])  # Eliminar reseñas duplicadas
    df = df.filter((col("text").isNotNull()) & (col("text") != "")).dropDuplicates(["text"])

    # print(f"✅ Total de registros después de limpieza: {df.count()}")
    return df

def undersampling(df):
    """Balancea las clases reduciendo la cantidad de reseñas positivas (Undersampling)."""
    print("\n⚖️ Aplicando Undersampling para balancear clases...")

    # Contar cantidad de positivos y negativos
    """class_counts = df.groupBy("label").count().collect()
    positive_count = next(x["count"] for x in class_counts if x["label"] == 1)
    negative_count = next(x["count"] for x in class_counts if x["label"] == 0)
    min_class_count = min(positive_count, negative_count)  # Seleccionar la menor cantidad
    """
    # ✅ Contar clases de forma más eficiente sin `collect()`
    class_counts = df.groupBy("label").count().toPandas().set_index("label")["count"]
    positive_count, negative_count = class_counts.get(1, 0), class_counts.get(0, 0)
    min_class_count = min(positive_count, negative_count)

    print(f"🔹 Positivos: {positive_count}, Negativos: {negative_count}")
    print(f"✅ Reduciéndolos a: {min_class_count}")

    """# Seleccionar aleatoriamente `min_class_count` reseñas de cada clase
    df_positive = df.filter(col("label") == 1).sample(False, min_class_count / positive_count, seed=42)
    df_negative = df.filter(col("label") == 0).sample(False, min_class_count / negative_count, seed=42)

    df_balanced = df_positive.union(df_negative)
    """
    # ✅ Filtrar y muestrear en una sola operación
    df_balanced = (
        df.withColumn("rand", F.rand(seed=42))  # Agregar una columna aleatoria para el muestreo
        .withColumn("rank", F.row_number().over(Window.partitionBy("label").orderBy("rand")))
        .filter(col("rank") <= min_class_count)  # Filtrar para balancear
        .drop("rand", "rank")  # Limpiar columnas auxiliares
    )

    print(f"✅ Total de registros después del balanceo: {df_balanced.count()}")
    return df_balanced

def save_cleaned_data(df, output_path):
    """Guarda los datos procesados en Parquet."""
    print(f"\n💾 Guardando datos procesados en {output_path}...")
    df.write.mode("overwrite").parquet(output_path)
    print("✅ Datos guardados correctamente.")

# Función para cargar el modelo FastText en cada worker de Spark
def get_fasttext_model():
    """Carga el modelo FastText en cada worker solo una vez."""
    return fasttext.load_model("models/lid.176.bin")

def detect_language(text):
    """Detecta el idioma usando FastText."""
    try:
        if not text:
            return "unknown"

        # Cargar modelo FastText en cada worker
        model = get_fasttext_model()

        label = model.predict([text.replace("\n", " ")])
        return label[0][0][0].replace("__label__", "")
    except Exception:
        return "unknown"

def check_and_download_file(file_path, url):
    """
    Verifica si el archivo existe en la ruta especificada.
    Si no existe, lo descarga usando wget.

    """
    if os.path.exists(file_path):
        print(f"✅ El archivo ya existe: {file_path}")
    else:
        print(f"⚠️ El archivo no existe. Descargando desde: {url}")
        os.system(f"wget {url} -P {os.path.dirname(file_path)}")
        print("✅ Descarga completada.")

detect_language_udf = udf(detect_language, StringType())

def add_language_column(df):
    # Convertir la función en UDF para PySpark

    """Añade la columna 'language' con la detección de idioma."""
    print("\n🌍 Detectando idioma de las reseñas...")
    df = df.withColumn("language", detect_language_udf(col("text")))
    print("✅ Detección de idioma completada.")
    return df





In [None]:
  # 📌 Definir rutas
  interim_data_path = "data/interim/Software_interim.parquet"
  processed_data_path = "data/processed/Software_processed.parquet"
  from pyspark import SparkContext

  # Crear sesión de Spark
  spark = create_spark_session()

  # Cargar datos
  df = load_data(interim_data_path, spark)
  df.printSchema()
  df = df.select(col("text"), col("rating"))
  df = remove_neutral_reviews(df)
  # # Aplicar detección de idioma
  df.show(5, truncate=True)


  file_path = "data/models/lid.176.bin"
  url = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin"
  check_and_download_file(file_path, url)
  # df = spark.read.parquet("data/processed/Software_processed.parquet")
  res=detect_language("hola como estas soy de colomb")
  # Definir la función con carga de modelo en cada worker
  def detect_language_udf():
      model = None  # Variable para almacenar el modelo en caché

      def detect_language(text):
          nonlocal model
          if model is None:
              model = fasttext.load_model(file_path)  # ✅ Carga del modelo en cada worker
          try:
              if not text:
                  return "unknown"
              label = model.predict([text.replace("\n", " ")])
              return label[0][0][0].replace("__label__", "")
          except Exception:
              return "unknown"

      return detect_language

  # Convertir la función en una UDF
  detect_language = udf(detect_language_udf(), StringType())
  # Aplicar la UDF al DataFrame de Spark
  df = df.withColumn("language", detect_language(col("text")))
  df.show(5, truncate=True)


  # # 📌 Verificar los datos antes de seguir
  # df.select("text", "language").show(5, truncate=True)

  # # Eliminar reseñas neutrales y convertir a clasificación binaria
  df = filter_english_reviews(df)
  # df = remove_neutral_reviews(df)
  df.show(5, truncate=True)



📥 Cargando datos desde data/interim/Software_interim.parquet...
✅ Datos cargados correctamente.
root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)


🗑️ Eliminando reseñas de 3 estrellas...
✅ Total de registros después de eliminación de neutrales: 4460825
+--------------------+------+-----+
|                text|rating|label|
+--------------------+------+-----+
|Great pr

In [None]:
  # # # Preprocesar datos
  df = clean_text(df)
  df = remove_duplicates_and_empty(df)

  # # Guardar datos procesados
  save_cleaned_data(df, processed_data_path)

  print("\n🎯 Proceso de preprocesamiento completado.")


🧹 Limpiando texto...
✅ Texto limpiado.

🗑️ Eliminando valores nulos y duplicados...

💾 Guardando datos procesados en data/processed/Software_processed.parquet...
✅ Datos guardados correctamente.

🎯 Proceso de preprocesamiento completado.


In [None]:
  # # Aplicar Undersampling
  df = undersampling(df)

  # # Guardar datos procesados
  save_cleaned_data(df, processed_data_path)

  print("\n🎯 Proceso de preprocesamiento completado.")


⚖️ Aplicando Undersampling para balancear clases...
🔹 Positivos: 2748554, Negativos: 803325
✅ Reduciéndolos a: 803325
✅ Total de registros después del balanceo: 1606650

💾 Guardando datos procesados en data/processed/Software_processed.parquet...
✅ Datos guardados correctamente.

🎯 Proceso de preprocesamiento completado.


In [None]:
!rm -r data.zip
!zip -r data.zip data
from google.colab import files
files.download("data.zip")


  adding: data/ (stored 0%)
  adding: data/processed/ (stored 0%)
  adding: data/processed/Software_processed.parquet/ (stored 0%)
  adding: data/processed/Software_processed.parquet/.part-00000-fdac93fb-e293-483d-b1b8-f10dfcd7cd52-c000.snappy.parquet.crc (deflated 0%)
  adding: data/processed/Software_processed.parquet/._SUCCESS.crc (stored 0%)
  adding: data/processed/Software_processed.parquet/.part-00001-fdac93fb-e293-483d-b1b8-f10dfcd7cd52-c000.snappy.parquet.crc (deflated 0%)
  adding: data/processed/Software_processed.parquet/_SUCCESS (stored 0%)
  adding: data/processed/Software_processed.parquet/part-00001-fdac93fb-e293-483d-b1b8-f10dfcd7cd52-c000.snappy.parquet (deflated 12%)
  adding: data/processed/Software_processed.parquet/part-00000-fdac93fb-e293-483d-b1b8-f10dfcd7cd52-c000.snappy.parquet (deflated 12%)
  adding: data/interim/ (stored 0%)
  adding: data/interim/Software_interim.parquet/ (stored 0%)
  adding: data/interim/Software_interim.parquet/part-00006-875cac4e-3bad-

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>