In [1]:
file_path = "../data/farmers-protest-tweets-2021-2-4.json"

# LATAM Challenge

### Proyecto de Ingeniería de Datos

Este proyecto aborda un desafío de ingeniería de datos que consiste en procesar un conjunto de datos de tweets relacionados con protestas agrícolas. La idea es implementar unas funciones que sean óptimas en tiempo y consumo de memoria. Además, se implementaron pruebas unitarias para garantizar la precisión de las funciones desarrolladas.

## Enfoques

### Enfoque Local
Utiliza librerías básicas para procesar la información localmente.

### Enfoque Distribuido
Usa PySpark para demostrar cómo se podrían implementar soluciones similares pero para el procesamiento de grandes volúmenes de datos en entornos distribuidos, aunque los datos en este caso no son tan grandes.

## Problema

Se proporciona un archivo JSON de 398 MB con tweets. Se deben resolver los siguientes problemas:

1. **Top 10 Fechas con Más Tweets**: Identificar las 10 fechas con más tweets y el usuario con más publicaciones en cada una.
2. **Top 10 Emojis Más Usados**: Identificar los 10 emojis más usados y su conteo.
3. **Top 10 Usuarios Más Mencionados**: Identificar los 10 usuarios más influyentes según el conteo de menciones (@).

Cada problema se resolverá optimizando el tiempo de ejecución y el uso de memoria.

## Pruebas Unitarias

Se implementaron pruebas unitarias para cada una de las funciones desarrolladas, garantizando así la exactitud de los resultados. Las pruebas aseguran que las funciones manejen correctamente los casos límite y los posibles errores. A continuación se detallan las pruebas realizadas:

1. **Top 10 Fechas con Más Tweets**:
    - **Pruebas Básicas**: Verificar que las funciones `q1_time` y `q1_memory` identifiquen correctamente las fechas con más tweets y el usuario más activo en esas fechas.
    - **Casos Límite**: Verificar el comportamiento de las funciones cuando hay tweets con fechas faltantes o formatos de fecha incorrectos, manejando estos casos con excepciones y retornando resultados consistentes.

2. **Top 10 Emojis Más Usados**:
    - **Pruebas Básicas**: Verificar que las funciones `q2_time` y `q2_memory` identifiquen correctamente los emojis más usados y su conteo.
    - **Casos Límite**: Verificar el comportamiento de las funciones cuando hay tweets con contenido faltante, asegurando que los emojis se cuenten correctamente y que los casos con contenido faltante sean manejados sin causar errores.

3. **Top 10 Usuarios Más Mencionados**:
    - **Pruebas Básicas**: Verificar que las funciones `q3_time` y `q3_memory` identifiquen correctamente los usuarios más mencionados y su conteo.
    - **Casos Límite**: Verificar el comportamiento de las funciones cuando hay tweets con contenido faltante o sin menciones, asegurando que los usuarios se cuenten correctamente y que los casos con contenido faltante sean manejados adecuadamente.

Las pruebas unitarias fueron diseñadas para cubrir diferentes escenarios, incluyendo datos válidos y casos límite, para asegurar que las funciones desarrolladas son medianamente fiables.


In [5]:
import pandas as pd
df = pd.read_json(file_path, lines=True)
df.head(3)

