Este notebook construye una muestra representativa de la población lectora con base en variables clave del dataset `books_rating`

A continuación se describen las variables seleccionadas como representativas del comportamiento de la población:

| Variable            | Dominio                          | Estadísticas conocidas                           | Comentarios adicionales |
|---------------------|-----------------------------|--------------------------------|---------------------------------------------|
| `review/score`      | [1.0 – 5.0]                 | Moda: 5.0, Media aprox: 4.2    | Escala de satisfaccion                      |
| `review/helpfulness`| [0/0 – 10/10]               | Rango típico 1–10              | Requiere limpieza y transformación a entero |
| `user_id`           | IDs únicos por usuario      | Frecuencia variable            | Clasifica por volumen de participación      |


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split, count
from pyspark.sql.types import FloatType, IntegerType

spark = SparkSession.builder.appName('MuestreoBooksRating').getOrCreate()

# Cargar dataset
df = spark.read.option('header', 'true').csv('Books_rating.csv')
df.printSchema()
df.select('review/score', 'review/helpfulness', 'user_id').show(5) ##Ejemplo de variables de participacion

root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)

+------------+------------------+--------------+
|review/score|review/helpfulness|       user_id|
+------------+------------------+--------------+
|         4.0|               7/7| AVCGYZL8FQQTD|
|         5.0|             10/10|A30TK6U7DNS82R|
|         5.0|             10/11|A3UH4UZ4RSVO82|
|         4.0|               7/7|A2MVUWT453QH61|
|         4.0|               3/3|A22X4XUPKF66MR|
+------------+------------------+--------------+
only showing top 5 rows



In [2]:
# Remover advertencias
spark.sparkContext.setLogLevel("ERROR")

In [3]:
# Limpieza de datos 
df_clean = df.filter((col("review/score").isNotNull()) & 
                     (col("review/helpfulness").isNotNull()) & 
                     (col("user_id").isNotNull()))

In [4]:
df = df_clean

In [5]:
# Convertir columnas relevantes a int
df = df.withColumn("Score_num", col("review/score").cast(FloatType()))
df = df.withColumn("Helpfulness_num", split(col("review/helpfulness"), "/")[0].cast(IntegerType()))

# Clasificar score
df = df.withColumn("score_group", when(col("Score_num") >= 4, "Alta").otherwise("Baja"))
# Clasificar helpfulness
df = df.withColumn("helpfulness_group", when(col("Helpfulness_num") >= 8, "Alta").otherwise("Baja"))

In [6]:
# Remover valores no convertidos correctamente
df = df.filter((col("Score_num").isNotNull()) & (col("Helpfulness_num").isNotNull()))

In [7]:
# Calcular cantidad de reseñas por usuario
user_reviews = df.groupBy("user_id").agg(count("user_id").alias("review_count"))
df = df.join(user_reviews, on="user_id", how="left")


In [8]:
# Clasificación más detallada de usuarios según frecuencia de participación
df = df.withColumn(
    "user_group_detailed",
    when(col("review_count") >= 20, "20+ reseñas")
    .when((col("review_count") >= 10) & (col("review_count") < 20), "10-19 reseñas")
    .when((col("review_count") >= 5) & (col("review_count") < 10), "5-9 reseñas")
    .when((col("review_count") >= 2) & (col("review_count") < 5), "2-4 reseñas")
    .otherwise("1 reseña")
)

In [9]:
# Mostrar distribución con mejor formato
from pyspark.sql.functions import format_number
distribution = df.groupBy("user_group_detailed").count().orderBy("count", ascending=False)
distribution = distribution.withColumn("count", format_number(col("count"), 0))
distribution.show(truncate=False, n=100)

+-------------------+-------+
|user_group_detailed|count  |
+-------------------+-------+
|1 reseña           |690,877|
|2-4 reseñas        |580,802|
|20+ reseñas        |540,951|
|5-9 reseñas        |338,868|
|10-19 reseñas      |268,931|
+-------------------+-------+



In [10]:
# Revisión de distribución conjunta de variables de caracterización
df.groupBy("score_group", "helpfulness_group", "user_group_detailed").count().orderBy("count", ascending=False).show()

+-----------+-----------------+-------------------+------+
|score_group|helpfulness_group|user_group_detailed| count|
+-----------+-----------------+-------------------+------+
|       Alta|             Baja|           1 reseña|482065|
|       Alta|             Baja|        2-4 reseñas|402632|
|       Alta|             Baja|        20+ reseñas|356614|
|       Alta|             Baja|        5-9 reseñas|243589|
|       Alta|             Baja|      10-19 reseñas|190730|
|       Baja|             Baja|           1 reseña|104137|
|       Baja|             Baja|        2-4 reseñas| 97797|
|       Alta|             Alta|        20+ reseñas| 85625|
|       Baja|             Baja|        20+ reseñas| 78467|
|       Alta|             Alta|           1 reseña| 72828|
|       Baja|             Baja|        5-9 reseñas| 53827|
|       Alta|             Alta|        2-4 reseñas| 53819|
|       Baja|             Baja|      10-19 reseñas| 44771|
|       Baja|             Alta|           1 reseña| 3184

