# LATAM Challenge Data Engineer

## 1. Descripción del caso

El ejercicio de estudio del desafío consiste en el análisis de datos provenientes de Twitter, actualmente X. El objetivo es rescatar información estadística de los tweets relacionados con las protestas de los granjeros en India. Este análisis están enfocados en dos métricas: la utilización de memoria y el tiempo de cada una de las funciones que buscan responder a las preguntas planteadas.

## 2. Supuestos y consideraciones

Se realizó una revisión inicial de los datos proporcionados para identificar la existencia de tweets duplicados o de aquellos en los que existe más de una mención a la misma persona en el mismo tweet de la siguiente manera:

In [2]:
#Carga inicial del archivo, los archivos siempre estaran en la ruta "../data/<archivo>.json"
file_path = "../data/farmers-protest-tweets-2021-2-4.json"

In [3]:
import polars as pl
import jsonlines

data = []
with jsonlines.open(file_path) as reader:
    for obj in reader:
        users = []
        users_dedup = []
        mentioned_users = obj.get('mentionedUsers', {})
        if mentioned_users:
            users.extend([item['username'] for item in mentioned_users])
            users_dedup.extend(list(set(item['username'] for item in mentioned_users)))
        tweet = {
            'date': obj.get('date'),
            'id': obj.get('id'),
            'username': obj.get('user', {}).get('username'),
            'mentionedUsers': users,
            'users_dedup': users_dedup
        }
        data.append(tweet)
df = pl.DataFrame(data)

In [4]:
id_count = df.group_by('id').len()
id_count = id_count.filter(pl.col('len')>1)
len(id_count)

0

El total de registros duplicados para el dataframe por el id del tweet generado es cero para el archivo de estudio.

La revisión de las múltiples menciones en el mismo tweet se verificó de la siguiente manera:

In [5]:
mensiones_duplicadas = df.filter(pl.col('mentionedUsers').len() != pl.col('users_dedup').len())
len(mensiones_duplicadas)

0

### Supuestos generales

1. Los archivos para su ejecución se encuentran en la ruta: "../data/farmers-protest-tweets-2021-2-4.json"
2. Los emojis con variaciones de tono de piel serán tratados como emojis distintos.
3. Los datos que se extraen desde Twitter son únicos y no tienen duplicados.


## 3. Explicación de las funciones implementadas


### Ejercicio 1
Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días.

#### Optimización de tiempo

Para responder a este requerimiento se optó por utilizar la librería *polars* de python, con la cual se busca cargar la información a procesar en memoria para posteriormente procesarla según las necesidades del caso.

```Python
with jsonlines.open(file_path) as reader:
        for obj in reader:
            # Se rescata solo los campos que se analizaran
            filter_obj = {
                'date': obj.get('date'),
                'username': obj.get('user', {}).get('username')
            }
            data.append(filter_obj)
```

De la estructura de cada objeto json se obtiene la fecha y el username de quién realizó el tweet.

Luego se procesa con polars, primero para obtener las fechas con más tweets:

```Python
    top_fechas =( df
                    .group_by('date')
                    .len()
                    .sort('len',descending=True)
                    .head(10)['date']
                )
```

Luego se filtra el dataframe inicial por estas fechas:

```Python
result = (  df_filtrado.group_by(['date', 'username'])
            .len() # Se cuenta para cada usuario por fecha 
            .sort(by=['date', 'len'], descending=[False, True]) # Se ordena por fecha desendente false y largo true
            .group_by('date') # se agrupa por fechas
            .agg(
                    # Se agrega y se rescata le primer usuario por fecha
                    pl.col('username').first()
                )
     )
```

In [6]:
from q1_time import q1_time
result = q1_time(file_path)
print(result)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


#### Optimización de memoria

Para responder a este requerimiento se optó por la lectura secuencial del archivo, por cada linea nueva que se procesa se rescatan las estadísticas necesarias para el cálculo final.

```Python
with jsonlines.open(file_path) as reader:
    for obj in reader:
        # Se obtiene la fecha de cada uno de los registros
        fecha_str = obj.get('date')[:10]
        # Se castea de string a datetime.date
        fecha = datetime.strptime(fecha_str, '%Y-%m-%d').date()
        # Se actualiza el contador de fechas
        fecha_counter[fecha] += 1
        # Se actualiza el contador de usuarios por fechas
        fecha_usuario_counter[fecha][obj.get('user', {}).get('username')] += 1
```