Unnamed: 0,url,date,content,renderedContent,id,user,outlinks,tcooutlinks,replyCount,retweetCount,...,quoteCount,conversationId,lang,source,sourceUrl,sourceLabel,media,retweetedTweet,quotedTweet,mentionedUsers
0,https://twitter.com/ArjunSinghPanam/status/136...,2021-02-24 09:23:35+00:00,The world progresses while the Indian police a...,The world progresses while the Indian police a...,1364506249291784198,"{'username': 'ArjunSinghPanam', 'displayname':...",[https://twitter.com/ravisinghka/status/136415...,[https://t.co/es3kn0IQAF],0,0,...,0,1364506249291784198,en,"<a href=""http://twitter.com/download/iphone"" r...",http://twitter.com/download/iphone,Twitter for iPhone,,,{'url': 'https://twitter.com/RaviSinghKA/statu...,"[{'username': 'narendramodi', 'displayname': '..."
1,https://twitter.com/PrdeepNain/status/13645062...,2021-02-24 09:23:32+00:00,#FarmersProtest \n#ModiIgnoringFarmersDeaths \...,#FarmersProtest \n#ModiIgnoringFarmersDeaths \...,1364506237451313155,"{'username': 'PrdeepNain', 'displayname': 'Pra...",[],[],0,0,...,0,1364506237451313155,en,"<a href=""http://twitter.com/download/android"" ...",http://twitter.com/download/android,Twitter for Android,[{'thumbnailUrl': 'https://pbs.twimg.com/ext_t...,,,"[{'username': 'Kisanektamorcha', 'displayname'..."
2,https://twitter.com/parmarmaninder/status/1364...,2021-02-24 09:23:22+00:00,ਪੈਟਰੋਲ ਦੀਆਂ ਕੀਮਤਾਂ ਨੂੰ ਮੱਦੇਨਜ਼ਰ ਰੱਖਦੇ ਹੋਏ \nਮੇ...,ਪੈਟਰੋਲ ਦੀਆਂ ਕੀਮਤਾਂ ਨੂੰ ਮੱਦੇਨਜ਼ਰ ਰੱਖਦੇ ਹੋਏ \nਮੇ...,1364506195453767680,"{'username': 'parmarmaninder', 'displayname': ...",[],[],0,0,...,0,1364506195453767680,pa,"<a href=""http://twitter.com/download/android"" ...",http://twitter.com/download/android,Twitter for Android,,,,


### Descripción de las Columnas del DataFrame

1. **url**: La URL del tweet.
2. **date**: La fecha y hora en que se creó el tweet, en formato UTC.
3. **content**: El texto del tweet.
4. **renderedContent**: El contenido renderizado del tweet (puede ser igual a `content`).
5. **id**: El identificador único del tweet.
6. **user**: Informacion sobre usuario que publicó el tweet.
7. **outlinks**: Enlaces externos incluidos en el tweet.
8. **tcooutlinks**: Enlaces acortados de Twitter incluidos en el tweet.
9. **replyCount**: Número de respuestas al tweet.
10. **retweetCount**: Número de retweets del tweet.
11. **likeCount**: Número de "me gusta" que recibió el tweet.
12. **quoteCount**: Número de veces que el tweet fue citado.
13. **conversationId**: Identificador de la conversación a la que pertenece el tweet.
14. **lang**: El idioma del tweet, en formato de código BCP 47.
15. **source**: La fuente desde donde se publicó el tweet (por ejemplo, "Twitter Web Client").
16. **sourceUrl**: URL de la fuente desde donde se publicó el tweet.
17. **sourceLabel**: Etiqueta de la fuente desde donde se publicó el tweet.
18. **media**: Información sobre los medios incluidos en el tweet (imágenes, videos, etc.).
19. **retweetedTweet**: Información sobre el tweet original si este es un retweet.
20. **quotedTweet**: Información sobre el tweet citado.
21. **mentionedUsers**: Usuarios mencionados en el tweet.


## Enfoque Local
---

Las funciones `qx_time` y `qx_memory` fueron optimizadas para tiempo y memoria respectivamente. Para `q_time`, se utilizó `defaultdict` para contar tweets por fecha y usuario, procesando el archivo JSON línea por línea con `open` y `json.loads`, lo que permitió una rápida conversión de cada línea en un diccionario de Python. Los resultados se calcularon ordenando y agregando eficientemente los datos. Para `q_memory`, se implementaron generadores (`tweet_generator`) que leen el archivo línea por línea, reduciendo significativamente el uso de memoria. Al igual que en `q_time`, `defaultdict` se utilizó para llevar un conteo incremental de tweets, emojis o menciones, evitando cargar todo el archivo en memoria y optimizando así el rendimiento en términos de consumo de recursos. Aunque la librería pandas podría ser eficiente en algunos casos, se optó por estos enfoques para manejar eficientemente grandes volúmenes de datos.

### Las Top 10 Fechas con Más Tweets y el Usuario Más Activo en Cada Fecha

| Función         | Tiempo de Ejecución (s) | Uso de Memoria (MiB) |
|-----------------|--------------------------|----------------------|
| `q1_time`       | 2.96                     | 1.69                 |
| `q1_memory`     | 2.83                     | 0.05                 |

### Los Top 10 Emojis Más Usados con su Respectivo Conteo

| Función         | Tiempo de Ejecución (s) | Uso de Memoria (MiB) |
|-----------------|--------------------------|----------------------|
| `q2_time`       | 3.20                     | 1.23                 |
| `q2_memory`     | 3.24                     | 0.14                 |

