# Actividad 3 | Base de Datos de Big Data

## Informacion del Equipo o Persona

Alejandro González Almazán - A00517113

--------------------------------------------------------------------------------

## Introducción Teórica: Aprendizaje Supervisado, No Supervisado y Algoritmos en PySpark
### Aprendizaje Supervisado [1]
El aprendizaje supervisado es una subcategoría del aprendizaje automático y la inteligencia artificial que se caracteriza por utilizar conjuntos de datos etiquetados para entrenar algoritmos que puedan clasificar datos o predecir resultados con precisión. En este enfoque, el modelo ajusta sus ponderaciones hasta adaptarse adecuadamente a los datos de entrada, como parte del proceso de validación cruzada.

Los modelos de aprendizaje supervisado se construyen utilizando un conjunto de entrenamiento que incluye tanto las entradas como las salidas correctas, lo que permite que el modelo aprenda con el tiempo. El algoritmo mide su precisión a través de la función de pérdida, ajustándose hasta minimizar el error
#### Algoritmos representativos de aprendizaje supervisado:
1. *Regresión Logística:* Utilizada cuando la variable dependiente es categórica (resultados binarios como "sí/no"). Se emplea principalmente para resolver problemas de clasificación binaria.
2. *Regresión Lineal:* Identifica la relación entre variables dependientes e independientes, comúnmente usada para proyecciones como ingresos por ventas.
3. *Árboles de Decisión:* Algoritmo que aplica una estructura de árbol con reglas de decisión secuenciales sobre características de los datos
4. *Redes Neuronales:* Principalmente aprovechadas para algoritmos de aprendizaje profundo, procesan datos de entrenamiento imitando la interconectividad del cerebro humano

### Aprendizaje no supervisado [2]
El aprendizaje no supervisado es un tipo de aprendizaje automático que aprende de los datos sin supervisión humana. A diferencia del aprendizaje supervisado, los modelos reciben datos sin etiquetar y pueden descubrir patrones y estadísticas sin ninguna orientación explícita

Este enfoque es esencial en áreas como la biomedicina, ciberseguridad y minería de datos, donde la capacidad de descubrir patrones ocultos en grandes volúmenes de datos está transformando industrias enteras

#### Algoritmos representativos de aprendizaje supervisado: [3]
1. *Análisis de Componentes Principales (PCA):* Técnica de reducción de dimensionalidad que transforma datos a un espacio de menor dimensión preservando la mayor variabilidad posible.
2. *K-Means:* Algoritmo de clustering que divide datos en K grupos basados en su similitud. Versiones mejoradas como K-Means++ optimizan el proceso de inicialización.
3. *Modelos de Mezcla Gaussiana (GMM):* Más flexibles que K-Means, permiten que los datos se agrupen en formas más complejas


### Referencias
1. IBM. (s.f.). ¿Qué es el aprendizaje supervisado?. IBM. https://www.ibm.com/mx-es/topics/supervised-learning
En este caso, se utiliza "(s.f.)" para indicar que no se especifica una fecha de publicación en la fuente consultada.
2. Google Cloud. (s.f.). ¿Qué es el aprendizaje no supervisado?. https://cloud.google.com/discover/what-is-unsupervised-learning?hl=es-419
3. Redacción IT Masters Mag. (2024, 10 de septiembre). Aprendizaje no supervisado: Guía completa. IT Masters Mag. https://www.itmastersmag.com/transformacion-digital/aprendizaje-no-supervisado-que-es-y-como-funciona/


# Impotacion de Librerias

In [3]:
# PySpark
import findspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
"""
# omitir para ejecutar de forma local
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
"""
#Librerias de codigo
import kagglehub


### Inicializar entorno PySpark

In [4]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Analisis_Steam") \
    .getOrCreate()

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

spark

### Lectura de datos

In [5]:
file_path = kagglehub.dataset_download("najzeko/steam-reviews-2021")



In [6]:
print(file_path)

C:\Users\alexa\.cache\kagglehub\datasets\najzeko\steam-reviews-2021\versions\1


In [7]:
print(f"{file_path}/steam_reviews.csv")

C:\Users\alexa\.cache\kagglehub\datasets\najzeko\steam-reviews-2021\versions\1/steam_reviews.csv


In [8]:
df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("sep", ",").option("escape", "\"").csv(f"{file_path}/steam_reviews.csv")

### Muestra M: Selección de Particiones
Se eligieron 3 particiones que cubren:
1. *Inglés (pagado y recomendado)*: Caso ideal de éxito comercial.
2. *Español (gratuito y no recomendado)*: Caso de baja satisfacción.
3. *Chino (pagado y no recomendado)*: Caso de posible problema cultural/calidad.

