In [1]:
from pyspark.sql import SparkSession

# Iniciar Sesión de Spark
spark = SparkSession.builder \
    .appName("RedditTextETL") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/17 07:40:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
file_path = "hdfs://namenode:8020/data/reddit_depression_dataset.csv"

In [3]:
df_spark = spark.read.csv(
    file_path,
    header=True,
    inferSchema=True,
    escape='"',       # Ayuda a manejar comillas dentro de los campos
    multiLine=True    
)

                                                                                

In [4]:
df_spark.printSchema()
df_spark.show(15, truncate=50)

root
 |-- _c0: integer (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- upvotes: integer (nullable = true)
 |-- created_utc: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- label: integer (nullable = true)

+-----+------------+--------------------------------------------------+--------------------------------------------------+-------+-----------+------------+-----+
|  _c0|   subreddit|                                             title|                                              body|upvotes|created_utc|num_comments|label|
+-----+------------+--------------------------------------------------+--------------------------------------------------+-------+-----------+------------+-----+
|47951|DeepThoughts|                            Deep thoughts underdog|Only when we start considering ourselves, the 9...|      4| 1405308909|        NULL|    0|
|47952|DeepThoughts|I like th

In [5]:
# --- Verificación de la Distribución de la Variable Objetivo 'label' ---
print("--- Analizando la distribución de clases en TODO el dataset cargado ---")

# .groupBy("label") agrupa todas las filas por el valor de la columna 'label'.
# .count() cuenta cuántas filas hay en cada grupo.
# .show() muestra el resultado.
class_distribution = df_spark.groupBy("label").count()

print("Distribución de la variable 'label':")
class_distribution.show()

--- Analizando la distribución de clases en TODO el dataset cargado ---
Distribución de la variable 'label':


[Stage 3:>                                                          (0 + 1) / 1]

+-----+-------+
|label|  count|
+-----+-------+
|    1| 480422|
|    0|1990293|
+-----+-------+



                                                                                

Podemos observar que existen muchos mas comentarios clasificados como normales que deprimidos 

## Pre-procesamiento y Limpieza con Spark

In [6]:
from pyspark.sql.functions import col, concat_ws, trim, lower
from pyspark.sql.types import IntegerType

Iniciamos la limpieza distribuida del dataset de Reddit

In [7]:
df_cleaned = df_spark.dropna(subset=["Title", "Body", "Label"])

In [8]:
df_cleaned = df_cleaned.withColumn("clean_text", concat_ws(" ", col("Title"), col("Body")))

In [9]:
# trim() quita espacios al inicio/final, lower() convierte a minúsculas.
df_cleaned = df_cleaned.withColumn("clean_text_cleaned", lower(trim(col("clean_text"))))


Filtra entradas no validas

In [10]:
df_filtered = df_cleaned.filter(
    (col("clean_text_cleaned") != "") & 
    (col("clean_text_cleaned") != "no content") &
    (col("clean_text_cleaned").isNotNull())
)

Elimina Duplicados y selecciona las columnas finales

In [11]:
# Nos quedamos solo con el texto y la etiqueta, y eliminamos filas idénticas.
df_final = df_filtered.select(
    col("clean_text"), 
    col("Label").alias("is_depression") # Renombramos Label para claridad
).dropDuplicates(["clean_text", "is_depression"])


In [12]:
final_count = df_final.count()

                                                                                

In [13]:
print(f"✅ Limpieza completada. Número de filas final: {final_count}")

✅ Limpieza completada. Número de filas final: 2009690


In [14]:
inicial_count = df_spark.count()

                                                                                

In [15]:
print(f"✅ Limpieza completada. Número de filas inical: {inicial_count}")

✅ Limpieza completada. Número de filas inical: 2470715


In [16]:
df_final.show(20, truncate=50)

[Stage 15:>                                                         (0 + 1) / 1]

+--------------------------------------------------+-------------+
|                                        clean_text|is_depression|
+--------------------------------------------------+-------------+
|The word 'may' means with equal significance, '...|            0|
|what is love everyone has a different perspecti...|            0|
|Is it more beneficial to be above average at ev...|            0|
|Deep thoughts on Covid-19 \nI’m not much of a w...|            0|
|"We delight in the beauty of the butterfly but ...|            0|
|Something came to my thoughts So lately I’ve be...|            0|
|I’ve deleted all my social media and it’s time ...|            0|
|Monopoly is the most realistic game Because you...|            0|
|People are actually smarter or capable of being...|            0|
|You Exist Right Now And That's Bizarre https://...|            0|
|I wanna leave this earth having at least impact...|            0|
|The smarter we are getting, the more we are dis...|          

                                                                                

In [17]:
# --- Celda de Guardado Intermedio (Reemplaza la de .toPandas()) ---

print("--- Guardando el DataFrame limpio en formato Parquet en HDFS ---")
print("Esto evita la operación .toPandas() que puede fallar con datos grandes.")

# Definimos una ruta en HDFS para guardar el resultado intermedio
output_parquet_path = "hdfs://namenode:8020/data/reddit_cleaned.parquet"

# .write.mode('overwrite').parquet() es la forma eficiente de guardar.
# Guardará el resultado como una carpeta con múltiples archivos Parquet.
df_final.write.mode('overwrite').parquet(output_parquet_path)

print(f"✅ DataFrame limpio guardado en HDFS en: {output_parquet_path}")

# Detener la sesión de Spark, ya no la necesitamos para los siguientes pasos
spark.stop()
print("✅ Sesión de Spark detenida.")

--- Guardando el DataFrame limpio en formato Parquet en HDFS ---
Esto evita la operación .toPandas() que puede fallar con datos grandes.


                                                                                

✅ DataFrame limpio guardado en HDFS en: hdfs://namenode:8020/data/reddit_cleaned.parquet
✅ Sesión de Spark detenida.
