En este archivo puedes escribir lo que estimes conveniente. Te recomendamos detallar tu solución y todas las suposiciones que estás considerando. Aquí puedes ejecutar las funciones que definiste en los otros archivos de la carpeta src, medir el tiempo, memoria, etc.

In [None]:
file_path = "farmers-protest-tweets-2021-2-4.json"

Para resolver los 3 problemas se tomó la opción de abordar con Bigquery los casos en donde se busca reducir los tiempos de ejecucion, esto es, para q1_time, q2_time y q3_time. Si bien para el caso de pruebas que contempla un archivo con 117408 registros no justifica su uso, pensando en un volumen mayor de registro (millones) si lo justificaria por sus caracteristicas propias, paralelización de consultas en los nodos de procesamiento y optimización de ejecución.
Por su parte, para los casos q1_memory, q2_memory y q3_memory donde se busca optimizar el uso de memoria, se ha optado por el uso de pyspak, por su enfoque de procesamiento basado en particiones, que puede dividir los datos en bloques y procesarlos en paralelo.

## PROBLEMA 1: Las top 10 fechas donde hay más tweets

### q1_time


Lo primero que se hace es cargar la data en una tabla en bigquery.

Se indica un dataset y un nombre de tabla donde será cargada la data. Las variables se encuentran seteadas dentro de la función (primeras lineas).

Una vez cargada la data, se ejecuta la query contenida en el archivo q1_time.sql en el folder "queries".

La logica de esta query es primero obtener las 10 fechas con mas tweets

    SELECT DATE(date)
    FROM {dataset_id}.{table_id}
    GROUP BY DATE(date)
    ORDER BY COUNT(1) DESC
    LIMIT 10

Luego, obtiene el numero de tweets por cada usuario pero solo sobre las top 10 fechas ya obtenidas.

    SELECT
    DATE(date) AS tweet_date,
    user.username AS username,
    COUNT(1) AS tweet_count
    FROM {dataset_id}.{table_id}
    WHERE DATE(date) IN (
        -- Subconsulta de las 10 fechas principales
    )
    GROUP BY tweet_date, username


Con la funcion rank() se asigna un rango a cada usuario basado en la cantidad de tweets, dentro de cada fecha.

    SELECT
    tweet_date,
    username,
    RANK() OVER (PARTITION BY tweet_date ORDER BY tweet_count DESC) AS rank
    FROM (
        -- Resultados anteriores
    ) AS aggregated_data


Finalmente se filtra por rank=1, para obtener el usuario con mas tweets para cada dia.

    SELECT
    tweet_date,
    username
    FROM (
        -- Resultados anteriores con ranking
    )
    WHERE rank = 1
    ORDER BY tweet_date ASC


In [1]:
from q1_memory import q1_memory
from q1_time import q1_time
file_path = "/Users/acarcamo/Downloads/farmers-protest-tweets-2021-2-4.json"

q1_time(file_path)

Datos cargados en Bigquery
Filename: /Users/acarcamo/Documents/Personal/challenge_DE/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
    11     93.0 MiB     93.0 MiB           1   @profile
    12                                         def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    13     93.0 MiB      0.0 MiB           1       dataset_id = "sandbox_agcarcamo"
    14     93.0 MiB      0.0 MiB           1       table_id = "de_test5"
    15                                         
    16     93.0 MiB      0.0 MiB           1       result = []
    17                                         
    18     93.0 MiB      0.0 MiB           1       try:
    19                                                 # Valida si el archivo existe
    20     93.0 MiB      0.0 MiB           1           if not os.path.exists(file_path):
    21                                                     raise FileNotFoundError(f"El archivo {file_path} no se encuentra.

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria')]

### q1_memory
Para este caso, tambien se realiza una prueba local. Queda pendiente realizar la ejecución en un cluster dataproc.

Pasos de ejecución:

Se carga la data en un dataframe

    df = spark.read.option("encoding", "UTF-8").json(file_path)


Se seleccionan solo las columnas date y user.username necesarias para este caso y se filtran los nulos si es que hubiese.

    df = df.select(col("date"), col("user.username")).filter(
    col("date").isNotNull() & col("username").isNotNull()
    )


Convierte el campo date a fecha, formato yyyy-MM-dd

    df = df.withColumn("tweet_date", col("date").cast("date"))


Obtiene count de tweets por dia

    date_counts = df.groupBy("tweet_date").agg(count("*").alias("tweet_count"))


Obtiene top 10 fechas con mas tweets

    top_dates = date_counts.orderBy(col("tweet_count").desc()).limit(10)


Obtiene count de tweets de usuario por dia

    user_tweet_counts = df.groupBy("tweet_date", "username").agg(count("*").alias("tweet_count_user"))


Join entre top 10 dias y tweets por usuario en cada dia

    top_users = user_tweet_counts.join(broadcast(top_dates), on="tweet_date")


Realiza rank de usuarios por tweets

    window_spec = Window.partitionBy("tweet_date").orderBy(col("tweet_count_user").desc())
    ranked_users = top_users.withColumn("rank", rank().over(window_spec))


Filtra solo el usuario con mas tweets por dia

    result = ranked_users.filter(col("rank") == 1).select("tweet_date", "username")


Formatea la salida

    result_list = result.collect()
    formatted_result = [(row["tweet_date"], row["username"]) for row in result_list]


In [1]:
from q1_memory import q1_memory

file_path = "/Users/acarcamo/Downloads/farmers-protest-tweets-2021-2-4.json"

q1_memory(file_path)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/23 03:49:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 2:>                (0 + 10) / 10][Stage 3:>                 (0 + 0) / 10]