Se utiliza un  `fecha_usuario_counter = defaultdict(Counter)` para almacenar los totales por cada fecha y usuario.

Luego se itera por cada una de las fechas.

```Python
for fecha in top_fechas:
    usuarios = fecha_usuario_counter[fecha[0]]
    usuario_mas_repetido = usuarios.most_common(1)[0][0]
    result.append((fecha[0], usuario_mas_repetido))
```

In [7]:
from q1_memory import q1_memory
result = q1_memory(file_path)
print(result)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


### Comparación de tiempo y uso de memoria para cada una de las funciones
#### Tiempo

In [8]:
import cProfile
import pstats
from io import StringIO
from q1_time import q1_time
from q1_memory import q1_memory

output_stream = StringIO()
profiler = cProfile.Profile()
profiler.enable()
q1_time(file_path)
profiler.disable()

stats = pstats.Stats(profiler)
stats.stream = output_stream
stats.strip_dirs()
stats.sort_stats('time')
stats.print_stats()

# Obtener el tottime final
tiempo_total_q1_time = stats.total_tt
tiempo_total_q1_time

output_stream = StringIO()
profiler = cProfile.Profile()
profiler.enable()
q1_memory(file_path)
profiler.disable()

stats = pstats.Stats(profiler)
stats.stream = output_stream
stats.strip_dirs()
stats.sort_stats('time')
stats.print_stats()
# Obtener el tottime final
tiempo_total_q1_memory = stats.total_tt
tiempo_total_q1_memory


3.199416044

In [9]:
print(f'La función que prioriza el tiempo tiene un consumo de: {tiempo_total_q1_time:.4f} segundos, \nmientras que la función que prioriza el uso de memoria tiene un consumo de: {tiempo_total_q1_memory:.4f} segundos.')
diferencia_porcentual = (abs(tiempo_total_q1_time - tiempo_total_q1_memory)/((tiempo_total_q1_time + tiempo_total_q1_memory)/2)) * 100
print(f'Lo que representa una mejora de {diferencia_porcentual:.4f}%.')

La función que prioriza el tiempo tiene un consumo de: 2.4117 segundos, 
mientras que la función que prioriza el uso de memoria tiene un consumo de: 3.1994 segundos.
Lo que representa una mejora de 28.0789%.


#### Memoria

In [10]:
%load_ext memory_profiler
%mprun -f q1_memory q1_memory(file_path)




Filename: /Users/hvera/Dev/DE_LATAM_challenge/LATAM-Data-Engineer-Challenge/src/q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     8    326.0 MiB    326.0 MiB           1   def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
     9                                             """
    10                                             Funcion que lee un archivo JSONL y encuentra las 10 fechas con mayor 
    11                                             cantidad de tweets, para cada una de estas fechas entrega el usuario 
    12                                             con mayor cantidad de tweets.
    13                                         
    14                                             Args:
    15                                                 file_path (str): Ruta del archivo JSONL que contiene los tweets para 
    16                                                 analizar.
    17                                         
    18    

Total de memoria utilizada  326.6 MiB.

In [11]:
%mprun -f q1_time q1_time(file_path)




Filename: /Users/hvera/Dev/DE_LATAM_challenge/LATAM-Data-Engineer-Challenge/src/q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     8    326.6 MiB    326.6 MiB           1   def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
     9                                             """
    10                                             Funcion que lee un archivo JSONL y encuentra las 10 fechas con mayor 
    11                                             cantidad de tweets, para cada una de estas fechas entrega el usuario con 
    12                                             mayor cantidad de tweets, se prioriza el tiempo de ejecucion.
    13                                             Para esto se carga toda la informacion en memoria antes de procesarla.
    14                                         
    15                                             Args:
    16                                                 file_path (str): Ruta del archivo JSONL

Total de memoria utilizada  368.4 MiB.

### Ejercicio 2
Los top 10 emojis más usados con su respectivo conteo.

#### Optimización de tiempo

Para responder a este requerimiento se optó por utilizar la librería *pandas* de python, a diferencia del resto de implementaciones se cargó todo el archivo en memoria para luego procesarlo.

`df = pd.read_json(file_path, lines=True)`

Luego utilizando la librería *emot* se utiliza su implementación para la lectura con multiprocesador en la siguiente línea:

`emojis = emot_obj.bulk_emoji(data)`

Para luego continuar con el procesamiento y terminar contando con *Counter*.

