In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, date_format, rank, when, desc, count, mean, row_number, regexp_extract, explode, split, regexp_replace
from pyspark.sql.window import Window
from datetime import datetime
from typing import List, Tuple
from pyspark.sql.types import StringType, IntegerType, ArrayType
import emoji
import re
import psutil

In [28]:
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:

    # Creamos la sesión de Spark y cargamos el archivo
    spark = SparkSession.builder.appName("TopDateUsers").getOrCreate()
    df = spark.read.json(file_path)

    # Convertimos la columna date (para que tome solo lo necesario)
    df = df.withColumn("date", date_format(col("date"), "yyyy-MM-dd").cast("date"))

    # Unimos los tweets originales y los tweets en quotedTweet
    df_combined = df.select("date", "user.username").union(df.select("quotedTweet.date", "quotedTweet.user.username").filter(col("quotedTweet.date").isNotNull() & col("quotedTweet.user.username").isNotNull()))

    #df con las 10 fechas con más tweets
    #esta parte se podría parametrizar, para este caso es valido ya que sabemos a priori el tamaño a mostrar en la lista,
    #pero de ser un tamaño n no sería conveniente.
    top_dates = df_combined.groupBy("date").count().orderBy(desc("count")).limit(10)

    
    #Obtenemos las 10 fechas con más tweets directamente como DataFrame
    top_dates_df = top_dates.select("date")

    #Convertimos las fechas en una lista de objetos datetime.date
    top_dates_list = [row.date for row in top_dates_df.collect()]

    # Limpiams el DataFrame intermedio
    top_dates_df.unpersist()
    
    # almacenamos en una lista las 10 fechas que más se repiten del df anterior
    #top_dates_list = top_dates.select("date").rdd.flatMap(lambda x: x).collect()


    #creamos un df que solo contiene las fechas que están en la lista que generamos
    filtered_df = df_combined.filter(col("date").isin(top_dates_list))

    #Limpiamos el DataFrame que agrupa los tweets (originales - retweets)
    df_combined.unpersist()
    
    # Agrupamos el DataFrame por fecha y usuario y calcula el recuento de cada grupo
    grouped_df = filtered_df.groupBy("date", "username").agg(count("*").alias("count"))

    # Usamos una ventana para obtener el usuario que más se repite por fecha mediante rank
    window_spec = Window.partitionBy("date").orderBy(desc("count"))
    top_users_df = grouped_df.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1)

    #Seleccionar y ordenamos las columnas necesarias
    result_df = top_users_df.select("date", "username", "count").orderBy(desc("count"))

    # Recopilamos los resultados en una lista de tuplas
    result = result_df.collect()

    # Conviertimos el resultado en una lista de tuplas
    result_list = [(row.date, row.username) for row in result]
    

    return result_list

In [29]:
def q2_time(file_path: str) -> List[Tuple[str, int]]:
    # Creamos la sesión de Spark y cargamos el archivo
    spark = SparkSession.builder.appName("TopEmojis").getOrCreate()
    df = spark.read.json(file_path)

    # Función para dividir los emojis en caracteres individuales
    def extract_individual_emojis(text):
        return [c for c in text if c in emoji.EMOJI_DATA]

    # UDF para dividir lo emojis y entregar un arreglo
    extract_individual_emojis_udf = udf(extract_individual_emojis, ArrayType(StringType()))

    # Agregamos una columna "emojis" al DataFrame con emojis individuales (en un arreglo)
    df = df.withColumn("emojis", extract_individual_emojis_udf(df["content"]))
    
    # Desagrupamos los arreglos del campo emojis en filas separadas (emoji)
    df = df.select("emojis").withColumn("emoji", explode("emojis")).filter(col("emoji") != "")

    # Contamos la frecuencia de cada emoji en todo el DataFrame
    emoji_counts = df.groupBy("emoji").count()

    # Ordenamos los resultados por conteo en orden descendente
    emoji_counts = emoji_counts.orderBy(desc("count"))

    # Obtenemos los 10 emojis con mayor conteo
    top_10_emojis = emoji_counts.limit(10).collect()

    return top_10_emojis

