#**Amazon Book Reviews**

El dataset a utilizar durante el proyecto es ‘Amazon Book Reviews’ proveniente de la
plataforma Kaggle, se encuentra disponible en el siguiente enlace:
https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews/data.

Este conjunto de datos fue construido a partir de dos fuentes principales: la primera
fuente es el repositorio de datos de Amazon de la Universidad de California en San
Diego (UCSD),  La segunda fuente corresponde a datos obtenidos a través de la
API de Google Books.

En cuanto al tamaño global, el dataset se compone de dos archivos principales:
books_data.csv, con un tamaño aproximado de 181.3 MB, y books_rating.csv, con un
tamaño aproximado de 2.86 GB

Los datasets books_data y books_rating ofrecen información complementaria sobre
libros disponibles en Amazon y las reseñas realizadas por los usuarios. A continuación
se presenta una descripción general de cada uno de los datasets:

**books_data.csv (Metadatos de los libros):**
Este dataset contiene metadatos como título, autor, categoría, número de valoraciones,
descripción, fecha de publicación. En cuanto a su estructura cuenta con 212,404
registros y 10 columnas.

**books_rating.csv (Reseñas de Usuarios):** Este dataset tiene un enfoque en las opiniones de los usuarios que incluyen:
puntuaciones, reseñas, ID de usuario y nombre de perfil. En cuanto a su estructura
cuenta con 3,000,000 de registros distribuidos en 10 columnas


# **Proyecto | lectura, escritura, archivos de Big Data PySpark**

## **CARGA DE ARCHIVOS**

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, to_date
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("AmazonBooksEDA").getOrCreate()

# Carga de archivos
books_df = spark.read.option("header", True).option("inferSchema", True).csv("books_data.csv")
ratings_df = spark.read.option("header", True).option("inferSchema", True).csv("Books_rating.csv")



In [3]:
print("books_data.csv:")
books_df.show(5)
books_df.printSchema()

print("\nBooks_rating.csv:")
ratings_df.show(5)
ratings_df.printSchema()


books_data.csv:
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+------------+
|               Title|         description|             authors|               image|         previewLink|           publisher| publishedDate|            infoLink|          categories|ratingsCount|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+--------------------+------------+
|Its Only Art If I...|                NULL|    ['Julie Strain']|http://books.goog...|http://books.goog...|                NULL|          1996|http://books.goog...|['Comics & Graphi...|        NULL|
|Dr. Seuss: Americ...|"Philip Nel takes...| like that of Lew...| has changed lang...| giving us new wo...| inspiring artist...|['Philip Nel']|http://books.goog...|http://books.goog...|   A&C B

In [4]:
books_df = books_df.withColumn("publishedDate", to_date(col("publishedDate"))) \
                   .withColumn("ratingsCount", col("ratingsCount").cast("integer"))

ratings_df = ratings_df.withColumn("review/score", col("review/score").cast(DoubleType()))

print("books_data.csv:")
books_df.printSchema()

print("\nBooks_rating.csv:")
ratings_df.printSchema()

books_data.csv:
root
 |-- Title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- image: string (nullable = true)
 |-- previewLink: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- publishedDate: date (nullable = true)
 |-- infoLink: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- ratingsCount: integer (nullable = true)


Books_rating.csv:
root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: double (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)



In [5]:
print("books_data.csv - filas:", books_df.count(), "columnas:", len(books_df.columns))
print("Books_rating.csv - filas:", ratings_df.count(), "columnas:", len(ratings_df.columns))


books_data.csv - filas: 212404 columnas: 10
Books_rating.csv - filas: 324662 columnas: 10


In [6]:
print("Valores nulos en books_data.csv:")
books_df.select([count(when(col(c).isNull(), c)).alias(c) for c in books_df.columns]).show()

print("Valores nulos en Books_rating.csv:")
ratings_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ratings_df.columns]).show()