Como mejoras a este ejercicio se propone identificar emojis con cambio de tono de piel como un solo emoji, por ejemplo: 👍🏽 y 👍, sean tratados como un solo emoji y no dos como está en la implementación actual.


In [12]:
from q2_time import q2_time
result = q2_time(file_path)
print(result)

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('❤', 1779), ('🤣', 1668), ('✊', 1651), ('🙏🏻', 1317), ('💚', 1040)]


#### Optimización de memoria

Para responder a este requerimiento se optó por la lectura secuencial del archivo utilizando la librería *jsonlines*.

`with jsonlines.open(file_path) as reader`

Por cada una de las lineas procesadas se utiliza el método *emoji* que retorna un diccionario con los emojis y su ubicación en el texto.

`emojis = emot_obj.emoji(content)`

Ejemplo de salida:

` {'value': ['☮', '🙂', '❤'], 'location': [[14, 15], [16, 17], [18, 19]], 'mean': [':peace_symbol:',':slightly_smiling_face:', ':red_heart:'], 'flag': True}` 

De este resultado se utilza 'value' el que se agrega al contador:

`emojis = [item for item in emojis['value']]`

`contador.update(emojis)`

In [13]:
from q2_memory import q2_memory
result = q2_memory(file_path)
print(result)

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('❤', 1779), ('🤣', 1668), ('✊', 1651), ('🙏🏻', 1317), ('💚', 1040)]


### Comparación de tiempo y uso de memoria para cada una de las funciones
#### Tiempo

In [14]:
import cProfile
import pstats
from io import StringIO
from q2_time import q2_time
from q2_memory import q2_memory

output_stream = StringIO()
profiler = cProfile.Profile()
profiler.enable()
q2_time(file_path)
profiler.disable()

stats = pstats.Stats(profiler)
stats.stream = output_stream
stats.strip_dirs()
stats.sort_stats('time')
stats.print_stats()

# Obtener el tottime final
tiempo_total_q2_time = stats.total_tt

output_stream = StringIO()
profiler = cProfile.Profile()
profiler.enable()
q2_memory(file_path)
profiler.disable()

stats = pstats.Stats(profiler)
stats.stream = output_stream
stats.strip_dirs()
stats.sort_stats('time')
stats.print_stats()
# Obtener el tottime final
tiempo_total_q2_memory = stats.total_tt



In [15]:
print(f'La función que prioriza el tiempo tiene un consumo de: {tiempo_total_q2_time:.4f} segundos, \nmientras que la función que prioriza el uso de memoria tiene un consumo de: {tiempo_total_q2_memory:.4f} segundos.')
diferencia_porcentual = (abs(tiempo_total_q2_time - tiempo_total_q2_memory)/((tiempo_total_q2_time + tiempo_total_q2_memory)/2)) * 100
print(f'Lo que representa una mejora de {diferencia_porcentual:.4f}%.')

La función que prioriza el tiempo tiene un consumo de: 8.3032 segundos, 
mientras que la función que prioriza el uso de memoria tiene un consumo de: 19.6812 segundos.
Lo que representa una mejora de 81.3168%.


#### Memoria

In [16]:
%mprun -f q2_memory q2_memory(file_path)




Filename: /Users/hvera/Dev/DE_LATAM_challenge/LATAM-Data-Engineer-Challenge/src/q2_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     8    532.2 MiB    532.2 MiB           1   def q2_memory(file_path: str) -> List[Tuple[str, int]]:
     9                                             """
    10                                             Funcion que lee un archivo JSONL y encuentra los 10 emojis mas usados, 
    11                                             priorizando el uso de memoria.
    12                                         
    13                                             Args:
    14                                                 file_path (str): Ruta del archivo JSONL que contiene los tweets para
    15                                                 analizar.
    16                                         
    17                                             Returns:
    18                                                 List[Tuple[str, int]]: L

Total de memoria utilizada  532.0 MiB.

In [17]:
%mprun -f q2_time q2_time(file_path)




Filename: /Users/hvera/Dev/DE_LATAM_challenge/LATAM-Data-Engineer-Challenge/src/q2_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     9    532.0 MiB    532.0 MiB           1   def q2_time(file_path: str) -> List[Tuple[str, int]]:
    10                                             """
    11                                             Funcion que lee un archivo JSONL y encuentra los 10 emojis mas usados, 
    12                                             priorizando el uso de memoria.
    13                                         
    14                                             Args:
    15                                                 file_path (str): Ruta del archivo JSONL que contiene los tweets 
    16                                                 para analizar.
    17                                         
    18                                             Returns:
    19                                                 List[Tuple[str, int]]: List