### Top 10 Usuarios Más Mencionados

| Función         | Tiempo de Ejecución (s) | Uso de Memoria (MiB) |
|-----------------|--------------------------|----------------------|
| `q3_time`       | 2.65                     | 61.86                |
| `q3_memory`     | 2.64                     | 0.27                 |



NOTA: Se se ejecuta nuevamente las celdas puede cambiar brevemente el resultado. Se puede apreciar que qx_memory se ha logrado optimizar existosamente para consumo de momoria, en los qx_time los resultaods no son significativamente diferetes en terminos de tiempo que las funciones qx_memory

### Librerías

In [10]:
# Importando librerías necesarias
import pandas as pd
import time
from memory_profiler import profile
from memory_profiler import memory_usage

### <span style="color:red">Las Top 10 Fechas con Más Tweets y el Usuario Más Activo en Cada Fecha</span>


In [12]:
from typing import List, Tuple
from datetime import datetime
from collections import defaultdict
import json

def q1_time(file_path: str) -> List[Tuple[datetime.date, str, int]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de las
    10 fechas con más tweets y el nombre del usuario que más publicó en cada una de esas fechas,
    optimizando para el uso de memoria.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[datetime.date, str]]: Una lista de tuplas donde cada tupla contiene una fecha (datetime.date)
    y el nombre del usuario que más publicó en esa fecha.
    """
    try:
        # Estructura para contar los tweets por fecha y por usuario
        date_counts = defaultdict(lambda: defaultdict(int))

        # Procesar el archivo línea por línea para minimizar el uso de memoria
        with open(file_path, 'r') as file:
            for line in file:
                tweet = json.loads(line) # Convertir a diccionario de Python
                try:
                    # Extraer y validar la fecha del tweet
                    date_str = tweet.get('date', None)
                    if date_str:
                        date = datetime.strptime(date_str[:10], '%Y-%m-%d').date() #funcion problematica
                    else:
                        continue  # Saltar si no hay fecha

                    # Extraer y validar el usuario del tweet
                    user = tweet['user']['username'] if tweet['user'] and 'username' in tweet['user'] else None
                    if user:
                        date_counts[date][user] += 1  # Incrementar el conteo para la fecha y usuario correspondiente

                except (KeyError, ValueError) as e:
                    print(f"[WARNING] Error procesando el tweet: {e}")
                    continue  # Saltar en caso de error de clave o valor

        # Obtener las 10 fechas con más tweets
        top_dates = sorted(date_counts.items(), key=lambda x: sum(x[1].values()), reverse=True)[:10] #x[1] conteos por usuario

        # Crear una lista de tuplas con las fechas y el nombre de usuario más activo en cada fecha
        result = [(date, max(users.items(), key=lambda x: x[1])[0]) for date, users in top_dates] #sum(users.values())

        print(f"[INFO] Procesamiento completado exitosamente")
        return result

    except FileNotFoundError:
        print(f"[ERROR] El archivo {file_path} no existe.")
        return []



# Medir tiempo de ejecución
start_time = time.time()
mem_usage_memory = memory_usage((q1_time, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"[INFO] Tiempo de ejecución de q1_time: {end_time - start_time:.2f} segundos")
print(f"[INFO] Uso de memoria de q1_time: {max(mem_usage_memory) - min(mem_usage_memory):.2f} MiB")
print(q1_time(file_path))

[INFO] Procesamiento completado exitosamente
[INFO] Tiempo de ejecución de q1_time: 2.96 segundos
[INFO] Uso de memoria de q1_time: 1.69 MiB
[INFO] Procesamiento completado exitosamente
[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


In [16]:
from typing import List, Tuple
from datetime import datetime
from collections import defaultdict
import json

def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de las
    10 fechas con más tweets y el nombre del usuario que más publicó en cada una de esas fechas,
    optimizando para el uso de memoria.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[datetime.date, str]]: Una lista de tuplas donde cada tupla contiene una fecha (datetime.date)
    y el nombre del usuario que más publicó en esa fecha.
    """
    # Estructura para contar los tweets por fecha y por usuario
    date_counts = defaultdict(lambda: defaultdict(int))

    # Función generadora para procesar el archivo línea por línea
    def tweet_generator(file_path):
        try:
            with open(file_path, 'r') as file:
                for line in file:
                    yield json.loads(line)
        except FileNotFoundError:
            print(f"[ERROR] El archivo {file_path} no existe.")
            raise

    # Usar el generador para procesar cada tweet
    for tweet in tweet_generator(file_path):
        try:
            # Extraer y validar la fecha del tweet
            date_str = tweet.get('date', None)
            if date_str:
                date = datetime.strptime(date_str[:10], '%Y-%m-%d').date()
            else:
                continue  # Saltar si no hay fecha

            # Extraer y validar el usuario del tweet
            user = tweet['user']['username'] if tweet['user'] and 'username' in tweet['user'] else None
            if user:
                date_counts[date][user] += 1  # Incrementar el conteo para la fecha y usuario correspondiente

        except (KeyError, ValueError) as e:
            print(f"[WARNING] Error procesando el tweet: {e}")
            continue  # Saltar en caso de error de clave o valor

    # Obtener las 10 fechas con más tweets
    top_dates = sorted(date_counts.items(), key=lambda x: sum(x[1].values()), reverse=True)[:10]

    # Crear una lista de tuplas con las fechas y el nombre de usuario más activo en cada fecha
    result = [(date, max(users.items(), key=lambda x: x[1])[0]) for date, users in top_dates]

    print(f"[INFO] Procesamiento completado exitosamente")
    return result