Valores nulos en books_data.csv:
+-----+-----------+-------+-----+-----------+---------+-------------+--------+----------+------------+
|Title|description|authors|image|previewLink|publisher|publishedDate|infoLink|categories|ratingsCount|
+-----+-----------+-------+-----+-----------+---------+-------------+--------+----------+------------+
|    1|      68357|  31251|51191|      24055|    73130|        46999|   24301|     40524|      168972|
+-----+-----------+-------+-----+-----------+---------+-------------+--------+----------+------------+

Valores nulos en Books_rating.csv:
+---+-----+------+-------+-----------+------------------+------------+-----------+--------------+-----------+
| Id|Title| Price|User_id|profileName|review/helpfulness|review/score|review/time|review/summary|review/text|
+---+-----+------+-------+-----------+------------------+------------+-----------+--------------+-----------+
|  0|    1|269540|  62920|      62909|                20|        1894|          5|    

In [7]:
print("Estadísticas books_data.csv:")
books_df.describe().show()

print("Ratings únicos en books_data.csv:")
books_df.select("ratingsCount").distinct().show()

print("Estadísticas Books_rating.csv:")
ratings_df.describe().show()


Estadísticas books_data.csv:
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|summary|               Title|         description|             authors|               image|         previewLink|           publisher|            infoLink|          categories|      ratingsCount|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|  count|              212403|              144047|              181153|              161213|              188349|              139274|              188103|              171880|             43432|
|   mean|   3823.672941176471|  1.4285714285714286|              1578.4|              1184.0|            Infinity|             3734.75|   1989.054693877551|  1983.7334777898159| 56.31

# **Proyecto | Base de Datos de Big Data**

## **PARTICIONAMIENTO**

In [8]:
from pyspark.sql.functions import col

# Asegúrate de filtrar registros que no tengan nulos en estas dos columnas
df_clean = books_df.join(ratings_df, on="Title") \
    .filter((col("ratingsCount").isNotNull()) & (col("review/score").isNotNull()))


In [9]:
total = df_clean.count()
print(total)


157161


In [10]:
df_clean.select("ratingsCount","review/score").describe().show()

+-------+------------------+-----------------+
|summary|      ratingsCount|     review/score|
+-------+------------------+-----------------+
|  count|            157161|           157161|
|   mean|226.33159626115895|4.154484891289824|
| stddev| 701.4173054181018|1.242821744171521|
|    min|                 1|              1.0|
|    max|              3907|              5.0|
+-------+------------------+-----------------+



In [11]:
total = df_clean.count()

# A: ratingsCount (popularidad)
p_a = df_clean.filter(col("ratingsCount") <= 1000).count() / total
p_b = 1 - p_a

# B: review/score (percepción)
p_c = df_clean.filter(col("review/score") < 4.0).count() / total
p_d = 1 - p_c


In [12]:
#Probabilidades de Ocurrencia
print("A. a:", p_a) # Probabilidad de a (ratingsCount <= 1000)
print("B. b:", p_b) # Probabilidad de b (ratingsCount > 1000)
print("C. c:", p_c) # Probabilidad de c (review/score < 4.0)
print("D. d:", p_d) # Probabilidad de d (review/score >= 4.0)

A. a: 0.9315924434178963
B. b: 0.06840755658210373
C. c: 0.22082450480717225
D. d: 0.7791754951928278


In [13]:
# Probabilidades conjuntas
p_ac = p_a * p_c  # Probabilidad de a ∧ c (ratingsCount <= 1000 y review/score < 4.0)
p_ad = p_a * p_d  # Probabilidad de a ∧ d (ratingsCount <= 1000 y review/score >= 4.0)
p_bc = p_b * p_c  # Probabilidad de b ∧ c (ratingsCount > 1000 y review/score < 4.0)
p_bd = p_b * p_d  # Probabilidad de b ∧ d (ratingsCount > 1000 y review/score >= 4.0)


In [14]:
#Probabilidades de Ocurrencia
print("A. a ∧ c:", p_ac)
print("B. a ∧ d:", p_ad)
print("C. b ∧ c:", p_bc)
print("D. b ∧ d:", p_bd)


A. a ∧ c: 0.2057184399998606
B. a ∧ d: 0.7258740034180358
C. b ∧ c: 0.015106064807311673
D. b ∧ d: 0.05330149177479206