In [30]:
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    # Crear una sesión de Spark
    spark = SparkSession.builder.appName("TopMentionedUsers").getOrCreate()
    # Cargar el archivo JSON en un DataFrame
    df_mention = spark.read.json(file_path)

    # Convierto la columna date en
    df_mention = df_mention.withColumn("date", date_format(col("date"), "yyyy-MM-dd").cast("date"))

    # Unir los tweets originales y los tweets en quotedTweet
    df_combined_mention = df_mention.select("mentionedusers.username").filter(col("mentionedUsers.username").isNotNull()).union(df_mention.select("quotedTweet.mentionedUsers.username").filter(col("quotedTweet.mentionedUsers.username").isNotNull()))

    # Descomponer la columna "usernames" en filas individuales
    df = df_combined_mention.selectExpr("explode(username) as usernames")

    # Contar el número de veces que aparece cada usuario
    user_counts = df.groupBy("usernames").count()

    # Ordenar en orden descendente por conteo y seleccionar los 10 primeros usuarios
    top_10_users = user_counts.orderBy(col("count").desc()).limit(10)

    # Recopilar los resultados en una lista de tuplas (usuario, conteo)
    result_list = [(row.usernames, row["count"]) for row in top_10_users.collect()]

    return result_list

In [31]:
# Modificar file_path:
file_path = "/home/david/Escritorio/farmers-protest-tweets-2021-2-4.json"
%time top_tweets_time = q1_time(file_path)
%time top_emojis_time = q2_time(file_path)
%time top_mentions_time = q3_time(file_path)

print('\n')
print(top_tweets_time)
print('\n')
print(top_mentions_time)
print('\n')
print(top_emojis_time)

23/09/26 14:58:29 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


CPU times: user 14.2 ms, sys: 7.76 ms, total: 22 ms
Wall time: 3.26 s


23/09/26 14:58:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


CPU times: user 2.79 ms, sys: 11.1 ms, total: 13.9 ms
Wall time: 1.92 s




CPU times: user 11.9 ms, sys: 483 µs, total: 12.4 ms
Wall time: 2.01 s


[('2021-02-19', 'Preetm91'), ('2021-02-18', 'neetuanjle_nitu'), ('2021-02-13', 'MaanDee08215437'), ('2021-02-17', 'RaaJVinderkaur'), ('2021-02-16', 'jot__b'), ('2021-02-23', 'preetysaini321'), ('2021-02-15', 'jot__b'), ('2021-02-14', 'Gurpreetd86'), ('2021-02-20', 'MangalJ23056160'), ('2021-02-12', 'rebelpacifist')]


[('narendramodi', 2623), ('Kisanektamorcha', 2045), ('RakeshTikaitBKU', 1848), ('PMOIndia', 1560), ('GretaThunberg', 1274), ('RahulGandhi', 1252), ('rihanna', 1142), ('DelhiPolice', 1134), ('RaviSinghKA', 1127), ('UNHumanRights', 1057)]


[Row(emoji='🙏', count=7286), Row(emoji='😂', count=3072), Row(emoji='🚜', count=2972), Row(emoji='✊', count=2411), Row(emoji='🌾', count=2363), Row(emoji='🏻', count=2080), Row(emoji='❤', count=1779), Row(emoji='🤣', count=1668), Row(emoji='🏽', count=1218), Row(emoji='👇', count=1108)]


                                                                                

### Memoria