Total de memoria utilizada  2009.3 MiB. 

### Ejercicio 3
El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.

#### Optimización de tiempo

Para responder a este requerimiento se probaron varias formas: para la primera prueba se utilizó la misma aproximación aplicada al ejercicio 1, con la combinación de *polars* y *jsonlines*. Al momento de comparar esta solución con aquella que optimiza memoria, no siempre se obtuvieron mejores resultados en cuanto a tiempo.

La segund aproximación para este ejercició se realizó con el uso de *pandas* y cargar el archivo completo en memoria para procesar la columna que tiene los usuarios mencionados. Esta solución entregó peores tiempos en comparación con la primera propuesta de solución, por lo que se descartó.

La opción por la que finalmente se optó fue una mezcla entre la solución implementada para la optimización de memoria, con la diferencia que en esta se cargan todos los usuarios mencionados y luego se actualiza un contador.

```Python
with jsonlines.open(file_path) as reader:
        for obj in reader:
            # Se rescata solo el campo mentionedUsers
            mentioned_users = obj.get('mentionedUsers', {})
            # Si se tienen mensiones en el tweet
            if mentioned_users:
                # Se extraen todos los usuarios mensionados
                data.extend([item['username'] for item in mentioned_users])
    # Se actualiza el contador
    contador.update(data)
```


In [18]:
from q3_time import q3_time
result = q3_time(file_path)
print(result)

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


#### Optimización de memoria

Para responder a este requerimiento se optó por la lectura secuencial del archivo, al igual que en la implementación del ejercicio 1 por cada línea nueva que se procesa se rescatan las línea necesarias para el cálculo final.


In [19]:
from q3_memory import q3_memory
result = q3_memory(file_path)
print(result)

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


### Comparación de tiempo y uso de memoria para cada una de las funciones
#### Tiempo

In [20]:
import cProfile
import pstats
from io import StringIO
from q3_time import q3_time
from q3_memory import q3_memory

output_stream = StringIO()
profiler = cProfile.Profile()
profiler.enable()
q3_time(file_path)
profiler.disable()

stats = pstats.Stats(profiler)
stats.stream = output_stream
stats.strip_dirs()
stats.sort_stats('time')
stats.print_stats()

# Obtener el tottime final
tiempo_total_q3_time = stats.total_tt

output_stream = StringIO()
profiler = cProfile.Profile()
profiler.enable()
q3_memory(file_path)
profiler.disable()

stats = pstats.Stats(profiler)
stats.stream = output_stream
stats.strip_dirs()
stats.sort_stats('time')
stats.print_stats()
# Obtener el tottime final
tiempo_total_q3_memory = stats.total_tt



In [21]:
print(f'La función que prioriza el tiempo tiene un consumo de: {tiempo_total_q3_time:.4f} segundos, \nmientras que la función que prioriza el uso de memoria tiene un consumo de: {tiempo_total_q3_memory:.4f} segundos.')
diferencia_porcentual = (abs(tiempo_total_q3_time - tiempo_total_q3_memory)/((tiempo_total_q3_time + tiempo_total_q3_memory)/2)) * 100
print(f'Lo que representra una mejora de {diferencia_porcentual:.4f}%.')

La función que prioriza el tiempo tiene un consumo de: 2.3312 segundos, 
mientras que la función que prioriza el uso de memoria tiene un consumo de: 2.3320 segundos.
Lo que representra una mejora de 0.0345%.


#### Memoria

In [22]:
%mprun -f q3_memory q3_memory(file_path)




Filename: /Users/hvera/Dev/DE_LATAM_challenge/LATAM-Data-Engineer-Challenge/src/q3_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     7    528.4 MiB    528.4 MiB           1   def q3_memory(file_path: str) -> List[Tuple[str, int]]:
     8                                             """
     9                                             Funcion que lee un archivo JSONL y encuentra los 10 usuarios con mayor
    10                                             numero de menciones.
    11                                             
    12                                             Args:
    13                                                 file_path (str): Ruta del archivo JSONL que contiene los tweets 
    14                                                 para analizar.
    15                                             
    16                                             Returns:
    17                                                 List[Tuple[str, int]]: Lis

Total de memoria utilizada  528.4 MiB.

In [23]:
%mprun -f q3_time q3_time(file_path)




Filename: /Users/hvera/Dev/DE_LATAM_challenge/LATAM-Data-Engineer-Challenge/src/q3_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     7    528.4 MiB    528.4 MiB           1   def q3_time(file_path: str) -> List[Tuple[str, int]]:
     8                                             """
     9                                             Funcion que lee un archivo JSONL y encuentra los 10 usuarios con mayor 
    10                                             numero de menciones, prioriza el tiempo de ejecucion.
    11                                         
    12                                             Args:
    13                                                 file_path (str): Ruta del archivo JSONL que contiene los tweets 
    14                                                 para analizar.
    15                                         
    16                                             Returns:
    17                                                 List