In [11]:
# Cálculo de ocurrencias por combinación
comb_counts = df.groupBy("score_group", "helpfulness_group", "user_group_detailed").count()
total = df.count()
comb_probs = comb_counts.withColumn("probabilidad", (col("count") / total))
comb_probs.orderBy(col("probabilidad").desc()).show(truncate=False)

+-----------+-----------------+-------------------+------+--------------------+
|score_group|helpfulness_group|user_group_detailed|count |probabilidad        |
+-----------+-----------------+-------------------+------+--------------------+
|Alta       |Baja             |1 reseña           |482065|0.1991651066815015  |
|Alta       |Baja             |2-4 reseñas        |402632|0.1663473706520621  |
|Alta       |Baja             |20+ reseñas        |356614|0.14733503854068844 |
|Alta       |Baja             |5-9 reseñas        |243589|0.1006387710608326  |
|Alta       |Baja             |10-19 reseñas      |190730|0.07880008048160057 |
|Baja       |Baja             |1 reseña           |104137|0.04302419116611146 |
|Baja       |Baja             |2-4 reseñas        |97797 |0.0404048207982965  |
|Alta       |Alta             |20+ reseñas        |85625 |0.035375960212012   |
|Baja       |Baja             |20+ reseñas        |78467 |0.03241863322576287 |
|Alta       |Alta             |1 reseña 

In [12]:
# Muestreo por todas las combinaciones posibles (2x2x7 = 28 combinaciones)
from itertools import product
selected_columns = ["user_id", "review/helpfulness", "review/score", "review/summary", "Score_num", "user_group_detailed"]

for s, h in product(["Alta", "Baja"], repeat=2):
    for u in ["20+ reseñas", "10-19 reseñas", "5-9 reseñas", "2-4 reseñas", "1 reseña"]:
        subset = df.filter((col("score_group") == s) &
                           (col("helpfulness_group") == h) &
                           (col("user_group_detailed") == u))
        count_subset = subset.count()
        print(f"Muestra: Score={s}, Helpfulness={h}, Usuario={u}, Registros={count_subset}")
        muestra = subset.select(*selected_columns).sample(False, 0.1, seed=42)
        muestra.show(5, truncate=False)

Muestra: Score=Alta, Helpfulness=Alta, Usuario=20+ reseñas, Registros=85625
+--------------+------------------+------------+------------------------------------------+---------+-------------------+
|user_id       |review/helpfulness|review/score|review/summary                            |Score_num|user_group_detailed|
+--------------+------------------+------------+------------------------------------------+---------+-------------------+
|A1075MZNVRMSEO|17/19             |4.0         |A great book about destructive temptations|4.0      |20+ reseñas        |
|A1075MZNVRMSEO|24/24             |5.0         |A great mid-size dictionary               |5.0      |20+ reseñas        |
|A1075MZNVRMSEO|10/10             |5.0         |A wonderful textbook for serious students.|5.0      |20+ reseñas        |
|A10T0OW97SFBB |70/70             |5.0         |Amazing analysis of loves                 |5.0      |20+ reseñas        |
|A10T0OW97SFBB |8/8               |5.0         |Simply amazing        

**Técnica aplicada**: *Muestreo estratificado* sobre combinaciones de score, utilidad y tipo de usuario.

Se utiliza esta técnica porque permite garantizar que cada subgrupo esté representado proporcionalmente en la muestra.

**Justificación:**
- Los usuarios muy activos pueden dominar el conjunto si no se estratifica.
- Las reseñas útiles y bien puntuadas tienen mayor influencia sobre recomendaciones.
- El análisis cruzado de estas variables representa diferentes comportamientos del lector.

# Introducción
**El aprendizaje supervisado** consiste en entrenar modelos con datos etiquetados para predecir una variable de interés, se aplica a conjuntos de datos sin etiquetas , es decir, solo se disponen de las variables independientes. La finalidad es que el modelo aprenda una función que, dado un conjunto de características, predice la variable 
Ventajas: No requiere datos etiquetados , lo cual reduce el costo de recopilación 
útil para la recopilación de datos, identificación de segmentos, y descubrimiento de patrones
Ejemplos: regresión lineal, árboles de decisión.

