# Muestreo Estratificado de Reseñas de Libros de Amazon
Este cuaderno realiza una segmentación y muestreo estratificado de la base de datos de reseñas de libros. Se construyen particiones con base en puntuación y género, y se documenta el enfoque seguido para generar una muestra representativa.

In [1]:
# Inicializar Spark
import findspark, os, re
findspark.init()
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .master('local[*]') \
    .config('spark.driver.memory', '16g') \
    .appName('AmazonBooks-Sampling') \
    .getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', 'true')

In [2]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("mohamedbakhet/amazon-books-reviews")

In [3]:
reviews = spark.read.csv(path+"/Books_rating.csv",
                    header=True,
                    inferSchema=True,
                    sep=",",
                    quote='"',        # <- Maneja correctamente los textos entre comillas
                    escape='"'        # <- (opcional) Escapa comillas internas si las hubiera
                    )
books = spark.read.csv(path+"/books_data.csv",
                    header=True,
                    inferSchema=True,
                    sep=",",
                    quote='"',        # <- Maneja correctamente los textos entre comillas
                    escape='"'        # <- (opcional) Escapa comillas internas si las hubiera
                    )
full_data = reviews.join(books, on='Title', how='left')

## 1. Caracterización de la población
Variables claves analizadas en la base de datos:

| Variable | Dominio / Rango típico | Estadísticas |
|----------|------------------------|--------------|
| review/score | {1,2,3,4,5} | μ≈4.22 ; σ≈1.20 ; min 1 ; max 5 |
| categories| 10964 categorias distintas | Top 10 cubre 90.23% de los datos |
| price | 1 - 995 USD | μ≈21.76 ; σ≈26.20 ; min 1 ; max 995 |
| description | 0-15000 caracteres | μ≈1.42k tokens ; σ≈0.97k tokens |
| publishedDate | 101-2020+ (sin limpiar) | μ≈1983.8  ; σ≈32.5 |
| ratingsCount | 1-4895+ |μ ≈ 21.25 ; σ ≈ 201.34 |

In [4]:
import pandas as pd
cols = [
    "review/score",
    "price",
    "publishedDate",
    "ratingsCount",
]

In [5]:
# 1) Usando describe() — devuelve count, mean, stddev, min, max
desc_df = full_data.select(cols).describe().toPandas()
desc_transposed = desc_df.set_index('summary').T.drop('count', axis=1)

In [6]:
print(desc_transposed)

summary                      mean              stddev        min     max
review/score    4.215289333333334  1.2030537472334044        1.0     5.0
price            21.7626558749334  26.206540521370123        1.0   995.0
publishedDate  1985.0539203308863   38.03038573196446  101-01-01    20??
ratingsCount   272.09905936069714   788.8162145564309        1.0  4895.0


In [7]:
cols = [
    "publishedDate",
    "ratingsCount",
]

In [8]:
full_data = full_data \
    .withColumn('desc_len_chars', F.length(F.col('description'))) \
    .withColumn('desc_len_tokens', F.size(F.split(F.col('description'), r'\s+')))

stats_desc = full_data.agg(
    F.count('description').alias('count'),
    F.min('desc_len_chars').alias('min_chars'),
    F.max('desc_len_chars').alias('max_chars'),
    F.mean('desc_len_chars').alias('mean_chars'),
    F.stddev('desc_len_chars').alias('stddev_chars'),
    F.mean('desc_len_tokens').alias('mean_tokens'),
    F.stddev('desc_len_tokens').alias('stddev_tokens')
)
stats_desc.show()

+-------+---------+---------+----------------+-----------------+-----------------+------------------+
|  count|min_chars|max_chars|      mean_chars|     stddev_chars|      mean_tokens|     stddev_tokens|
+-------+---------+---------+----------------+-----------------+-----------------+------------------+
|2359775|        1|    26092|722.179402697291|630.2005518026858|91.85404533333333|103.08612464092845|
+-------+---------+---------+----------------+-----------------+-----------------+------------------+



## 2. Diseño de particiones
Las particiones se crean a partir del producto cruzado entre:
- **review_score_group**: Low (1–2), Mid (3), High (4–5)
- **categories_group**: Fiction / Non-Fiction

