# **Predicción de sentimientos de Análisis de Reseñas de Libros en Amazon - Utilización, procesamiento y visualización de grandes volúmenes de datos (Portafolio Análisis)**

Gabriela Chimali Nava Ramírez - A01710530



## Introducción

Este proyecto aborda un desafío de Big Data, analizar 3 millones de reseñas de libros de Amazon, un conjunto de datos de más de 2.6 GB. El objetivo principal es construir y evaluar un modelo de Machine Learning capaz de clasificar automáticamente el sentimiento de una reseña como "Positivo" (1.0) o "Negativo" (0.0).

Para manejar este volumen de datos, todo el proceso se realiza con Apache Spark (PySpark). El cuaderno documenta el flujo de trabajo completo:

1. ETL: Carga, limpieza y unión de los datos de reseñas y metadatos de los libros.

2. Feature Engineering: Conversión del texto de las reseñas (NLP) y las categorías de los libros en características numéricas que el modelo pueda entender.

3. Modelado: Se entrenan dos modelos, Regresión Logística y Random Forest, para comparar su rendimiento. Utilizando una técnica de ponderación de clases (weightCol) para ayudar al Random Forest a manejar el desbalance entre reseñas positivas y negativas.

4. Evaluación: Se evalúan los modelos usando métricas como Precisión, Recall y F1-Score.

5. Visualización: Se exportan los resultados agregados para su visualización en Tableau.

## 0. Inicializar entorno PySpark y montar Google Drive

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

!pip install pyspark -q
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.12/dist-packages/pyspark"

spark = SparkSession.builder.appName("ProjectBigData").getOrCreate()

In [2]:
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


## 1. ETL inicial de los datos

### 1.1. Carga de DataFrames

In [3]:
# Rutas a los archivos
ratings_path = "/content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/Books_rating.csv"
metadata_path = "/content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/books_data.csv"

print("\nCargando DataFrames...")
df_ratings_raw = spark.read.csv(
    ratings_path,
    header=True,
    inferSchema=True,
    sep=',',
    multiLine=True,
    escape='"'
)

df_books_raw = spark.read.csv(
    metadata_path,
    header=True,
    inferSchema=True,
    sep=',',
    multiLine=True,
    escape='"'
)

# Seleccionar sólo las columnas necesarias
df_ratings = df_ratings_raw.select(
    col("Title").alias("title"),
    col("review/score").alias("reviewScore"),
    col("review/text").alias("reviewText")
)

df_books = df_books_raw.select(
    col("Title").alias("title"),
    "categories",
    "authors",
    "publishedDate"
)


Cargando DataFrames...


In [4]:
def file_size(file_path):
    size_bytes = os.path.getsize(file_path)
    size_gb = size_bytes / (1024 * 1024 * 1024)
    return size_gb

print(f"Reseñas cargadas: {df_ratings.count()}")
print(f"Tamaño del archivo de reseñas: {file_size(ratings_path):.2f} GB")
print(f"Datos de los libros cargados: {df_books.count()}")
print(f"Tamaño del archivo de datos de libros: {file_size(metadata_path):.2f} GB")

Reseñas cargadas: 3000000
Tamaño del archivo de reseñas: 2.66 GB
Datos de los libros cargados: 212404
Tamaño del archivo de datos de libros: 0.17 GB


In [5]:
print("\nPrimeras 10 filas de df_ratings:")
df_ratings.show(10)

print("\nPrimeras 10 filas de df_books:")
df_books.show(10)