**El aprendizaje no supervisado** involucra datos sin etiquetas y busca detectar patrones o estructuras, como agrupamientos (clustering). Ejemplos: KMeans, PCA. El objetivo principal es descubrir patrones , estructuras y agrupamientos de interés , agrupamientos inherentes a los datos , sin ninguna referencia previa sobre que esperar.
Ventajas:Permite obtener predicciones con un nivel de confianza definido.
Es útil en tareas donde la respuesta conocida está disponible durante el entrenamiento.

PySpark MLlib ofrece algoritmos de ambos tipos, como `LinearRegression` para supervisado y `KMeans` para no supervisado.


In [13]:
from itertools import product

# Definir las categorías para muestrear
score_categories = ["Alta", "Baja"]
helpfulness_categories = ["Alta", "Baja"]
user_groups = ["20+ reseñas", "10-19 reseñas", "5-9 reseñas", "2-4 reseñas", "1 reseña"]

# Acumular muestras
muestra_m = None
sample_fraction = 0.1

for s, h, u in product(score_categories, helpfulness_categories, user_groups):
    subset = df.filter(
        (col("score_group") == s) &
        (col("helpfulness_group") == h) &
        (col("user_group_detailed") == u)
    )
    # Puedes ajustar la fracción según tamaño
    muestra_parcial = subset.sample(withReplacement=False, fraction=sample_fraction, seed=42)
    if muestra_m is None:
        muestra_m = muestra_parcial
    else:
        muestra_m = muestra_m.union(muestra_parcial)
muestra_m.cache()


DataFrame[User_id: string, Id: string, Title: string, Price: string, profileName: string, review/helpfulness: string, review/score: string, review/time: string, review/summary: string, review/text: string, Score_num: float, Helpfulness_num: int, score_group: string, helpfulness_group: string, review_count: bigint, user_group_detailed: string]

## Preparacion de datos 

In [14]:
# Eliminamos registros con nulos en variables clave
df_clean = muestra_m.filter(
    (col("Score_num").isNotNull()) &
    (col("Helpfulness_num").isNotNull())
)

# Convertimos a tipos adecuados
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Ejemplo: indexar variables categóricas
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='keep')
    for column in ["score_group", "helpfulness_group", "user_group_detailed"]
]

# Componer los features
assembler = VectorAssembler(
    inputCols=["Score_num", "Helpfulness_num"] + [col+"_index" for col in ["score_group", "helpfulness_group", "user_group_detailed"]],
    outputCol="features"
)

# Crear pipeline para transformación
pipeline = Pipeline(stages=indexers + [assembler])
model = pipeline.fit(df_clean)
df_prepared = model.transform(df_clean)


## Division de entrenamiento y prueba 

In [15]:
# División 80-20
train_data, test_data = df_prepared.randomSplit([0.8, 0.2], seed=42)


## Construcción de modelos 

In [16]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Variable objetivo: por ejemplo
dt = DecisionTreeClassifier(labelCol="score_group_index", featuresCol="features", maxDepth=5)

# Entrenamiento
model_dt = dt.fit(train_data)

# Predicciones
predictions = model_dt.transform(test_data)

# Evaluación
evaluator = MulticlassClassificationEvaluator(
    labelCol="score_group_index", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Precisión del Árbol de Decisión: {accuracy}")


Precisión del Árbol de Decisión: 1.0


In [19]:
# Si la columna ya existe, elimínala antes de re-fitar
if 'score_group_index' in df_prepared.columns:
    df_prepared = df_prepared.drop('score_group_index')

# Luego ajusta y transforma
indexer_label = StringIndexer(inputCol='score_group', outputCol='score_group_index')
df_final = indexer_label.fit(df_prepared).transform(df_prepared)



In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

## Elimina la columna 'features' si ya existe
if 'features' in df_final.columns:
    df_final = df_final.drop('features')

# Luego, crea el VectorAssembler
assembler = VectorAssembler(
    inputCols=["Score_num", "Helpfulness_num"],
    outputCol="features"
)

# Aplicar la transformación
df_kmeans = assembler.transform(df_final)

# Definir número de clusters, por ejemplo, 3
k = 3
kmeans = KMeans(featuresCol='features', predictionCol='prediction', k=k, seed=42)

# Entrenar el modelo
model_kmeans = kmeans.fit(df_kmeans)

# Obtener las predicciones (clusters asignados)
predictions = model_kmeans.transform(df_kmeans)

# Evaluar la cohesión de los clusters con Silhouette
evaluator = ClusteringEvaluator(featuresCol='features', predictionCol='prediction', metricName='silhouette')
silhouette_score = evaluator.evaluate(predictions)
print(f"Score de Silueta para K={k}: {silhouette_score}")

# Mostrar algunos miembros de los clusters
predictions.select("Score_num", "Helpfulness_num", "prediction").show(10)