### Probabilidades empíricas por combinación:


In [9]:
from pyspark.sql.functions import col, when, regexp_replace, split, explode

# 1) Definir R_group
full_data = full_data.withColumn(
    "R_group",
    when(col("review/score") <= 2, "Low")
    .when(col("review/score") == 3, "Mid")
    .otherwise("High")
)

# 2) Definir C_group limpiando y usando la primera etiqueta
#    (asume formato ["Cat1","Cat2",…])
full_data = full_data.withColumn(
    "first_cat",
    split(regexp_replace(col("categories"), r"^\[|\]$", ""), r",\s*")[0]
)
full_data = full_data.withColumn(
    "C_group",
    when(col("first_cat").contains("fiction"), "Fiction").otherwise("Non‑Fiction")
).drop("first_cat")

# 3) Ver probabilidades empíricas de cada combinación
freqs = (
    full_data
    .groupBy("R_group", "C_group")
    .count()
    .withColumnRenamed("count", "N")
    .withColumn("Pr", (col("N") / full_data.count()).cast("double"))
    .orderBy("R_group", "C_group")
)
freqs.show(truncate=False)


+-------+-----------+-------+---------------------+
|R_group|C_group    |N      |Pr                   |
+-------+-----------+-------+---------------------+
|High   |Fiction    |38670  |0.01289              |
|High   |Non‑Fiction|2354289|0.784763             |
|Low    |Fiction    |3736   |0.0012453333333333333|
|Low    |Non‑Fiction|349010 |0.11633666666666667  |
|Mid    |Fiction    |3103   |0.0010343333333333333|
|Mid    |Non‑Fiction|251192 |0.08373066666666666  |
+-------+-----------+-------+---------------------+



In [10]:
from pyspark.sql.functions import col

# 1) Función auxiliar
def get_partition(df, r_group: str, c_group: str):
    return df.filter((col("R_group") == r_group) & (col("C_group") == c_group))

# 2) Generar un dict con todos los estratos
partitions = {}
for rg in ["Low", "Mid", "High"]:
    for cg in ["Fiction", "Non‑Fiction"]:
        key = f"{rg}×{cg}"
        partitions[key] = get_partition(full_data, rg, cg)

# 3) Ver conteos de cada partición
for key, df_part in partitions.items():
    print(f"{key}: {df_part.count():,} rows")

Low×Fiction: 3,736 rows
Low×Non‑Fiction: 349,010 rows
Mid×Fiction: 3,103 rows
Mid×Non‑Fiction: 251,192 rows
High×Fiction: 38,670 rows
High×Non‑Fiction: 2,354,289 rows


## 3. Técnica de muestreo
Se usará **muestreo aleatorio simple estratificado** (SRS) dentro de cada estrato (R_group x C_group) por estas razones:.

**Control de heterogeneidad**: cada combinación de sentimiento (Low/Mid/High) y género (Fiction/Non‑Fiction) se trata como una subpoblación homogénea.

**Minimización de sesgos**: al muestrear por estrato, evitamos que los grupos grandes dominen la muestra.

**Asignación tipo Neyman**: definimos fracciones mayores en estratos pequeños para garantizar un tamaño de muestra mínimo, y menores en estratos muy grandes para eficiencia.


### Fórmula de tamaño de muestra de Cochran

$$
n_0 = \frac{Z^2 \cdot p \cdot (1 - p)}{E^2}
$$

Donde:

- **n₀** = tamaño de muestra inicial para poblaciones grandes (antes de cualquier ajuste para poblaciones finitas)  
- **Z** = valor z (por ejemplo, 1.96 para un 95 % de confianza)  
- **p** = proporción poblacional estimada (usar 0.5 si se desconoce)  
- **E** = margen de error  

#### Ajuste del tamaño de la muestra
$$
n_{adj} = \frac{n_0}{1 + \frac{n_0 - 1}{N}}
$$

Donde:

- **nₐdj** = tamaño de muestra ajustado  
- **n₀** = tamaño de muestra inicial (calculado con la fórmula de Cochran u otras)  
- **N** = tamaño total de la población  


In [11]:
#Definición del tamaño de muestra
import math