CodeCache: size=131072Kb used=25225Kb max_used=25225Kb free=105846Kb
 bounds [0x0000000105e18000, 0x00000001076e8000, 0x000000010de18000]
 total_blobs=9821 nmethods=8902 adapters=830
 compilation: disabled (not enough contiguous free space left)


                                                                                

Filename: /Users/acarcamo/Documents/Personal/challenge_DE/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9     71.6 MiB     71.6 MiB           1   @profile
    10                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    11     71.6 MiB      0.0 MiB           1       spark = None
    12     71.6 MiB      0.0 MiB           1       try:
    13                                                 # Crea sesión de Spark
    14     73.6 MiB      2.0 MiB           1           spark = SparkSession.builder.appName("Top10Tweets").getOrCreate()
    15                                         
    16                                                 # Carga los datos en dataframe
    17     73.6 MiB      0.0 MiB           1           try:
    18     73.8 MiB      0.2 MiB           1               df = spark.read.option("encoding", "UTF-8").json(file_path)
    19                                                 except

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria')]

## PROBLEMA 2: Los top 10 emojis más usados con su respectivo conteo


### q2_time


Tal como el problema q1_time, primero la data se carga en una tabla bigquery y luego se realiza una query para obtener los emojis mas usados.

Tras revisar la data previamente, la forma de abordar este problema fue identificar el rango Unicode donde se definen los emojis,

'[\x{{1F600}}-\x{{1F64F}}]|[\x{{1F300}}-\x{{1F5FF}}]|[\x{{1F680}}-\x{{1F6FF}}]|[\x{{2600}}-\x{{26FF}}]|[\x{{2700}}-\x{{27BF}}]|[\x{{1F700}}-\x{{1F77F}}]|[\x{{1F780}}-\x{{1F7FF}}]|[\x{{1F800}}-\x{{1F8FF}}]|[\x{{1F900}}-\x{{1F9FF}}]|[\x{{1FA00}}-\x{{1FA6F}}]|[\x{{1FA70}}-\x{{1FAFF}}]'


pero dentro de esta revisión tambien se identificó el rango Unicode
\x{1F3FB} a \x{1F3FF}
que se utiliza como modificador de tono de piel en los emojis, entonces por ejemplo, para no considerar esto

    💪: tono de piel amarillo.
    💪🏻: Tono de piel claro.
    💪🏼: Tono de piel claro-medio.
    💪🏽: Tono de piel medio.
    💪🏾: Tono de piel oscuro-medio.

como 5 emojis distintos, si no mas bien como el mismo emoji que se repite 5 veces, decidi omitir dicho rango Unicode del contenido.

Entonces la query funciona asi,

Se elimina de la columna content cualquier modificador de tono de piel que está representado por los rangos Unicode \x{1F3FB} a \x{1F3FF} y se renombra la columna procesada como cleaned_content.

Se utiliza REGEXP_EXTRACT_ALL con una expresión regular para diferentes rangos de Unicode donde se definen los emojis (como caras, símbolos, objetos, etc.). Asi se obtiene una lista de todos los emojis encontrados en cleaned_content.

Se usa UNNEST para convertir la lista resultante de REGEXP_EXTRACT_ALL en filas individuales, donde cada una contiene un emoji.

Se agrupa por cada emoji y cuenta cuántas veces aparece cada uno (COUNT(*)).

Finalmente, ordena los emojis por su frecuencia en orden descendente (ORDER BY count DESC) y retorna los 10 emojis más usados (LIMIT 10).


In [1]:
from q2_time import q2_time

file_path = "/Users/acarcamo/Downloads/farmers-protest-tweets-2021-2-4.json"
q2_time(file_path)

Datos cargados en Bigquery


[('🙏', 7286),
 ('😂', 3072),
 ('🚜', 2972),
 ('✊', 2411),
 ('🌾', 2363),
 ('❤', 1779),
 ('🤣', 1668),
 ('👇', 1108),
 ('💚', 1040),
 ('💪', 947)]