Total de memoria utilizada  528.4 MiB.

## Notas Adicionales 

En pruebas realizadas fuera del contexto del notebook los resultados generales para cada una de las funciones implementadas son los siguientes:
1. Ejercicio 1

| Métrica   | q1_time       | q1_memory      | Diff %   |
|-----------|---------------|--------------- |----------|
| Time      | 2.401 seconds | 3.150 seconds  | 26.986       |
| Memory    | 100.0 MiB     | 26.4 MiB      | 116.456       |


2. Ejercicio 2

| Métrica   | q2_time       | q2_memory      | Diff %|
|-----------|---------------|----------------|---------|
| Time      | 9.249 seconds | 19.624 seconds | 71.866      |
| Memory    | 1660.4 MiB    | 26.2 MiB       | 193.786      |


3. Ejercicio 3

| Métrica   | q3_time       | q3_memory      | Diff %   |
|-----------|---------------|--------------- |----------|
| Time      | 2.257 seconds | 2.621 seconds  | 14.924       |
| Memory    | 29.0 MiB      | 22.1 MiB       | 27.006      |


## Mejoras

Para las funciones que se desarrollaron se podrían realizar las siguientes mejoras:

A. Al momento de cargar el archivo línea por línea, revisar si la línea que se está cargando corresponde a un objeto JSON.

```Python
with jsonlines.open(file_path) as reader:
        for obj in reader:
            # Se rescata solo el campo mentionedUsers
            try:
                mentioned_users = obj.get('mentionedUsers', {})
            # Si se tienen mensiones en el tweet
                if mentioned_users:
                # Se extraen todos los usuarios mensionados
                    data.extend([item['username'] for item in mentioned_users])
            except Exception as e:
                print(f"El archivo presenta lineas con errores")
    # Se actualiza el contador
    contador.update(data)
```

B. Para el ejercicio 2, buscar alternativas con base en la utilización de expresiones regulares. En el desarrollo se realizaron pruebas con expresiones regulares, pero no fueron del todo satisfactorias. Esto ocurrió debido a que las expresiones regulares que se utilizaron no consideraban emojis compuestos (Zero Width Joiner). En relación a los tiempos de ejecución, las expresiones regulares presentaron un mejor desempeño. Otra mejora adicional corresponde a la limpieza previa de los tonos de piel para los emojis, de esta forma se logrará un conteo más preciso.

C. Para las funciones que priorizan el tiempo de ejecución, queda pendiente realizar las pruebas con la carga completa del archivo por medio de *pandas*.

D. Revisar opciones de paralelización al momento de la lectura de los datos iniciales. De la misma forma en que la utilización de *emot* demostró que la paralelización disminuyó los tiempos en el análisis de los emojis, es una opción buscar alguna forma similar para la lectura del archivo.

E. Buscar una solución en *GCP*, mediante la utilización de *Cloud Storage* para el almacenamiento del archivo. A través de una *Cloud Function* se podría desencadenar un *pipeline*, ya sea mediante un trabajo en *Dataflow* para realizar el análisis de cada una de las preguntas, o utilizando *Cloud Run* para el procesamiento. Es importante tener en cuenta la volumetría de los datos para casos productivos y los costos asociados a la utilización de la infraestructura.

## Configuración del Entorno de Pruebas

### Hardware
- **Procesador:** Apple M2 Pro
- **Memoria RAM:** 16 GB DDR4
- **Almacenamiento:** SSD 512 GB

### Software
- **Sistema Operativo:** macOS Sonoma 14.5 (23F79)
- **Versión de Python:** 3.9.9
- **Librerías:** 
  - memory-profiler 0.61.0
  - jsonlines 4.0.0
  - polars 1.0.0
  - emot 3.1
  - pandas 2.2.2
- **Entorno de Ejecución:** Entorno virtual creado con `venv`