Z = 2.56 # 99% de confianza
p = 0.5 # probabilidad de que el usuario le guste el libro
E = 0.01 # margen de error

n0 = (Z**2 * p * (1-p)) / (E**2)
print("Tamaño de muestra necesario:", n0)

N = full_data.count() # población total

n = n0 / (1 + ((n0 - 1) / N))
n = math.ceil(n)
print(f"Tamaño de muestra ajustado (población finita): {n}")

Tamaño de muestra necesario: 16384.0
Tamaño de muestra ajustado (población finita): 16296


### Fórmula para muestreo estratificado

El muestreo estratificado se utiliza cuando una población puede dividirse en subgrupos distintos, o estratos, como grupos de edad, género o nivel educativo. El tamaño de muestra para cada estrato puede determinarse mediante el uso de la asignación proporcional:

$$
n_h = \frac{N_h}{N} \cdot n
$$

Donde:

- **nₕ** = tamaño de muestra para el estrato h  
- **Nₕ** = tamaño de la población en el estrato h  
- **N** = tamaño total de la población  
- **n** = tamaño total de la muestra  

In [12]:
from pyspark.sql.functions import concat_ws, col
fraction = n / N


# 1) Definición de frecuencias para obtener como mínimo 50 muestras por estrato
fractions = {
    "Low×Fiction":      50/3756,
    "Low×Non‑Fiction":  fraction,
    "Mid×Fiction":      50/3103,
    "Mid×Non‑Fiction":  fraction,
    "High×Fiction":     fraction,
    "High×Non‑Fiction": fraction,
}

# 2) Creo la columna compuesta
full2 = full_data.withColumn(
    "stratum",
    concat_ws("×", col("R_group"), col("C_group"))
)

# 3) Muestreo estratificado usando esa columna
sample_df = full2.stat.sampleBy("stratum", fractions, seed=42)

# 4) Veo el conteo por estrato para validar
sample_df.groupBy("R_group", "C_group") \
         .count() \
         .orderBy("R_group", "C_group") \
         .show(truncate=False)



+-------+-----------+-----+
|R_group|C_group    |count|
+-------+-----------+-----+
|High   |Fiction    |219  |
|High   |Non‑Fiction|12849|
|Low    |Fiction    |37   |
|Low    |Non‑Fiction|1896 |
|Mid    |Fiction    |54   |
|Mid    |Non‑Fiction|1354 |
+-------+-----------+-----+



In [13]:
# Estadísticas exploratorias por grupo
sample_df.groupBy('R_group', 'C_group') \
    .agg(F.count('*').alias('N'),
         F.round(F.avg('review/score'),2).alias('MeanScore')) \
    .orderBy('R_group','C_group') \
    .show()

+-------+-----------+-----+---------+
|R_group|    C_group|    N|MeanScore|
+-------+-----------+-----+---------+
|   High|    Fiction|  219|     4.84|
|   High|Non‑Fiction|12849|     4.75|
|    Low|    Fiction|   37|     1.32|
|    Low|Non‑Fiction| 1896|     1.43|
|    Mid|    Fiction|   54|      3.0|
|    Mid|Non‑Fiction| 1354|      3.0|
+-------+-----------+-----+---------+



In [14]:
# 3) Verificación de probabilidades de cada combinación en la muestra
#    (deberían ser similares a las de la población)
#    (asume que la muestra es representativa)
freqs_Sample = (
    sample_df
    .groupBy("R_group", "C_group")
    .count()
    .withColumnRenamed("count", "N")
    .withColumn("Pr", (col("N") / sample_df.count()).cast("double"))
    .orderBy("R_group", "C_group")
)
freqs_Sample.show(truncate=False)

+-------+-----------+-----+--------------------+
|R_group|C_group    |N    |Pr                  |
+-------+-----------+-----+--------------------+
|High   |Fiction    |219  |0.013346334328722042|
|High   |Non‑Fiction|12849|0.7830458894509111  |
|Low    |Fiction    |37   |0.002254860137729295|
|Low    |Non‑Fiction|1896 |0.11554634651715522 |
|Mid    |Fiction    |54   |0.003290876957767079|
|Mid    |Non‑Fiction|1354 |0.08251569260771528 |
+-------+-----------+-----+--------------------+