# Medir tiempo de ejecución
start_time = time.time()
mem_usage_memory = memory_usage((q1_memory, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"[INFO] Tiempo de ejecución de q1_memory: {end_time - start_time:.2f} segundos")
print(f"[INFO] Uso de memoria de q1_memory: {max(mem_usage_memory) - min(mem_usage_memory):.2f} MiB")
print(q1_memory(file_path))


[INFO] Procesamiento completado exitosamente
[INFO] Tiempo de ejecución de q1_memory: 2.83 segundos
[INFO] Uso de memoria de q1_memory: 0.05 MiB
[INFO] Procesamiento completado exitosamente
[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


### <span style="color:red">Los Top 10 Emojis Más Usados con su Respectivo Conteo</span>


In [18]:
from typing import List, Tuple
from collections import defaultdict
import json
import emoji

def extract_emojis_from_text(text: str) -> List[str]:
    return [c for c in text if c in emoji.EMOJI_DATA]

def q2_time(file_path: str) -> List[Tuple[str, int]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de los
    10 emojis más usados con su respectivo conteo, optimizando para el uso de memoria.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[str, int]]: Una lista de tuplas donde cada tupla contiene un emoji (str) y su conteo (int).
    """
    try:
        print(f"[INFO] Iniciando procesamiento del archivo: {file_path}")

        # Estructura para contar los emojis
        emoji_counts = defaultdict(int)

        # Procesar el archivo línea por línea para minimizar el uso de memoria
        with open(file_path, 'r') as file:
            for line in file:
                try:
                    tweet = json.loads(line)  # Convertir la línea en un diccionario de Python
                    content = tweet.get('content', '')  # Obtener el contenido del tweet, por defecto vacío si no existe
                    if content is not None:
                        # Extraer emojis del contenido del tweet
                        emojis_in_tweet = extract_emojis_from_text(content)
                        # Contar la frecuencia de cada emoji
                        for em in emojis_in_tweet:
                            emoji_counts[em] += 1
                except (json.JSONDecodeError, KeyError, ValueError) as e:
                    # Captura y registra cualquier error que ocurra durante el procesamiento del tweet
                    print(f"[WARNING] Error procesando el tweet: {e}")
                    continue

        # Obtener los 10 emojis más usados
        top_emojis = sorted(emoji_counts.items(), key=lambda x: x[1], reverse=True)[:10]

        print(f"[INFO] Procesamiento completado exitosamente")
        return top_emojis

    except FileNotFoundError:
        # Manejar el caso en que el archivo no exista
        print(f"[ERROR] El archivo {file_path} no existe.")
        return []

# Medir tiempo de ejecución
start_time = time.time()
mem_usage_memory = memory_usage((q2_time, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"[INFO] Tiempo de ejecución de q2_time: {end_time - start_time:.2f} segundos")
print(f"[INFO] Uso de memoria de q2_time: {max(mem_usage_memory) - min(mem_usage_memory):.2f} MiB")

# Llamar a la función una vez más para obtener el resultado
result = q2_time(file_path)
print(result)


[INFO] Iniciando procesamiento del archivo: ../data/farmers-protest-tweets-2021-2-4.json
[INFO] Procesamiento completado exitosamente
[INFO] Tiempo de ejecución de q2_time: 3.20 segundos
[INFO] Uso de memoria de q2_time: 1.23 MiB
[INFO] Iniciando procesamiento del archivo: ../data/farmers-protest-tweets-2021-2-4.json
[INFO] Procesamiento completado exitosamente
[('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218), ('👇', 1108)]


In [20]:
from typing import List, Tuple
from collections import defaultdict
import json
import emoji

def extract_emojis_from_text(text):
    return [c for c in text if c in emoji.EMOJI_DATA]

def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de los
    10 emojis más usados con su respectivo conteo, optimizando para el uso de memoria.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[str, int]]: Una lista de tuplas donde cada tupla contiene un emoji (str) y su conteo (int).
    """

    # Estructura para contar los emojis
    emoji_counts = defaultdict(int)

    # Función generadora para procesar el archivo línea por línea
    def tweet_generator(file_path): #iterador
        try:
            with open(file_path, 'r') as file:
                for line in file:
                    yield json.loads(line)

        except FileNotFoundError:
            print(f"[ERROR] El archivo {file_path} no existe.")
            raise
    # Usar el generador para procesar cada tweet
    for tweet in tweet_generator(file_path):
        try:
            content = tweet.get('content', '')
            if content is not None:
                emojis_in_tweet = extract_emojis_from_text(content)
                for em in emojis_in_tweet:
                    emoji_counts[em] += 1
        except KeyError as e:
            print(f"[WARNING] Error procesando el tweet: {e}")

    # Obtener los 10 emojis más usados
    top_emojis = sorted(emoji_counts.items(), key=lambda x: x[1], reverse=True)[:10]

    print(f"[INFO] Procesamiento completado exitosamente")
    return top_emojis


# Medir tiempo de ejecución
start_time = time.time()
mem_usage_memory = memory_usage((q2_memory, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"[INFO] Tiempo de ejecución de q2_memory: {end_time - start_time:.2f} segundos")
print(f"[INFO] Uso de memoria de q2_memory: {max(mem_usage_memory) - min(mem_usage_memory):.2f} MiB")

# Llamar a la función una vez más para obtener el resultado
result = q2_memory(file_path)
print(result)


[INFO] Procesamiento completado exitosamente
[INFO] Tiempo de ejecución de q2_memory: 3.24 segundos
[INFO] Uso de memoria de q2_memory: 0.14 MiB
[INFO] Procesamiento completado exitosamente
[('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218), ('👇', 1108)]


### <span style="color:red">Top 10 Histórico de Usuarios Más Influyentes en Función del Conteo de las Menciones (@)</span>

In [22]:
from typing import List, Tuple
import re
from collections import defaultdict
import json

def extract_mentions(s: str) -> List[str]:
    return re.findall(r'@\w+', s)

def q3_time(file_path: str) -> List[Tuple[str, int]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de los
    10 usuarios más mencionados con su respectivo conteo, optimizando para el uso de memoria.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[str, int]]: Una lista de tuplas donde cada tupla contiene un usuario (str) y su conteo de menciones (int).
    """
    try:
        print(f"[INFO] Iniciando procesamiento del archivo: {file_path}")

        # Estructura para contar las menciones
        mention_counts = defaultdict(int)

        # Procesar el archivo línea por línea para minimizar el uso de memoria
        with open(file_path, 'r') as file:
            for line in file:
                try:
                    tweet = json.loads(line)  # Convertir la línea en un diccionario de Python
                    content = tweet.get('content', '')  # Obtener el contenido del tweet, por defecto vacío si no existe
                    if content is not None:
                        # Extraer menciones del contenido del tweet
                        mentions_in_tweet = extract_mentions(content)
                        # Contar la frecuencia de cada mención
                        for mention in mentions_in_tweet:
                            mention_counts[mention] += 1
                except (json.JSONDecodeError, KeyError, ValueError) as e:
                    # Capturar y registrar cualquier error que ocurra durante el procesamiento del tweet
                    print(f"[WARNING] Error procesando el tweet: {e}")
                    continue

        # Obtener los 10 usuarios más mencionados
        top_mentions = sorted(mention_counts.items(), key=lambda x: x[1], reverse=True)[:10]

        print(f"[INFO] Procesamiento completado exitosamente")
        return top_mentions

    except FileNotFoundError:
        # Manejar el caso en que el archivo no exista
        print(f"[ERROR] El archivo {file_path} no existe.")
        return []



# Medir tiempo de ejecución
start_time = time.time()
mem_usage_memory = memory_usage((q3_time, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"[INFO] Tiempo de ejecución de q3_time: {end_time - start_time:.2f} segundos")
print(f"[INFO] Uso de memoria de q3_time: {max(mem_usage_memory) - min(mem_usage_memory):.2f} MiB")

# Llamar a la función una vez más para obtener el resultado
result = q3_time(file_path)
print(result)


[INFO] Iniciando procesamiento del archivo: ../data/farmers-protest-tweets-2021-2-4.json
[INFO] Procesamiento completado exitosamente
[INFO] Tiempo de ejecución de q3_time: 2.65 segundos
[INFO] Uso de memoria de q3_time: 61.86 MiB
[INFO] Iniciando procesamiento del archivo: ../data/farmers-protest-tweets-2021-2-4.json
[INFO] Procesamiento completado exitosamente
[('@narendramodi', 2261), ('@Kisanektamorcha', 1836), ('@RakeshTikaitBKU', 1639), ('@PMOIndia', 1422), ('@RahulGandhi', 1125), ('@GretaThunberg', 1046), ('@RaviSinghKA', 1015), ('@rihanna', 972), ('@UNHumanRights', 962), ('@meenaharris', 925)]


In [24]:
from typing import List, Tuple
import re
from collections import defaultdict
import json

def extract_mentions(s):
    return re.findall(r'@\w+', s)

def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de los
    10 usuarios más mencionados con su respectivo conteo, optimizando para el uso de memoria.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[str, int]]: Una lista de tuplas donde cada tupla contiene un usuario (str) y su conteo de menciones (int).
    """
    # Estructura para contar las menciones
    mention_counts = defaultdict(int)

    # Función generadora para procesar el archivo línea por línea
    def tweet_generator(file_path): #iterador
        try:
            with open(file_path, 'r') as file:
                for line in file:
                    yield json.loads(line)
        except FileNotFoundError:
            print(f"[ERROR] El archivo {file_path} no existe.")
            raise

    # Usar el generador para procesar cada tweet
    for tweet in tweet_generator(file_path):
        try:
            content = tweet.get('content', '')
            if content is not None:
                mentions_in_tweet = extract_mentions(content)
                for mention in mentions_in_tweet:
                    mention_counts[mention] += 1
        except KeyError as e:
            print(f"[WARNING] Error procesando el tweet: {e}")

    # Obtener los 10 usuarios más mencionados
    top_mentions = sorted(mention_counts.items(), key=lambda x: x[1], reverse=True)[:10]

    print(f"[INFO] Procesamiento completado exitosamente")
    return top_mentions

# Medir tiempo de ejecución
start_time = time.time()
mem_usage_memory = memory_usage((q3_memory, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"[INFO] Tiempo de ejecución de q3_memory: {end_time - start_time:.2f} segundos")
print(f"[INFO] Uso de memoria de q3_memory: {max(mem_usage_memory) - min(mem_usage_memory):.2f} MiB")

# Llamar a la función una vez más para obtener el resultado
result = q3_memory(file_path)
print(result)


[INFO] Procesamiento completado exitosamente
[INFO] Tiempo de ejecución de q3_memory: 2.64 segundos
[INFO] Uso de memoria de q3_memory: 0.27 MiB
[INFO] Procesamiento completado exitosamente
[('@narendramodi', 2261), ('@Kisanektamorcha', 1836), ('@RakeshTikaitBKU', 1639), ('@PMOIndia', 1422), ('@RahulGandhi', 1125), ('@GretaThunberg', 1046), ('@RaviSinghKA', 1015), ('@rihanna', 972), ('@UNHumanRights', 962), ('@meenaharris', 925)]


## Enfoque Distribuido en la Nube
---

Las funciones `qx_time_spark` se crearon utilizando Spark, una tecnología de computación distribuida. Este enfoque es ideal para procesar grandes volúmenes de datos y aprovecha las capacidades de paralelización y escalabilidad de Spark que se va a ver reflejado en tiempos cortos de ejecución. Sin embargo, para conjuntos de datos pequeños, como los 398 MB de este ejercicio, las ventajas pueden no ser tan evidentes debido a la sobrecarga de configurar y gestionar clústeres.

### Las Top 10 Fechas con Más Tweets y el Usuario Más Activo en Cada Fecha

| Función             | Tiempo de Ejecución (s) | Uso de Memoria (MiB) |
|---------------------|--------------------------|----------------------|
| `q1_time_spark`     | 10.32                    | 9.09                 |

### Los Top 10 Emojis Más Usados con su Respectivo Conteo

| Función             | Tiempo de Ejecución (s) | Uso de Memoria (MiB) |
|---------------------|--------------------------|----------------------|
| `q2_time_spark`     | 2.40                     | 8.94                 |

### Top 10 Usuarios Más Mencionados

| Función             | Tiempo de Ejecución (s) | Uso de Memoria (MiB) |
|---------------------|--------------------------|----------------------|
| `q3_time_spark`     | 1.44                     | 0.13                 |



## Ventajas de Usar un Enfoque Distribuido
NOTA: Si se ejecutan nuevamente las celdas, los resultados pueden variar ligeramente. El consumo de momorya puede no ser una buena metrica en este caso por la dificulta del trackeo del consumo de momoria en entornos distribuidos.


### <span style="color:red">Las Top 10 Fechas con Más Tweets y el Usuario Más Activo en Cada Fecha</span>

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, to_date,split, udf
from pyspark.sql.types import ArrayType, StringType
from collections import defaultdict
import json
import datetime
from typing import List, Tuple
import time
from memory_profiler import memory_usage

# Inicializar Spark
spark = SparkSession.builder \
    .appName("Twitter Mentions Analysis") \
    .getOrCreate()


def q1_time_spark(file_path: str) -> List[Tuple[datetime.date, str]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de las
    10 fechas con más tweets y el nombre del usuario que más publicó en cada una de esas fechas.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[datetime.date, str]]: Una lista de tuplas donde cada tupla contiene una fecha (datetime.date)
    y el nombre del usuario que más publicó en esa fecha.
    """
    # Cargar el archivo JSON en un DataFrame de Spark
    df = spark.read.json(file_path)

    # Convertir la columna 'date' a date
    df = df.withColumn('date', to_date(col('date')))
    
    # Obtener las 10 fechas con más tweets
    top_dates = df.groupBy('date').count().orderBy('count', ascending=False).limit(10).collect()
    
    # Crear una lista de tuplas con las fechas y el nombre de usuario más activo en cada fecha
    result = []
    for row in top_dates:
        date = row['date']
        top_user = df.filter(col('date') == date).groupBy('user.username').count().orderBy('count', ascending=False).first()['username']
        result.append((date, top_user))
    
    return result

# Medir tiempo de ejecución
start_time = time.time()
mem_usage_time = memory_usage((q1_time_spark, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"Tiempo de ejecución de q1_time_spark: {end_time - start_time} segundos")
print(f"Uso de memoria de q1_time_spark: {max(mem_usage_time) - min(mem_usage_time)} MiB")
print(q1_time_spark(file_path))

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/25 14:15:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Tiempo de ejecución de q1_time_spark: 10.31771206855774 segundos
Uso de memoria de q1_time_spark: 9.09375 MiB


24/07/25 14:16:12 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


### <span style="color:red">Los Top 10 Emojis Más Usados con su Respectivo Conteo</span>

In [29]:
import emoji
def extract_emojis_from_text(text):
    return ''.join([c for c in text if c in emoji.EMOJI_DATA])

def q2_time_spark(file_path: str) -> List[Tuple[str, int]]:
    """
    Esta función toma la ruta de un archivo JSON que contiene tweets y devuelve una lista de los
    10 emojis más usados con su respectivo conteo.

    Parámetros:
    file_path (str): La ruta al archivo JSON que contiene los tweets.

    Retorna:
    List[Tuple[str, int]]: Una lista de tuplas donde cada tupla contiene un emoji (str) y su conteo (int).
    """
    # Cargar el archivo JSON en un DataFrame de Spark
    df = spark.read.json(file_path)
    
    # Extraer todos los emojis de los tweets
    extract_emojis_udf = spark.udf.register("extract_emojis_udf", extract_emojis_from_text)
    df = df.withColumn('emojis', extract_emojis_udf(col('content')))
    
    # Explode la columna de emojis
    df = df.withColumn('emoji', explode(split(col('emojis'), '')))
    
    # Contar la frecuencia de cada emoji
    emoji_counts = df.groupBy('emoji').count().orderBy('count', ascending=False).limit(10)
    
    # Crear una lista de tuplas con los emojis y su conteo
    result = [(row['emoji'], row['count']) for row in emoji_counts.collect()]
    
    return result

# Medir tiempo de ejecución
start_time = time.time()
mem_usage_time = memory_usage((q2_time_spark, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"Tiempo de ejecución de q2_time_spark: {end_time - start_time} segundos")
print(f"Uso de memoria de q2_time_spark: {max(mem_usage_time) - min(mem_usage_time)} MiB")
print(q2_time_spark(file_path))


                                                                                

Tiempo de ejecución de q2_time_spark: 2.4027657508850098 segundos
Uso de memoria de q2_time_spark: 8.9375 MiB


24/07/25 14:16:19 WARN SimpleFunctionRegistry: The function extract_emojis_udf replaced a previously registered function.
[Stage 73:>                                                         (0 + 8) / 8]

[('', 100846), ('🙏', 7286), ('😂', 3072), ('🚜', 2972), ('✊', 2411), ('🌾', 2363), ('🏻', 2080), ('❤', 1779), ('🤣', 1668), ('🏽', 1218)]


                                                                                

### <span style="color:red">Top 10 Histórico de Usuarios Más Influyentes en Función del Conteo de las Menciones (@)</span>

In [31]:
# función extract_mentions
import re
def extract_mentions(s):
    return re.findall(r'@\w+', s)

# extract_mentions como UDF
extract_mentions_udf = udf(extract_mentions, ArrayType(StringType()))

def q3_time_spark(file_path: str) -> List[Tuple[str, int]]:
    df = spark.read.json(file_path)
    df = df.withColumn('mentions', explode(extract_mentions_udf(col('content'))))
    mention_counts = df.groupBy('mentions').count().orderBy('count', ascending=False).limit(10)
    result = [(row['mentions'], row['count']) for row in mention_counts.collect()]
    return result


# Medir tiempo de ejecución
start_time = time.time()
mem_usage_time = memory_usage((q3_time_spark, (file_path,)), interval=0.1, timeout=None)
end_time = time.time()

print(f"Tiempo de ejecución de q3_time_spark: {end_time - start_time} segundos")
print(f"Uso de memoria de q3_time_spark: {max(mem_usage_time) - min(mem_usage_time)} MiB")
print(q3_time_spark(file_path))


                                                                                

Tiempo de ejecución de q3_time_spark: 1.4450087547302246 segundos
Uso de memoria de q3_time_spark: 0.125 MiB


                                                                                

[('@narendramodi', 2261), ('@Kisanektamorcha', 1836), ('@RakeshTikaitBKU', 1639), ('@PMOIndia', 1422), ('@RahulGandhi', 1125), ('@GretaThunberg', 1046), ('@RaviSinghKA', 1015), ('@rihanna', 972), ('@UNHumanRights', 962), ('@meenaharris', 925)]


### Ventajas de Usar un Enfoque Distribuido

El enfoque distribuido es ideal para procesar grandes volúmenes de datos. Plataformas como Databricks, AWS Glue, Azure Synapse Analytics y Google BigQuery facilitan la gestión de clústeres de computación distribuida.

1. **Escalabilidad**: Permite añadir nodos al clúster para manejar más datos, asegurando un rendimiento consistente.
2. **Eficiencia de Procesamiento**: Tecnologías como Apache Spark procesan datos en paralelo, reduciendo el tiempo de ejecución. (puro hardware)
3. **Tolerancia a Fallos**: Los sistemas distribuidos reasignan tareas si un nodo falla, garantizando continuidad en el procesamiento.
4. **Flexibilidad y Facilidad de Uso**: Proveen entornos integrados con diversas integraciones y herramientas.

Sin embargo, para conjuntos de datos pequeños, como los 398 MB de este ejercicio, las ventajas pueden no ser evidentes. La sobrecarga de configurar y gestionar clústeres puede superar los beneficios del procesamiento paralelo.

### Consideraciones de Escalabilidad

Para conjuntos de datos más grandes, usar Spark en un clúster distribuido es eficiente y escalable. A medida que el tamaño de los datos crece, el enfoque distribuido maneja la carga adicional sin degradar el rendimiento, algo difícil de lograr con un enfoque local.