## **Extracción de submuestras a partir de las reglas de particionamiento generadas**

Combinación A → ratingsCount ≤ 1000 y score < 4.0

In [15]:
df_A = df_clean.filter((col("ratingsCount") <= 1000) & (col("review/score") < 4.0))
df_A.show(5)


+--------------------+-----------+---------------+-----+--------------------+---------+-------------+--------------------+-------------------+------------+----------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|               Title|description|        authors|image|         previewLink|publisher|publishedDate|            infoLink|         categories|ratingsCount|        Id|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+--------------------+-----------+---------------+-----+--------------------+---------+-------------+--------------------+-------------------+------------+----------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|"A Child Called "...|       NULL|['Dave Pelzer']| NULL|http://books.goog...|     NULL|   1995-01-01|http://books.g

Combinación B → ratingsCount ≤ 1000 y score ≥ 4.0

In [16]:
df_B = df_clean.filter((col("ratingsCount") <= 1000) & (col("review/score") >= 4.0))
df_B.show(5)


+--------------------+-----------+---------------+-----+--------------------+---------+-------------+--------------------+-------------------+------------+----------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|               Title|description|        authors|image|         previewLink|publisher|publishedDate|            infoLink|         categories|ratingsCount|        Id|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+--------------------+-----------+---------------+-----+--------------------+---------+-------------+--------------------+-------------------+------------+----------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|"A Child Called "...|       NULL|['Dave Pelzer']| NULL|http://books.goog...|     NULL|   1995-01-01|http://books.g

 Combinación C → ratingsCount > 1000 y score < 4.0

In [17]:
df_C = df_clean.filter((col("ratingsCount") > 1000) & (col("review/score") < 4.0))
df_C.show(5)


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+------------+----------+------+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|               Title|         description|             authors|               image|         previewLink|           publisher|publishedDate|            infoLink|          categories|ratingsCount|        Id| Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+--------------------+--------------------+------------+----------+------+--------------+--------------------+------------------+------------+-----------+--------------------+-----------

Combinación D → ratingsCount > 1000 y score ≥ 4.0

In [18]:
df_D = df_clean.filter((col("ratingsCount") > 1000) & (col("review/score") >= 4.0))
df_D.show(5)


+--------------------+--------------------+--------------------+-----------+-----------------+--------------------+-------------+--------------------+----------+------------+----------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|               Title|         description|             authors|      image|      previewLink|           publisher|publishedDate|            infoLink|categories|ratingsCount|        Id|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+--------------------+--------------------+--------------------+-----------+-----------------+--------------------+-------------+--------------------+----------+------------+----------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|Raymond Chandler:...|"Later Novels and...|"" in which Cha

# Actividad 3 - modelos supervizados y no supervizados

Parte 2 – Construcción de Modelos de Aprendizaje Supervisado y No Supervisado
	1.	Introducción teórica
Se empleará un modelo de aprendizaje supervisado (DecisionTreeClassifier) para predecir la percepción de calidad (review/score), y un modelo de aprendizaje no supervisado (KMeans) para agrupar libros según popularidad (ratingsCount) y antigüedad (publishedDate). DecisionTree es útil por su interpretabilidad y manejo de variables categóricas; KMeans permite identificar grupos homogéneos sin variable objetivo.
	2.	Selección de los datos
Se reutiliza la muestra M construida mediante técnicas de muestreo aplicadas a cada una de las cuatro particiones definidas en la etapa anterior (A, B, C, D). Esta muestra combina registros representativos de todos los perfiles identificados.
	3.	Preparación de los datos
•	Se eliminan valores nulos remanentes.
•	categories se transforma con StringIndexer y OneHotEncoder.
•	publishedDate se transforma a año numérico.
•	Se ensamblan las variables con VectorAssembler.
•	Para supervisado, review/score se binariza como label (1 si ≥4.0, 0 si <4.0).
•	Se normalizan los datos para KMeans si es necesario.

4.	Preparación del conjunto de entrenamiento y prueba

•	La muestra M se divide en 80% entrenamiento y 20% prueba usando randomSplit.
•	Se justifica esta proporción para balancear robustez del entrenamiento y validación representativa.