Primeras 10 filas de df_ratings:
+--------------------+-----------+--------------------+
|               title|reviewScore|          reviewText|
+--------------------+-----------+--------------------+
|Its Only Art If I...|        4.0|This is only for ...|
|Dr. Seuss: Americ...|        5.0|I don't care much...|
|Dr. Seuss: Americ...|        5.0|If people become ...|
|Dr. Seuss: Americ...|        4.0|Theodore Seuss Ge...|
|Dr. Seuss: Americ...|        4.0|Philip Nel - Dr. ...|
|Dr. Seuss: Americ...|        4.0|"Dr. Seuss: Ameri...|
|Dr. Seuss: Americ...|        5.0|Theodor Seuss Gie...|
|Dr. Seuss: Americ...|        5.0|When I recieved t...|
|Dr. Seuss: Americ...|        5.0|Trams (or any pub...|
|Dr. Seuss: Americ...|        4.0|As far as I am aw...|
+--------------------+-----------+--------------------+
only showing top 10 rows


Primeras 10 filas de df_books:
+--------------------+--------------------+--------------------+-------------+
|               title|          categories|  

### 1.2. Limpieza de los datos para el JOIN

Primero se identifican los registros nulos.

In [7]:
print("Registros nulos en df_books:")
df_books.select([
    count(when(isnull(col) | isnan(col), col)).alias(col)
    for col in df_books.columns
]).show()

print("Registros nulos en df_ratings:")
df_ratings.select([
    count(when(isnull(col) | isnan(col), col)).alias(col)
    for col in df_ratings.columns
]).show()

Registros nulos en df_books:
+-----+----------+-------+-------------+
|title|categories|authors|publishedDate|
+-----+----------+-------+-------------+
|    1|     41199|  31413|        25305|
+-----+----------+-------+-------------+

Registros nulos en df_ratings:
+-----+-----------+----------+
|title|reviewScore|reviewText|
+-----+-----------+----------+
|  208|          0|         8|
+-----+-----------+----------+



Se **eliminan** las filas si las columnas críticas son nulas.
* Para ratings: `["Title", "review/score", "review/text"]`
* Para books: `["Title"]`

Se **imputan** los nulos de las siguientes columnas de books:

`["categories", "authors", "publisher"]` con la etiqueta `"Desconocido"`

`["publicationDate"]` con `0`

In [8]:
# Eliminar filas de ratings
columnas_criticas_ratings = ["title", "reviewScore", "reviewText"]
df_ratings_clean = df_ratings.dropna(subset=columnas_criticas_ratings)

# Eliminar filas de books si "Title" es nulo
df_books_clean = df_books.dropna(subset=["title"])

# Imputar features nulos en books
columnas_imputar = ["categories", "authors"]
df_books_clean = df_books_clean.fillna("Desconocido", subset=columnas_imputar)

df_books_clean = df_books_clean.withColumn(
    "publicationYear",
    substring(col("publishedDate"), 1, 4).cast(IntegerType())
)
# Imputar años nulos o mal formados con 0 en books
df_books_clean = df_books_clean.fillna(0, subset=["publicationYear"])
df_books_clean = df_books_clean.drop("publishedDate")

print(f"Reseñas después de limpieza: {df_ratings_clean.count()}")
print(f"Books después de limpieza: {df_books_clean.count()}")

Reseñas después de limpieza: 2999784
Books después de limpieza: 212403


In [9]:
df_books_clean = df_books_clean.withColumn(
    "title",
    trim(regexp_replace(lower(col("title")), r"[^a-z0-9\s]", ""))
)

df_ratings_clean = df_ratings_clean.withColumn(
    "title",
    trim(regexp_replace(lower(col("title")), r"[^a-z0-9\s]", ""))
)

In [10]:
df_books_clean.show(10)
df_ratings_clean.show(10)

+--------------------+--------------------+--------------------+---------------+
|               title|          categories|             authors|publicationYear|
+--------------------+--------------------+--------------------+---------------+
|its only art if i...|['Comics & Graphi...|    ['Julie Strain']|           1996|
|dr seuss american...|['Biography & Aut...|      ['Philip Nel']|           2005|
|wonderful worship...|        ['Religion']|    ['David R. Ray']|           2000|
|whispers of the w...|         ['Fiction']| ['Veronica Haddon']|           2005|
|nation dance reli...|         Desconocido|     ['Edward Long']|           2003|
|the church of chr...|        ['Religion']|['Everett Ferguson']|           1996|
|the overbury affa...|         Desconocido|['Miriam Allen De...|           1960|
|a walk in the woo...|         Desconocido|    ['Lee Blessing']|           1988|
|saint hyacinth of...|['Biography & Aut...|['Mary Fabyan Win...|           2009|
|rising sons and d...|  ['So

In [11]:
df_books_clean = df_books_clean.dropDuplicates(["title"])
print(f"Books después de borrar duplicados: {df_books_clean.count()}")

Books después de borrar duplicados: 207065


In [12]:
df_join = df_ratings_clean.join(
    df_books_clean,
    on="title",
    how="left"
)

In [13]:
print("Registros nulos en df_join:")
df_join.select([
    count(when(isnull(col) | isnan(col), col)).alias(col)
    for col in df_join.columns
]).show()

print(f"Reseñas después del join: {df_join.count()}")

Registros nulos en df_join:
+-----+-----------+----------+----------+-------+---------------+
|title|reviewScore|reviewText|categories|authors|publicationYear|
+-----+-----------+----------+----------+-------+---------------+
|    0|          0|         0|         0|      0|              0|
+-----+-----------+----------+----------+-------+---------------+

Reseñas después del join: 2999784


### 1.3. Creación de la Etiqueta para el modelo

Usando reviewScore

Ignorar reseñas de 3 estrellas



In [14]:
df_labeled = df_join.withColumn("label",
    when(col("reviewScore") >= 4.0, 1.0)
    .when(col("reviewScore") <= 2.0, 0.0)
    .otherwise(None)
)

df_model_data = df_labeled.dropna(subset=["label"])
print(f"Filas para el modelo (sin 3 estrellas): {df_model_data.count()}")


Filas para el modelo (sin 3 estrellas): 2745497


### 1.4. Limpieza de cadenas de texto

Si hay más de una categoría o autor ligado al libro se **queda** sólo la/el primer nombre registrado en la fila.

In [15]:
# Limpiar y obtener la primera categoría
df_model_data = df_model_data.withColumn("categories",
    regexp_replace(col("categories"), r"[\'\[\]]", "")
)
df_model_data = df_model_data.withColumn("categories",
    split(col("categories"), ",")[0]
)

# Limpiar y obtener el primer autor
df_model_data = df_model_data.withColumn("authors",
    regexp_replace(col("authors"), r"[\'\[\]]", "")
)
df_model_data = df_model_data.withColumn("authors",
    split(col("authors"), ",")[0]
)

df_model_data.show(5)

+--------------------+-----------+--------------------+--------------------+----------+---------------+-----+
|               title|reviewScore|          reviewText|          categories|   authors|publicationYear|label|
+--------------------+-----------+--------------------+--------------------+----------+---------------+-----+
|dr seuss american...|        5.0|I don't care much...|Biography & Autob...|Philip Nel|           2005|  1.0|
|dr seuss american...|        5.0|If people become ...|Biography & Autob...|Philip Nel|           2005|  1.0|
|dr seuss american...|        4.0|Theodore Seuss Ge...|Biography & Autob...|Philip Nel|           2005|  1.0|
|dr seuss american...|        4.0|Philip Nel - Dr. ...|Biography & Autob...|Philip Nel|           2005|  1.0|
|dr seuss american...|        4.0|"Dr. Seuss: Ameri...|Biography & Autob...|Philip Nel|           2005|  1.0|
+--------------------+-----------+--------------------+--------------------+----------+---------------+-----+
only showi

In [16]:
df_complete = df_model_data.select(
    "label",
    "reviewText",
    col ("categories").alias ("category"),
    col("authors").alias ("first_author"),
    "publicationYear"
)

df_complete_clean = df_complete.dropna()

print(f">> Total de filas con strings limpios: {df_complete_clean.count()}")

>> Total de filas con strings limpios: 2745497


### 1.5. Seleccionar sólo las categorías más representativas

1. Calcular cantidad de reseñas.
2. Calcular cantidad de reseñas por categoría y su equivalencia en porcentaje respecto al total.
3. Seleccionar sólo las categorías que representan más del 1% del total.

In [17]:
total_reviews = df_complete_clean.count()

df_category_counts = df_complete_clean.groupBy("category").count()
df_category_counts = df_category_counts.withColumn(
    "percentage",
    (col("count") / total_reviews) * 100
)

categories_to_keep = [row["category"] for row in df_category_counts.filter(col("percentage") > 1).collect()]

print("Categorías que representan más del 1% del total de reseñas:")
print(categories_to_keep)

df_filtered = df_complete_clean.filter(col("category").isin(categories_to_keep))

print(f"\n>> Filas después de filtrar por categorías (> 1%): {df_filtered.count()}")

Categorías que representan más del 1% del total de reseñas:
['Computers', 'Social Science', 'Biography & Autobiography', 'Business & Economics', 'Religion', 'Fiction', 'History', 'Juvenile Nonfiction', 'Self-Help', 'Juvenile Fiction', 'Desconocido', 'Cooking']

>> Filas después de filtrar por categorías (> 1%): 1922939


In [30]:
path_clean = "/content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/df_clean.csv"
df_filtered.coalesce(1).write.csv(path_clean, header=True, mode="overwrite")
print(f"Archivo limpio guardado en: {path_clean}")

Archivo limpio guardado en: /content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/df_clean.csv


## 2. Feature Engineering

### 2.1. Importar librerías y división de datos

Separar los conjuntos de entrenamiento (80%) y prueba (20%).

Se guardarán en el caché para un acceso más rápido.

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Tokenizer, StopWordsRemover, HashingTF, IDF,
    StringIndexer, OneHotEncoder, VectorAssembler
)

(train, test) = df_filtered.randomSplit([0.8, 0.2], seed=42)
train.cache()
test.cache()

DataFrame[label: double, reviewText: string, category: string, first_author: string, publicationYear: int]


### 2.2. Procesamiento de Texto (NLP)

* Tokenizar: Separar el texto en palabras.

* StopWordsRemover: Eliminar palabras comunes que no aportan al modelo. Como "el", "la", "y".

* HashingTF: Convertir las palabras en un vector de números.

In [20]:
tokenizer_text = Tokenizer(inputCol="reviewText", outputCol="words_text")
stopwords_text = StopWordsRemover(inputCol="words_text", outputCol="filtered_text")
hashingtf_text = HashingTF(inputCol="filtered_text", outputCol="features_text", numFeatures=1000)

### 2.3. Procesamiento de variables categóricas

* StringIndexer: Convierte las categorías en un número.

* OneHotEncoder: Convierte en un vector de 1's y 0's ls indices anteriores.

In [21]:
index_category = StringIndexer(inputCol="category", outputCol="category_index", handleInvalid="keep")
ohe_category = OneHotEncoder(inputCol="category_index", outputCol="category_vec")

### 2.4. Ensamblar todas las features



In [22]:
assembler = VectorAssembler(
    inputCols=[
        "features_text",
        "category_vec",
        "publicationYear"
    ],
    outputCol="features"
)

## 3. Modelado

### 3.1. Definir modelo y pipelines

Para el RandomForest se designan manualmente los pesos de las clases dado que nos encontramos con un desbalance significativo

In [23]:
from pyspark.ml.classification import (
    LogisticRegression, RandomForestClassifier
)
from pyspark.sql.functions import col, when

# Modelo 1: Regresión Logística
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Modelo 2: Random Forest
class_weights = {1.0: 1.0, 0.0: 6.0}

# Agregar columna de "weight" al DataFrame
df_weighted = df_filtered.withColumn("weight", when(col("label") == 1.0, class_weights[1.0]).otherwise(class_weights[0.0]))

# Separación de conjuntos
(train_weighted, test_weighted) = df_weighted.randomSplit([0.8, 0.2], seed=42)
train_weighted.cache()
test_weighted.cache()

rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=42, weightCol="weight")

# --- Pipeline 1: Con Regresión Logística ---
pipeline_lr = Pipeline(stages=[
    tokenizer_text, stopwords_text, hashingtf_text,
    index_category, ohe_category,
    assembler,
    lr
])

# --- Pipeline 2: Con Random Forest ---
pipeline_rf = Pipeline(stages=[
    tokenizer_text, stopwords_text, hashingtf_text,
    index_category, ohe_category,
    assembler,
    rf
])

### 3.2. Entrenamiento de los modelos

In [24]:
print("Entrenando Modelo 1: Regresión Lineal")
model_lr = pipeline_lr.fit(train)

Entrenando Modelo 1: Regresión Lineal


In [25]:
print("\nEntrenando Modelo 2: Random Forest")
model_rf = pipeline_rf.fit(train_weighted)


Entrenando Modelo 2: Random Forest


### 3.3. Generar predicciones

In [29]:
print("Predicciones Modelo 1: Regresión Logística")
predictions_lr = model_lr.transform(test)
print("Predicciones Modelo 2: Random Forest")
predictions_rf = model_rf.transform(test_weighted)

Predicciones Modelo 1: Regresión Logística
Predicciones Modelo 2: Random Forest


## 4. Evaluación de los modelos

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, sum, when

print(">> Evaluación Regresión Logística")
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

acc_lr = evaluator.setMetricName("accuracy").evaluate(predictions_lr)
print(f"Accuracy (LR): {acc_lr:.4f}")

f1_lr = evaluator.setMetricName("f1").evaluate(predictions_lr)
print(f"F1-Score (LR): {f1_lr:.4f}")

prec_lr = evaluator.setMetricName("precisionByLabel").setMetricLabel(0.0).evaluate(predictions_lr)
print(f"Precision (para Negativos) (LR): {prec_lr:.4f}")

recall_lr = evaluator.setMetricName("recallByLabel").setMetricLabel(1.0).evaluate(predictions_lr)
print(f"Recall (para Positivos) (LR): {recall_lr:.4f}")

print("\nMatriz de Confusión (LR):")
confusion_matrix_lr = predictions_lr.groupBy("label", "prediction").count().orderBy("label", "prediction")
confusion_matrix_lr.show()

print("\n>> Evaluación Random Forest")

acc_rf = evaluator.setMetricName("accuracy").evaluate(predictions_rf)
print(f"Accuracy (RF): {acc_rf:.4f}")

f1_rf = evaluator.setMetricName("f1").evaluate(predictions_rf)
print(f"F1-Score (RF): {f1_rf:.4f}")

prec_rf = evaluator.setMetricName("precisionByLabel").setMetricLabel(0.0).evaluate(predictions_rf)
print(f"Precision (para Negativos) (RF): {prec_rf:.4f}")

recall_rf = evaluator.setMetricName("recallByLabel").setMetricLabel(1.0).evaluate(predictions_rf)
print(f"Recall (para Positivos) (RF): {recall_rf:.4f}")

print("\nMatriz de Confusión (RF):")
confusion_matrix_rf = predictions_rf.groupBy("label", "prediction").count().orderBy("label", "prediction")
confusion_matrix_rf.show()

>> Evaluación Regresión Logística
Accuracy (LR): 0.8814
F1-Score (LR): 0.8525
Precision (para Negativos) (LR): 0.6397
Recall (para Positivos) (LR): 0.9838

Matriz de Confusión (LR):
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|  0.0|       0.0|  9625|
|  0.0|       1.0| 40234|
|  1.0|       0.0|  5420|
|  1.0|       1.0|329689|
+-----+----------+------+


>> Evaluación Random Forest
Accuracy (RF): 0.8350
F1-Score (RF): 0.8301
Precision (para Negativos) (RF): 0.3426
Recall (para Positivos) (RF): 0.9149

Matriz de Confusión (RF):
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
|  0.0|       0.0| 14864|
|  0.0|       1.0| 34995|
|  1.0|       0.0| 28525|
|  1.0|       1.0|306584|
+-----+----------+------+



## 5. Visualización

### 5.1. Exportación de datos

In [28]:
def aggregate_model_performance(predictions_df, model_name, group_by_col):

    df_agg = predictions_df.groupBy(group_by_col) \
        .agg(
            count("*").alias("total_de_reseñas"),
            sum(when(col("label") == 1.0, 1).otherwise(0)).alias("positivo_real"),
            sum(when(col("label") == 0.0, 1).otherwise(0)).alias("negativo_real"),
            sum(when(col("prediction") == 1.0, 1).otherwise(0)).alias("positivo_predicho"),
            sum(when(col("prediction") == 0.0, 1).otherwise(0)).alias("negativo_predicho")
        ) \
        .withColumn("model_name", lit(model_name))

    return df_agg

agg_lr_cat = aggregate_model_performance(predictions_lr, "Regresión Logística", "category")
agg_rf_cat = aggregate_model_performance(predictions_rf, "Random Forest", "category")

df_comparacion_categoria = agg_lr_cat.union(agg_rf_cat)

agg_lr_auth = aggregate_model_performance(predictions_lr, "Regresión Logística", "first_author")
agg_rf_auth = aggregate_model_performance(predictions_rf, "Random Forest", "first_author")

df_comparacion_autor = agg_lr_auth.union(agg_rf_auth)

print("Exportando CSVs para Tableau...")
path_cat = "/content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/tableau_category.csv"
path_auth = "/content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/tableau_autor.csv"

# Exportar directamente a CSV usando PySpark
df_comparacion_categoria.coalesce(1).write.csv(path_cat, header=True, mode="overwrite")
print(f"Archivo de Categorías guardado en: {path_cat}")

df_comparacion_autor.coalesce(1).write.csv(path_auth, header=True, mode="overwrite")
print(f"Archivo de Autores guardado en: {path_auth}")

Exportando CSVs para Tableau...
Archivo de Categorías guardado en: /content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/tableau_category.csv
Archivo de Autores guardado en: /content/gdrive/MyDrive/AI_data_set/AmazonBooksReviews/tableau_autor.csv


## Conclusiones


## Referencias

1.  Bekheet, M. (2022, septiembre 13). *Amazon Books Reviews*. https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews/data
2. Sharma, S. (2023, diciembre 28). *Amazon Books Review (EDA + Sentiment-Analysis)*. https://www.kaggle.com/code/shubham2703/amazon-books-review-eda-sentiment-analysis

