
# Spark — Tendencias desde CSV grande → salida CSV

**Objetivo:** Leer un **CSV con miles de filas** (tweets/posts), detectar **hashtags en tendencia por ventana de 1 hora** y **exportar resultados a CSV**.



## 1) Requisitos y estructura esperada del CSV

- Instala PySpark si hace falta: `!pip install pyspark`  
- Estructura esperada (mínima):  
  - `post_id` *(string)*  
  - `user` *(string)*  
  - `ts` *(timestamp en formato ISO o `YYYY-MM-DD HH:MM:SS`)*  
  - `text` *(string con texto del post — se extraerán hashtags desde aquí si no existe la columna `hashtags`)*  
  - **Opcional**: `hashtags` *(string con formato `"#tag1 #tag2 #tag3"`)*  

> Si tu CSV **no** trae la columna `hashtags`, el notebook extraerá las etiquetas a partir de `text`.


In [1]:

# (Opcional) Instalar PySpark si no lo tienes
# !pip install pyspark


## 2) SparkSession

In [1]:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Tweets_Tendencias_FromCSV_ToCSV")
         .getOrCreate())

spark



## 3) Parámetros de entrada/salida

- `input_path`: ruta al CSV grande de entrada (con cabecera).  
- `output_dir`: carpeta de salida; se generará un archivo CSV con los **top 3 hashtags por hora**.


In [2]:

from pathlib import Path

# Ajusta estas rutas a tus archivos locales
input_path = "tweets.csv"
output_dir = Path("./out/top_hashtags_por_hora_csv")
output_dir.mkdir(parents=True, exist_ok=True)

print("Entrada:", input_path)
print("Salida (directorio):", output_dir.as_posix())


Entrada: tweets.csv
Salida (directorio): out/top_hashtags_por_hora_csv


## 4) Cargar CSV grande

In [3]:

from pyspark.sql.functions import col

df_raw = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .option("multiLine", True)   # por si 'text' tiene saltos de línea
          .option("escape", '"')
          .csv(input_path))

df_raw.printSchema()
df_raw.show(5, truncate=False)
print("Filas totales:", df_raw.count())


root
 |-- post_id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)

+--------+---------+-------------------+--------------------------------------------------+--------------------------+
|post_id |user     |ts                 |text                                              |hashtags                  |
+--------+---------+-------------------+--------------------------------------------------+--------------------------+
|p0_00000|user_0138|2025-08-25 08:57:11|Tips de optimización en PySpark y particionado    |#kafka #spark             |
|p0_00001|user_0195|2025-08-25 08:41:40|Delta y formatos columnares mejoran el rendimiento|#nlp                      |
|p0_00002|user_0070|2025-08-25 10:54:23|Arquitecturas de datos modernas en la nube        |#cloud #bigdata           |
|p0_00003|user_0194|2025-08-25 09:35:41|Tips de optimización en PySpark y particionado    |#


## 5) Normalización de timestamps y extracción de hashtags

- Convierte `ts` a `timestamp`.  
- Si existe `hashtags`, úsala; si no, **extrae** etiquetas desde `text`.  
- Explota a **una fila por hashtag**.


In [4]:

from pyspark.sql.functions import to_timestamp, regexp_replace, trim, lower, split, explode, when, size

# Asegurar timestamp (ajusta el formato si tu CSV lo usa distinto)
df = df_raw.withColumn("ts", to_timestamp(col("ts")))

# Normaliza una columna 'hashtags_base' (si no existe, se extrae de 'text')
if "hashtags" in df.columns:
    df = df.withColumn("hashtags_base", trim(lower(regexp_replace(col("hashtags"), "\s+", " "))))
else:
    # Extraer hashtags desde el texto: separa en tokens y filtra los que empiezan con '#'
    df = df.withColumn("text_clean", trim(lower(regexp_replace(col("text"), "\s+", " "))))
    df = df.withColumn("tokens", split(col("text_clean"), " "))
    # Filtra tokens con '#'; vuelve a unirlos con espacios para reutilizar la misma lógica posterior
    from pyspark.sql.functions import expr
    df = df.withColumn("hashtags_base", expr("array_join(filter(tokens, x -> x like '#%'), ' ')"))

# Explota hashtags a filas
from pyspark.sql.functions import regexp_replace as rxrep

df_tags = (df
           .withColumn("tag", explode(split(col("hashtags_base"), " ")))
           .withColumn("tag", trim(rxrep(col("tag"), "[^#a-z0-9_]", "")))
           .filter((col("tag").isNotNull()) & (col("tag") != ""))
          )

# Mantén solo columnas relevantes
df_tags = df_tags.select("post_id","user","ts","tag")