5.	Construcción de modelos de aprendizaje supervisado y no supervisado

•	Supervisado: Se entrena un DecisionTreeClassifier con las variables procesadas. Se evalúa con accuracy y F1-score sobre el conjunto de prueba.
•	No supervisado: Se entrena un KMeans con K=3 sobre las variables seleccionadas. Se evalúa usando ClusteringEvaluator y se analizan los centros para interpretar los grupos formados.
•	Se compara el rendimiento de ambos modelos y su utilidad en función del objetivo analítico.



1. Introducción teórica

En esta etapa se emplearán dos enfoques de aprendizaje automático. El primero es el aprendizaje supervisado, que consiste en entrenar un modelo con datos etiquetados para predecir una variable objetivo. Se usará el algoritmo DecisionTreeClassifier de PySpark, conocido por su capacidad de manejar tanto variables categóricas como numéricas y ofrecer interpretaciones claras mediante reglas de decisión. El segundo enfoque es el aprendizaje no supervisado, donde no existe una variable objetivo y el objetivo es descubrir patrones o estructuras ocultas en los datos. Para ello, se utilizará KMeans, un algoritmo de agrupamiento que permite identificar clústeres de libros similares según sus características, como popularidad y antigüedad. Ambos modelos se aplicarán sobre la misma muestra M, siguiendo las etapas de preparación, entrenamiento y evaluación.

2. Selección de los datos

Para esta etapa se utilizará la muestra M generada previamente a partir del particionamiento de la base de datos original. Esta muestra fue construida aplicando técnicas de muestreo específicas para cada combinación de popularidad (ratingsCount) y evaluación (review/score), garantizando representatividad y diversidad temática. La muestra M resultante combina registros de las cuatro particiones definidas (A, B, C, D), y constituye el conjunto base sobre el cual se entrenarán y evaluarán ambos modelos. Esta selección permite asegurar que los datos utilizados reflejan adecuadamente los distintos perfiles presentes en la población objetivo.

In [3]:

# IMPORTACIONES
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, when, udf
from pyspark.sql.types import StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# SESIÓN SPARK
spark = SparkSession.builder.appName("MLlibEquipo52").getOrCreate()

# CARGA Y UNIÓN DE DATOS
books = spark.read.csv("books_data.csv", header=True, inferSchema=True)
ratings = spark.read.csv("Books_rating.csv", header=True, inferSchema=True)

books = books.withColumnRenamed("title", "Title")
df = ratings.join(books.select("Title", "categories", "publishedDate", "ratingsCount"), on="Title", how="inner")

# FILTRADO DE NULOS PARA VARIABLES CLAVE
df_clean = df.filter(col("ratingsCount").isNotNull() & col("review/score").isNotNull())

# PARTICIONAMIENTO SEGÚN CRITERIOS
df_clean = df_clean.withColumn("ratings_partition", when(col("ratingsCount") <= 1000, "baja").otherwise("alta"))
df_clean = df_clean.withColumn("score_partition", when(col("review/score") < 4.0, "baja").otherwise("alta"))

part_A = df_clean.filter((col("ratings_partition") == "baja") & (col("score_partition") == "baja"))
part_B = df_clean.filter((col("ratings_partition") == "baja") & (col("score_partition") == "alta"))
part_C = df_clean.filter((col("ratings_partition") == "alta") & (col("score_partition") == "baja"))
part_D = df_clean.filter((col("ratings_partition") == "alta") & (col("score_partition") == "alta"))


                                                                                

3. Preparación de los datos
Se transforma la variable categories con codificación categórica, se extrae el año de publishedDate, y se binariza review/score como variable objetivo para el modelo supervisado. Luego se ensamblan las variables predictoras en un solo vector y se normalizan para el modelo no supervisado. Este procesamiento es necesario para adaptar los datos a los requerimientos de los algoritmos de aprendizaje automático de PySpark.