### q2_memory

Se realiza una prueba local, quedando pendiente realizar la prueba en un cluster dataproc.

Los pasos de ejecución son:

Se definen dos patrones de regex:

    - emoji_pattern: Rango Unicode que incluye varias categorías de emojis.
    - skin_tone_modifiers: Rango que captura modificadores de tono de piel.

Se define la funcion extract_emojis:

    - Elimina los modificadores de tono de piel para que no afecten los conteos.
    - Extrae los emojis usando el patrón emoji_pattern.

Se crea un rdd

    rdd = df.select("content").rdd.flatMap(lambda row: extract_emojis(row["content"]))

Se realiza conteo de emojis

    emoji_counts = (
        rdd.filter(lambda emoji: emoji)           # Filtra emojis válidos
        .map(lambda emoji: (emoji, 1))           # Mapea cada emoji a (emoji, 1)
        .reduceByKey(lambda a, b: a + b)         # Suma las ocurrencias de cada emoji
    )

    filter: Asegura que solo se procesen valores no vacíos.
    map: Convierte cada emoji en una tupla (emoji, 1) para contar las ocurrencias.
    reduceByKey: Agrupa los emojis y suma sus ocurrencias para calcular el conteo total.

Se obtienen los 10 emojis más usados

    top_10_emojis = emoji_counts.takeOrdered(10, key=lambda x: -x[1])



In [None]:
from q2_memory import q2_memory

file_path = "/Users/acarcamo/Downloads/farmers-protest-tweets-2021-2-4.json"
q2_memory(file_path)

## PROBLEMA 3: El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos


### q3_time

Revisando la data se verifica que el campo mentionedUsers.username contiene los usuarios mencionados en cada tweet. Para esto la query consiste de 2 pasos:

   Desanidar la lista de usuarios mencionados en una estructura tabular

        WITH mentioned_users AS (
          SELECT
            A.id,
            mentionedUsers.username AS mention
          FROM {dataset_id}.{table_id} A,
          UNNEST(mentionedUsers) AS mentionedUsers
        )
   Contar cuántas veces fue mencionado cada usuario y obtener los 10 más mencionados.

        SELECT
          mention AS username,
          COUNT(*) AS mention_count
        FROM mentioned_users
        GROUP BY mention
        ORDER BY mention_count DESC
        LIMIT 10;




In [1]:
from q3_time import q3_time

file_path = "/Users/acarcamo/Downloads/farmers-protest-tweets-2021-2-4.json"
q3_time(file_path)

Datos cargados en Bigquery
Filename: /Users/acarcamo/Documents/Personal/challenge_DE/src/q3_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9     86.0 MiB     86.0 MiB           1   @profile
    10                                         def q3_time(file_path: str) -> List[Tuple[str, int]]:
    11     86.0 MiB      0.0 MiB           1       dataset_id = "sandbox_agcarcamo"
    12     86.0 MiB      0.0 MiB           1       table_id = "de_test3"
    13                                         
    14     86.0 MiB      0.0 MiB           1       result = []
    15                                         
    16     86.0 MiB      0.0 MiB           1       try:
    17                                                 # Valida si el archivo existe
    18     86.0 MiB      0.0 MiB           1           if not os.path.exists(file_path):
    19                                                     raise FileNotFoundError(f"El archivo {file_path} no se encuentra.")
    20 

[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

### q3_memory

Se realiza una prueba local, quedando pendiente realizar la prueba en un cluster dataproc.

Los pasos de ejecución son:

Se carga la data a un dataframe:

    df = spark.read.option("encoding", "UTF-8").json(file_path)


Se realiza un explode de las menciones. Por cada usuario dentro de mentionedUsers.username se crea un registro en la nueva columna mention.

    flattened_df = df.withColumn("mention", explode(col("mentionedUsers.username")))


Se cuenta la cantidad de menciones por usuario

    mention_count_df = flattened_df.groupBy("mention").agg(F.count("*").alias("mention_count"))

Se obtienen los 10 usuarios mas influyentes

    top_users = mention_count_df.orderBy("mention_count", ascending=False).limit(10)



In [2]:
from q3_memory import q3_memory

file_path = "/Users/acarcamo/Downloads/farmers-protest-tweets-2021-2-4.json"
q3_memory(file_path)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/23 04:11:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


CodeCache: size=131072Kb used=23520Kb max_used=23520Kb free=107551Kb
 bounds [0x00000001064d8000, 0x0000000107bf8000, 0x000000010e4d8000]
 total_blobs=9103 nmethods=8197 adapters=819
 compilation: disabled (not enough contiguous free space left)


                                                                                

[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]