df_tags.show(10, truncate=False)
print("Filas con tags:", df_tags.count())


+--------+---------+-------------------+-------------+
|post_id |user     |ts                 |tag          |
+--------+---------+-------------------+-------------+
|p0_00000|user_0138|2025-08-25 08:57:11|#kafka       |
|p0_00000|user_0138|2025-08-25 08:57:11|#spark       |
|p0_00001|user_0195|2025-08-25 08:41:40|#nlp         |
|p0_00002|user_0070|2025-08-25 10:54:23|#cloud       |
|p0_00002|user_0070|2025-08-25 10:54:23|#bigdata     |
|p0_00003|user_0194|2025-08-25 09:35:41|#opensource  |
|p0_00003|user_0194|2025-08-25 09:35:41|#spark       |
|p0_00004|user_0073|2025-08-25 09:41:19|#datascience |
|p0_00004|user_0073|2025-08-25 09:41:19|#deeplearning|
|p0_00005|user_0245|2025-08-25 18:26:18|#ai          |
+--------+---------+-------------------+-------------+
only showing top 10 rows

Filas con tags: 30690


## 6) Tendencias por ventana de 1 hora (top 3)

In [5]:

from pyspark.sql.functions import window, count, desc, col as c
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

agg_hour = (df_tags
            .groupBy(window(c("ts"), "1 hour").alias("win"), c("tag"))
            .agg(count("*").alias("freq")))

w = Window.partitionBy("win").orderBy(desc("freq"), c("tag"))
ranked = agg_hour.withColumn("rank", row_number().over(w))
top3_por_hora = ranked.filter(c("rank") <= 3)                       .select(
                          c("win").getField("start").alias("window_start"),
                          c("win").getField("end").alias("window_end"),
                          c("tag"), c("freq"), c("rank")
                      )                       .orderBy("window_start","rank")

top3_por_hora.show(50, truncate=False)


+-------------------+-------------------+----------+----+----+
|window_start       |window_end         |tag       |freq|rank|
+-------------------+-------------------+----------+----+----+
|2025-08-25 08:00:00|2025-08-25 09:00:00|#spark    |57  |1   |
|2025-08-25 08:00:00|2025-08-25 09:00:00|#ai       |56  |2   |
|2025-08-25 08:00:00|2025-08-25 09:00:00|#streaming|47  |3   |
|2025-08-25 09:00:00|2025-08-25 10:00:00|#etl      |44  |1   |
|2025-08-25 09:00:00|2025-08-25 10:00:00|#streaming|40  |2   |
|2025-08-25 09:00:00|2025-08-25 10:00:00|#ai       |39  |3   |
|2025-08-25 10:00:00|2025-08-25 11:00:00|#bigdata  |48  |1   |
|2025-08-25 10:00:00|2025-08-25 11:00:00|#python   |46  |2   |
|2025-08-25 10:00:00|2025-08-25 11:00:00|#streaming|45  |3   |
|2025-08-25 11:00:00|2025-08-25 12:00:00|#ai       |47  |1   |
|2025-08-25 11:00:00|2025-08-25 12:00:00|#etl      |45  |2   |
|2025-08-25 11:00:00|2025-08-25 12:00:00|#spark    |44  |3   |
|2025-08-25 12:00:00|2025-08-25 13:00:00|#ai       |49 

## 7) Guardar **CSV** de salida (coalesce a 1 archivo)

In [6]:

# Escribimos a CSV (con cabecera). coalesce(1) para dejar un solo archivo de salida.
# Nota: coalesce(1) mueve todo a una sola partición para la escritura; es cómodo para demos y datasets modestos.
(
    top3_por_hora
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv(output_dir.as_posix())
)

print("CSV(s) escritos en:", output_dir.as_posix())

# (Opcional) Renombrar part-*.csv a un nombre fijo
import shutil, os, glob
parts = glob.glob(str(output_dir / "part-*.csv"))
if parts:
    final_csv = output_dir / "top_hashtags_por_hora.csv"
    shutil.move(parts[0], final_csv)
    # limpiar archivos sobrantes (como .crc) si existen
    for extra in glob.glob(str(output_dir / "*")):
        if extra.endswith(".crc") or extra.endswith(".csv") and Path(extra).name.startswith("part-"):
            try:
                os.remove(extra)
            except:
                pass
    print("Archivo final:", final_csv.as_posix())
else:
    print("Nota: no se encontró part-*.csv; revisa permisos o particiones.")


CSV(s) escritos en: out/top_hashtags_por_hora_csv
Archivo final: out/top_hashtags_por_hora_csv/top_hashtags_por_hora.csv