In [36]:
def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    #Creamos la sesión de Spark y cargamos el archivo
    spark = SparkSession.builder.appName("TopDateUsers").getOrCreate()
    df = spark.read.json(file_path)

    #Convertimos la columna de fecha (para que tome solo lo necesario)
    df = df.withColumn("date", date_format(col("date"), "yyyy-MM-dd").cast("date"))

    # Unimos los tweets originales y los tweets en quotedTweet
    df_combined = df.select("date", "user.username").union(df.select("quotedTweet.date", "quotedTweet.user.username").filter(col("quotedTweet.date").isNotNull() & col("quotedTweet.user.username").isNotNull()))

    #df con las 10 fechas con más tweets
    #esta parte se podría parametrizar, para este caso es válido ya que sabemos a priori el tamaño a mostrar en la lista,
    #pero de ser un tamaño n no sería conveniente.
    
    #Para liberar memoria realizo primero la agregación (para contar la cantidad por fecha) y luego obtengo el top ordenandolos
    #En dos pasos separados
    agg_df = df_combined.groupBy("date").agg(count("*").alias("count"))
    
    # Ordenar los resultados
    top_dates = agg_df.orderBy(desc("count")).limit(10)
        
    #Obtenemos las 10 fechas con más tweets directamente como DataFrame
    top_dates_df = top_dates.select("date")

    # Obtenemos las 10 fechas como lista
    top_dates_list = [row.date for row in top_dates.collect()]

    # Filtrar el DataFrame original
    filtered_df = df_combined.filter(col("date").isin(top_dates_list))

    #Agrupamos el DataFrame por fecha y usuario y calcula el recuento de cada grupo
    grouped_df = filtered_df.groupBy("date", "username").agg(count("*").alias("count"))

    # Usamos una ventana para obtener el usuario que más se repite por fecha mediante rank
    window_spec = Window.partitionBy("date").orderBy(desc("count"))
    top_users_df = grouped_df.withColumn("rank", row_number().over(window_spec)).filter(col("rank") == 1)

    # Seleccionamos las columnas necesarias
    result_df = top_users_df.select("date", "username", "count")
    
    #Ordenamos el DataFrame por count en orden descendente
    result_df = result_df.orderBy(desc("count"))

    #Recopilamos los resultados en una lista de tuplas
    result = result_df.collect()

    #Conviertimos el resultado en una lista de tuplas
    result_list = [(row.date, row.username) for row in result]

    return result_list


In [39]:
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    # Creamos la sesión de Spark y cargar el archivo
    spark = SparkSession.builder.appName("TopEmojis").getOrCreate()
    df = spark.read.json(file_path)

    # Función para dividir los emojis en caracteres individuales
    def extract_individual_emojis(text):
        return [c for c in text if c in emoji.EMOJI_DATA]

    # UDF para dividir los emojis y entregar un arreglo
    extract_individual_emojis_udf = udf(extract_individual_emojis, ArrayType(StringType()))

    # Agregamos una columna "emojis" al DataFrame con emojis individuales (en un arreglo)
    df = df.withColumn("emojis", extract_individual_emojis_udf(df["content"]))
    
    # Desagrupamos los arreglos del campo emojis en filas separadas (emoji)
    df = df.select("emojis").withColumn("emoji", explode("emojis")).filter(col("emoji") != "")

    # Contamos la frecuencia de cada emoji en todo el DataFrame
    emoji_counts = df.groupBy("emoji").count()

    #Obtenemos los 10 emojis con mayor conteo (usando limit antes de ordenar)
    top_10_emojis = emoji_counts.orderBy(desc("count")).limit(10).collect()

    # Liberamos la memoria eliminando el DataFrame df
    df.unpersist()

    return top_10_emojis

In [40]:
# Ejemplo de uso:
file_path = "/home/david/Escritorio/farmers-protest-tweets-2021-2-4.json"
%time top_tweets_memory = q1_memory(file_path)
%time top_mentions_memory = q2_memory(file_path)

print('\n')
print(top_tweets_memory)
print('\n')
print(top_mentions_memory)
print('\n')
#print(top_emojis_memory)

23/09/26 15:12:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

CPU times: user 27.5 ms, sys: 1.61 ms, total: 29.1 ms
Wall time: 3.27 s


23/09/26 15:12:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

CPU times: user 10 ms, sys: 2.01 ms, total: 12 ms
Wall time: 2.02 s


[('2021-02-19', 'Preetm91'), ('2021-02-18', 'neetuanjle_nitu'), ('2021-02-13', 'MaanDee08215437'), ('2021-02-17', 'RaaJVinderkaur'), ('2021-02-16', 'jot__b'), ('2021-02-23', 'preetysaini321'), ('2021-02-15', 'jot__b'), ('2021-02-14', 'Gurpreetd86'), ('2021-02-20', 'MangalJ23056160'), ('2021-02-12', 'rebelpacifist')]


[Row(emoji='🙏', count=7286), Row(emoji='😂', count=3072), Row(emoji='🚜', count=2972), Row(emoji='✊', count=2411), Row(emoji='🌾', count=2363), Row(emoji='🏻', count=2080), Row(emoji='❤', count=1779), Row(emoji='🤣', count=1668), Row(emoji='🏽', count=1218), Row(emoji='👇', count=1108)]




