In [None]:
import pandas as pd
import os

In [None]:
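# Optional one-time download of the dataset with kagglehub.
# Left commented out; uncomment and run if the CSV is not available locally yet.
# import kagglehub

# # Download latest version
# path = kagglehub.dataset_download("aljarah/xAPI-Edu-Data")

# print("Path to dataset files:", path)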

In [None]:
# Locate the CSV inside the kagglehub download cache instead of hard-coding one
# user's absolute path, so the notebook runs on any machine/OS.
# kagglehub stores downloads under ~/.cache/kagglehub unless the
# KAGGLEHUB_CACHE environment variable points somewhere else.
kagglehub_cache = os.environ.get(
    "KAGGLEHUB_CACHE",
    os.path.join(os.path.expanduser("~"), ".cache", "kagglehub"),
)
df_path = os.path.join(
    kagglehub_cache,
    "datasets", "aljarah", "xAPI-Edu-Data", "versions", "6", "xAPI-Edu-Data.csv",
)

# Load the raw dataset and show it (pandas truncates the rendered output).
df = pd.read_csv(df_path)
display(df)

In [None]:
# Show the column labels of the loaded DataFrame.
df.columns

In [None]:
# Turn placeholder strings for missing data into a real missing-value marker.
# Reassign instead of using inplace=True (no performance benefit, harder to chain).
df = df.replace(["?", "NA", "None", ""], pd.NA)

# Impute numeric columns with their mean.
# Coerce to a numeric dtype first: a column that contained a placeholder such
# as "?" is read from the CSV as strings (object dtype), and the mean cannot
# be computed on it. Already-numeric columns pass through unchanged.
num_cols = ['raisedhands','VisITedResources','AnnouncementsView','Discussion']
df[num_cols] = df[num_cols].apply(pd.to_numeric, errors="coerce")
df[num_cols] = df[num_cols].fillna(df[num_cols].mean())

# Impute categorical columns with their mode.
# .mode() can return several rows on ties; .iloc[0] takes the first one.
cat_cols = [
    'gender','NationalITy','PlaceofBirth','StageID','GradeID','SectionID',
    'Topic','Semester','Relation','ParentAnsweringSurvey',
    'ParentschoolSatisfaction','StudentAbsenceDays','Class'
]
df[cat_cols] = df[cat_cols].fillna(df[cat_cols].mode().iloc[0])

# Pandas result: rich display of the first rows instead of a plain-text print.
print("DataFrame procesado con Pandas:")
display(df.head())



# ================================
#     PREPROCESSING IN SPARK
# ================================
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler
)
from pyspark.sql import SparkSession
# NOTE(review): `col` is not used in this cell; kept in case other cells rely on it.
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Preprocessing").getOrCreate()

# Hand the pandas DataFrame over to Spark.
spark_df = spark.createDataFrame(df)

# --- 1. Categorical columns: string index, then one-hot ---
# NOTE(review): 'Class' is encoded into the feature vector like every other
# column. If it is the prediction target, this leaks the label into the
# features and it should be excluded — confirm.
categorical_cols = [
    'gender','NationalITy','PlaceofBirth','StageID','GradeID','SectionID',
    'Topic','Semester','Relation','ParentAnsweringSurvey',
    'ParentschoolSatisfaction','StudentAbsenceDays','Class'
]

# One StringIndexer per categorical column; handleInvalid="keep" assigns
# invalid/unseen labels an extra index instead of raising an error.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_idx", handleInvalid="keep")
    for name in categorical_cols
]

# One OneHotEncoder per indexed column.
encoders = [
    OneHotEncoder(inputCol=name + "_idx", outputCol=name + "_oh")
    for name in categorical_cols
]

# --- 2. Numeric columns ---
numeric_cols = ['raisedhands','VisITedResources','AnnouncementsView','Discussion']

# Assemble the one-hot vectors followed by the numeric columns into one vector.
feature_inputs = [name + "_oh" for name in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="vector_features")

# --- 3. Normalisation ---
scaler = MinMaxScaler(inputCol="vector_features", outputCol="scaled_features")

# Build the pipeline: indexers -> encoders -> assembler -> scaler.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler])

# Fit on the data, then transform the same data.
model = pipeline.fit(spark_df)
processed_spark_df = model.transform(spark_df)

print("=== DataFrame procesado en Spark ===")
processed_spark_df.select("scaled_features").show(truncate=False)