In [18]:
# MUESTREO POR PARTICIÓN
sample_A = part_A.sample(False, 0.3, seed=42)
sample_B = part_B.sample(False, 0.05, seed=42)
sample_C = part_C.rdd.zipWithIndex().filter(lambda x: x[1] % 5 == 0).map(lambda x: x[0]).toDF()
sample_D = part_D

muestra_M = sample_A.union(sample_B).union(sample_C).union(sample_D)

In [27]:
# TRANSFORMACIONES PREVIAS
muestra_M = muestra_M.withColumn("year", year(col("publishedDate")))

# LIMPIEZA Y CONVERSIÓN DE CATEGORIES A STRINGS SIMPLES
to_string_udf = udf(lambda x: str(x).strip("[]'").split(",")[0] if x else None, StringType())
muestra_M = muestra_M.withColumn("categories", to_string_udf("categories"))
muestra_M = muestra_M.filter(col("categories").isNotNull())

# CONVERSIÓN DE ratingsCount A TIPO NUMÉRICO
muestra_M = muestra_M.withColumn("ratingsCount", col("ratingsCount").cast("double"))

# BINARIZACIÓN DE LABEL
muestra_M = muestra_M.withColumn("label", when(col("review/score") >= 4.0, 1).otherwise(0))


In [28]:
# ELIMINAR NULOS EN COLUMNAS EXISTENTES
muestra_M = muestra_M.dropna(subset=["ratingsCount", "year", "categories"])

# PIPELINE DE TRANSFORMACIÓN
indexer = StringIndexer(inputCol="categories", outputCol="categories_index", handleInvalid="keep")
encoder = OneHotEncoder(inputCol="categories_index", outputCol="categories_encoded", handleInvalid="keep")
assembler = VectorAssembler(inputCols=["ratingsCount", "year", "categories_encoded"], outputCol="features_raw")
scaler = MinMaxScaler(inputCol="features_raw", outputCol="features")

pipeline = Pipeline(stages=[indexer, encoder, assembler, scaler])
model_prep = pipeline.fit(muestra_M)
muestra_final = model_prep.transform(muestra_M)

4. Preparación del conjunto de entrenamiento y prueba
La muestra M ya transformada (muestra_final) se divide en dos subconjuntos: 80% para entrenamiento y 20% para prueba. Esta división permite entrenar los modelos en datos representativos y evaluarlos con información nueva, asegurando una medición objetiva del desempeño.


In [29]:
# DIVISIÓN TRAIN/TEST
train_data, test_data = muestra_final.randomSplit([0.8, 0.2], seed=42)

# CONVERSIÓN DE LABEL A DOUBLE
train_data = train_data.withColumn("label", col("label").cast("double"))
test_data = test_data.withColumn("label", col("label").cast("double"))



5. Construcción de modelos – Parte 1: Modelo Supervisado (DecisionTreeClassifier)

Se entrena un clasificador de árbol de decisión para predecir la variable binaria label (percepción de calidad). Se ajusta el modelo con los datos de entrenamiento y se evalúa su rendimiento en el conjunto de prueba mediante métricas de precisión y F1-score.



In [30]:
# MODELO SUPERVISADO
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_data)
predictions = dt_model.transform(test_data)

evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

print("Accuracy:", evaluator_acc.evaluate(predictions))
print("F1-score:", evaluator_f1.evaluate(predictions))



Accuracy: 0.7718696018071489
F1-score: 0.7767987467796413


5. Construcción de modelos – Parte 2: Modelo No Supervisado (KMeans)

Se aplica KMeans para agrupar libros según características cuantitativas como popularidad y antigüedad. Se fija un valor de K=3 para obtener tres clústeres representativos. Luego se evalúa la cohesión de los grupos usando el coeficiente de silueta.


In [None]:
# MODELO NO SUPERVISADO
kmeans = KMeans(featuresCol="features", k=10, seed=42)
kmeans_model = kmeans.fit(muestra_final)
clusters = kmeans_model.transform(muestra_final)

evaluator_cluster = ClusteringEvaluator(featuresCol="features", metricName="silhouette", distanceMeasure="squaredEuclidean")
print("Silhouette score:", evaluator_cluster.evaluate(clusters))
clusters.groupBy("prediction").count().show()