*Muestreo*: 30% de cada una para balancear diversidad y rendimiento.

In [14]:
from pyspark.sql import functions as F
from itertools import product

# Reemplazar "." por "_" en todas las columnas
new_columns = [col_name.replace(".", "_") for col_name in df.columns]
df = df.toDF(*new_columns)

# 1. Filtrar los 5 idiomas principales
top_languages = ['english', 'schinese', 'russian', 'brazilian', 'spanish']
df_top_lang = df.filter(F.col("language").isin(top_languages))

# 2. Crear subconjuntos particionados (sin mostrar estadísticas)
df_subsets = {}
for lang, received, recommended in product(top_languages, [True, False], [True, False]):
    key = f"lang={lang}_free={received}_rec={recommended}"
    df_subsets[key] = df_top_lang.filter(
        (F.col("language") == lang) &
        (F.col("received_for_free") == received) &
        (F.col("recommended") == recommended)
    )

df_subsets

{'lang=english_free=True_rec=True': +----+------+--------------------+---------+--------+--------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------+-----------------+---------------------------+-----------------+----------------------+------------------+-----------------------+------------------------------+-------------------------+------------------+
 | _c0|app_id|            app_name|review_id|language|              review|timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|steam_purchase|received_for_free|written_during_early_access|   author_steamid|author_num_games_owned|author_num_reviews|author_playtime_forever|author_playtime_last_two_weeks|author_playtime_at_review|author_last_played|
 +----+------+--------------------+---------+--------+--------------------+-----------------+-----------------+-----------+-------------+-----------+------

In [15]:
from functools import reduce
from pyspark.sql import DataFrame

# Particiones seleccionadas estratégicamente (¡personaliza esta lista!)
particiones_clave = [
    "lang=english_free=False_rec=True",
    "lang=spanish_free=True_rec=False",
    "lang=schinese_free=False_rec=False"
]

# Muestreo del 30% de cada partición seleccionada y unión
samples_to_union = [
    df_subsets[key].sample(withReplacement=False, fraction=0.3, seed=42)
    for key in particiones_clave
]

sample_M = reduce(DataFrame.union, samples_to_union)
print(f"Tamaño de la muestra M: {sample_M.count()} registros")

Tamaño de la muestra M: 2757627 registros


## 1 . Preparación de Datos (Preprocesamiento de la Muestra M)
Vamos a limpiar y preparar la muestra M (sample_M) para que sea adecuada para los algoritmos de ML.
### Limpieza Básica
- **Valores nulos**: Se eliminaron filas con `null` en `recommended`, `author_playtime_forever` y `votes_helpful`.
- **Outliers**: Se truncaron valores de `votes_helpful` superiores a 1,000 para evitar sesgos.
- **Variables booleanas**: `recommended` se convirtió a numérico (1 = True, 0 = False).

In [17]:
from pyspark.sql.functions import col, when, count

# 1.1. Eliminar filas con valores nulos en columnas clave
sample_M_clean = sample_M.na.drop(subset=["recommended", "author_playtime_forever", "votes_helpful"])
print(f"Registros después de eliminar nulos: {sample_M_clean.count()}")

# 1.2. Manejar outliers en 'votes_helpful' (límite superior a 1,000)
sample_M_clean = sample_M_clean.withColumn(
    "votes_helpful",
    when(col("votes_helpful") > 1000, 1000).otherwise(col("votes_helpful"))
)

# 1.3. Convertir booleanos a numéricos (para modelos)
sample_M_clean = sample_M_clean.withColumn(
    "recommended_numeric",
    when(col("recommended") == True, 1).otherwise(0)
)

Registros después de eliminar nulos: 2757627


## 2. Transformación de Variables
- **Codificación**: `language` se convirtió a numérico (`StringIndexer`).
- **Normalización**: `author_playtime_forever` y `votes_helpful` se escalaron a rango [0, 1] con `MinMaxScaler`.
- **Pipeline**: Se usó un Pipeline para encadenar transformaciones y evitar data leakage.


In [18]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

# 2.1. Codificar 'language' (categórica a numérica)
indexer = StringIndexer(inputCol="language", outputCol="language_encoded")

# 2.2. Normalizar 'author_playtime_forever' y 'votes_helpful'
assembler = VectorAssembler(
    inputCols=["author_playtime_forever", "votes_helpful"],
    outputCol="features_raw"
)
scaler = MinMaxScaler(inputCol="features_raw", outputCol="features_scaled")

# Pipeline para aplicar todas las transformaciones
pipeline = Pipeline(stages=[indexer, assembler, scaler])
sample_M_preprocessed = pipeline.fit(sample_M_clean).transform(sample_M_clean)

# Mostrar resultado
sample_M_preprocessed.select("language", "language_encoded", "features_scaled").show(5)

+--------+----------------+--------------------+
|language|language_encoded|     features_scaled|
+--------+----------------+--------------------+
| english|             0.0|[0.00808640098829...|
| english|             0.0|[5.71930285511064...|
| english|             0.0|[0.00325930937858...|
| english|             0.0|[0.00285410543690...|
| english|             0.0|[0.01811563182525...|
+--------+----------------+--------------------+
only showing top 5 rows



## 3. Features Finales
- **Variables seleccionadas**:
  - `features`: Vector escalado (`author_playtime_forever` + `votes_helpful`).
  - `label`: Variable objetivo (`recommended_numeric`).
  - `language_encoded`: Idioma codificado (para análisis adicional).

In [19]:
    # 3. Columnas finales (features + target)
final_data = sample_M_preprocessed.select(
    col("features_scaled").alias("features"),
    col("recommended_numeric").alias("label"),
    col("language_encoded"),
    col("author_playtime_forever")
)
final_data.show(5)

+--------------------+-----+----------------+-----------------------+
|            features|label|language_encoded|author_playtime_forever|
+--------------------+-----+----------------+-----------------------+
|[0.00808640098829...|    1|             0.0|                23329.0|
|[5.71930285511064...|    1|             0.0|                  165.0|
|[0.00325930937858...|    1|             0.0|                 9403.0|
|[0.00285410543690...|    1|             0.0|                 8234.0|
|[0.01811563182525...|    1|             0.0|                52263.0|
+--------------------+-----+----------------+-----------------------+
only showing top 5 rows



### Resumen de Preparación de Datos
1. **Limpieza**:
- Eliminación de nulos en columnas clave.
- Control de outliers en `votes_helpful`.
2. **Transformaciones**:
- Normalización de features numéricas.
- Codificación de variables categóricas.
3. **Salida**:
- Dataset listo para modelos (`final_data`).
- Features: `features` (vector), `label` (target).

## Preparación del Conjunto de Entrenamiento y Prueba
### División Train/Test
- **Proporción**: 70-30 (óptima para balancear aprendizaje y evaluación).
- **Semilla**: `seed=42` para garantizar reproducibilidad.
- **Resultados**:
  - Entrenamiento: `X` registros.
  - Prueba: `Y` registros.


In [20]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Dividir los datos (70% entrenamiento, 30% prueba)
train, test = final_data.randomSplit([0.7, 0.3], seed=42)

print(f"Registros de entrenamiento: {train.count()}")
print(f"Registros de prueba: {test.count()}")

Registros de entrenamiento: 1930784
Registros de prueba: 826843


## Construcción de Modelos
### Modelo Supervisado (Random Forest)
- **Variable objetivo**: `recommended_numeric` (1 = recomendado, 0 = no recomendado).
- **Características**: `features` (`author_playtime_forever` + `votes_helpful` escalados).
- **Métrica**: Accuracy (precisión global).
- **Resultado**: `Accuracy = X%`.

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 2.1. Entrenar modelo
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=10,
    seed=42
)
model_rf = rf.fit(train)

# 2.2. Predecir en test
predictions_rf = model_rf.transform(test)

# 2.3. Evaluar (Accuracy)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions_rf)
print(f"Accuracy del modelo: {accuracy * 100:.2f}%")

## Modelo No Supervisado (K-Means)
- **Características**: `features` (mismo vector que en el modelo supervisado).
- **Número de clusters**: `k=3` (asumiendo grupos naturales: positivos, negativos, neutrales).
- **Métrica**: Silhouette Score (rango [-1, 1]; valores cercanos a 1 indican clusters bien definidos).
- **Resultado**: `Silhouette = X.XXX`.

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# 3.1. Entrenar modelo (k=3 clusters)
kmeans = KMeans(
    featuresCol="features",
    k=3,
    seed=42
)
model_kmeans = kmeans.fit(train)

# 3.2. Predecir clusters
clusters = model_kmeans.transform(train)

# 3.3. Evaluar (Silhouette Score)
evaluator = ClusteringEvaluator(featuresCol="features")
silhouette = evaluator.evaluate(clusters)
print(f"Silhouette Score: {silhouette:.3f}")

# Mostrar distribución de clusters
clusters.groupBy("prediction").count().show()