# Challenge Data Engineer LATAM

## Requisitos para ejecutar este notebook

### Datos de trabajo

Para ejecutar este notebook es necesario tener los datos listos, teniendo en cuenta las restricciones de tamaño de GitHub los tweets ha evaluar se han subido en formato .zip.

Se ha creado una función que permite extraer este archivo .zip ejecutando el siguiente comando:

´´´
    python .\src\zip_extraction.py 
´´´

Esto dejará disponible el archivo .json con los tweets a evaluar dentro de la carpeta data y listos para lectura.


### Establecer variable con la ruta de los tweets a evaluar

In [84]:
file_path = "../data/farmers-protest-tweets-2021-2-4.json"

### Imports

In [85]:
import pandas as pd
import json
import time
import datetime
import memory_profiler
import re
from typing import List, Tuple
#import concurrent

from datetime import datetime
from memory_profiler import memory_usage
from collections import defaultdict, Counter
#from concurrent.futures import ProcessPoolExecutor

## Q1 Time and Memory solución

### Primera versión de Q1 Time usando Pandas

La primera aproximación a la solución fue con Pandas, leer el archivo completo y cargarlo en un dataframe, de aquí lo que se hace es agrupar por fecha y luego con .agg contar y ordenar los tweets por fecha y usuario, finalmente se hace un sort para ordenar los datos, se convierte el formato solicitado y se retorna.

In [8]:
def q1_time_pandas(file_path: str):
    # Leer el JSON y cargarlo a un dataframe de pandas
    tweets_df = pd.read_json(file_path, lines=True)

    # Convertir la columna 'date' a datetime
    tweets_df['date'] = pd.to_datetime(tweets_df['date']).dt.date

    # Agrupar por fecha y contar la cantidad de tweets por fecha y usuario
    top_dates_df = tweets_df.groupby('date').agg(
        total_tweets=('date', 'size'),
        most_active_user=('user', lambda x: x.mode()[0]['username'])
    ).sort_values('total_tweets', ascending=False).head(10)

    # Convertir el dataframe a una lista de tuplas
    top_dates = [(row.name, row['most_active_user']) for _, row in top_dates_df.iterrows()]

    return top_dates

Las pruebas de memoria y tiempo estaban dando como resultado entre 33 y 35 segundos de tiempo de ejecución y un consumo maximo de memoria de entre 2950MiB y 3100MiB

In [9]:
#Ejecutar la función y mostrar el resultado
top_dates = q1_time_pandas(file_path)
print(top_dates)

# Tests de eficiencia de tiempo y memoria
start_time = time.time()
peak_memory_memory = max(memory_usage((q1_time_pandas, (file_path,))))
end_time = time.time()

print(f"q1_memory - Execution Time: {end_time - start_time} seconds")
print(f"q1_memory - Peak Memory Usage: {peak_memory_memory} MiB")

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]
q1_memory - Execution Time: 33.76640963554382 seconds
q1_memory - Peak Memory Usage: 2968.328125 MiB


### Segunda versión de Q1 Time

La modificación clave en esta versión definitiva de Q1 Time es la extracción de los usuarios y fecha, dejando de lado el resto de las columnas antes de realizar el agrupamiento, así mismo el output en tuplas se realiza de una manera diferente convertiendo cada par de fecha, usuario en una tupla y estas luego en una lista.

In [10]:
def q1_time_pandas_revised(file_path: str):
    # Leer el JSON y cargarlo a un dataframe de pandas
    tweets_df = pd.read_json(file_path, lines=True)

    # Convertir la columna 'date' a datetime
    tweets_df['date'] = pd.to_datetime(tweets_df['date']).dt.date

    # Extraer el username que esta anidado en la columna 'user'
    tweets_df['username'] = tweets_df['user'].apply(lambda x: x['username'])

    # Agrupar por fecha y contar la cantidad de tweets por fecha y usuario
    top_dates_df = tweets_df.groupby('date').agg(
        total_tweets=('date', 'size'),
        most_active_user=('username', lambda x: x.mode()[0])
    ).sort_values('total_tweets', ascending=False).head(10).drop(columns=['total_tweets'])

    # Convertir el dataframe a una lista de tuplas
    top_dates = list(top_dates_df.itertuples(index=True, name=None))

    return top_dates

El nuevo tiempo de ejecución con estas modificaciones es de entre 7.5 y 9 segundos manteniendo el uso máximo en memoria.
Una reducción de alrededor del 75% en tiempos de ejecución.

In [11]:
# Ejecutar la función y mostrar el resultado
top_dates = q1_time_pandas_revised(file_path)
print(top_dates)

# Tests de eficiencia de tiempo y memoria
start_time = time.time()
peak_memory_time = max(memory_usage((q1_time_pandas_revised, (file_path,))))
end_time = time.time()

print(f"q1_time_pandas_revised - Execution Time: {end_time - start_time} seconds")
print(f"q1_time_pandas_revised - Peak Memory Usage: {peak_memory_time} MiB")

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]
q1_time_pandas_revised - Execution Time: 7.851444482803345 seconds
q1_time_pandas_revised - Peak Memory Usage: 2991.9140625 MiB


### Q1 Memory usando una aproximación de lectura linea por linea

La aproximación para Q1 Memory es la de no cargar el archivo en memoria si nó mas bien leer linea por linea, algo similar a como se manejan datos en streaming, con la libreria JSON se puede conseguir el resultado esperado.

Esta aproximación al problema no solo disminuye el gasto maximo en memoria a alrededor de 100MiB si nó que también es mucho más rapido en ejecución, entre 3.7 y 4.2s segundos, esto ya que estamos tratando una serie de datos relativamente pequeños, lo que permite que esta aproximación de optimización de memoria se la solución mas eficiente en tiempo de ejecución.

In [22]:
def q1_memory(file_path: str):
    tweet_counts = Counter()
    user_activity = defaultdict(Counter)

    # Leer el JSON fila por fila
    with open(file_path, 'r') as file:
        for line in file:
            tweet = json.loads(line)
            date = datetime.fromisoformat(tweet['date']).date()
            username = tweet['user']['username']

            # Contar la cantidad de tweets por fecha y usuario
            tweet_counts[date] += 1
            user_activity[date][username] += 1

    # Sacar el top 10 de fechas con mas tweets
    top_dates = tweet_counts.most_common(10)

    # Sacar el usuario mas activo por fecha
    top_dates = [(date, user_activity[date].most_common(1)[0][0]) for date, _ in top_dates]

    return top_dates

Q1 Memory con la lectura linea por linea presenta un tiempo de ejecución de entre 3.7s a 4.2s y un uso máximo de memoria de entre 70MiB y 120MiB

In [23]:
# Ejecutar la función y mostrar el resultado
top_dates = q1_memory(file_path)
print(top_dates)

# Tests de eficiencia de tiempo y memoria
start_time = time.time()
peak_memory_memory = max(memory_usage((q1_memory, (file_path,))))
end_time = time.time()

print(f"q1_memory - Execution Time: {end_time - start_time} seconds")
print(f"q1_memory - Peak Memory Usage: {peak_memory_memory} MiB")

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]
q1_memory - Execution Time: 3.816939115524292 seconds
q1_memory - Peak Memory Usage: 59.87109375 MiB


## Q2 Time and Memory solución

### Q2 Time con Pandas

Aproximación inicial a Q2 Time con Pandas usando lo aprendido en la Q1 Time, usando REGEX para encontrar los emojis, se carga el JSON, se convierte a df, con REFEX se encuentran los Emojis y luego se ordena para encontrar los mas usados.

In [44]:
def q2_pandas(file_path: str) -> List[Tuple[str, int]]:
    # Definir los emojis que se van a buscar
    emoji_pattern = re.compile('[\U0001F600-\U0001F64F]')

    # Cargar los tweets a un dataframe de pandas
    tweets_df = pd.read_json(file_path, lines=True)

    # Extraer los emojis de la columna 'content'
    tweets_df['emojis'] = tweets_df['content'].apply(lambda x: emoji_pattern.findall(x))

    # Expandir la columna 'emojis' para que cada fila tenga un solo emoji con .explode()
    df_emojis = tweets_df.explode('emojis')

    # Traer los 10 emojis mas usados
    top_emojis = df_emojis['emojis'].value_counts().head(10)

    return top_emojis.reset_index().rename(columns={'index': 'emoji', 'emojis': 'count'})


Q2 Time con Pandas estaba dando en los tests entre 8.5s y 9.5s de tiempo de ejecución, con cerca de 3200MiB de punto maximo de consumo de memoria.

In [46]:
# Ejecutar la función y mostrar el resultado
top_emojis_df = q2_pandas(file_path)
print(top_emojis_df)
# Tests de eficiencia de tiempo y memoria
start_time = time.time()
peak_memory_memory = max(memory_usage((q2_pandas, (file_path,))))
end_time = time.time()

print(f"q2_pandas - Execution Time: {end_time - start_time} seconds")
print(f"q2_pandas - Peak Memory Usage: {peak_memory_memory} MiB")

   count  count
0      🙏   7286
1      😂   3072
2      😡    378
3      😁    280
4      😊    259
5      😢    225
6      🙄    219
7      🙌    215
8      😀    213
9      😜    202
q2_pandas - Execution Time: 8.545934915542603 seconds
q2_pandas - Peak Memory Usage: 3149.39453125 MiB


### Q2 Time, dejando de usar pandas, lectura del archivo completo con la libreria json

Se buscó otro angulo para Q2 Time buscando reducir el tiempo de ejecución, reemplacé la carga de la información con Pandas a cargar el archivo y leerlo con la liberia de JSON linea a linea, similar a como se hizo en Q1 Memory pero con el archivo cargado desde antes buscando tener un menor teimpo de ejecución, así mismo se usa .findall para encontrar los emojis despues de extraer el contenido del tweet de el apartado de content

In [70]:
def q2_time(file_path: str) -> List[Tuple[str, int]]:
    emoji_counter = Counter()
    emoji_pattern = re.compile('[\U0001F600-\U0001F64F]')  #REGEX para encontrar emojis

    with open(file_path, 'r') as file:
        data = file.readlines()
        #Carga el json y busca los emojis en cada tweet

    for line in data:
        tweet = json.loads(line)
        content = tweet.get('content', '')
        emojis = emoji_pattern.findall(content)
        emoji_counter.update(emojis)

    top_emojis = emoji_counter.most_common(10)
    return top_emojis

El cambio de angulo a Q2 Time ayudó considerablemente en la reducción en el tiempo de ejecución, ahora entre 3.7s y 4s y adicional con una considerable reducción en el pico mas alto de memoria, teniendo como promedio 600MiB versus los 3000MiB de la solución con Pandas.

In [81]:
# Ejecutar la función y mostrar el resultado
top_emojis = q2_time(file_path)
print(top_emojis)
# Tests de eficiencia de tiempo y memoria
start_time = time.time()
peak_memory_memory = max(memory_usage((q2_time, (file_path,))))
end_time = time.time()

print(f"q2_time - Execution Time: {end_time - start_time} seconds")
print(f"q2_time - Peak Memory Usage: {peak_memory_memory} MiB")

[('🙏', 7286), ('😂', 3072), ('😡', 378), ('😁', 280), ('😊', 259), ('😢', 225), ('🙄', 219), ('🙌', 215), ('😀', 213), ('😜', 202)]
q2_time - Execution Time: 3.7877962589263916 seconds
q2_time - Peak Memory Usage: 605.49609375 MiB


### Q2 Memory evitando cargar el archivo completo, se usa lectura linea por linea para buscar los emojis por cada tweet

Para la solución Q2 Memory tomé un angulo muy similar al de la solución final de Q2 Time pero en vez de cargar el archivo JSON completamente se hace lo mismo que en Q1 Memory leyendo el archivo fila a fila, la logíca base es la misma que en Q2 Time pero este cambio logra reducir el pico de memoria de la función a un promedio 550MiB.

In [77]:
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    emoji_counter = Counter()
    emoji_pattern = re.compile('[\U0001F600-\U0001F64F]')  #REGEX para encontrar emojis

    with open(file_path, 'r') as file:
        for line in file:
            tweet = json.loads(line)
            content = tweet.get('content', '')
            emojis = emoji_pattern.findall(content)
            for emoji in emojis:
                emoji_counter[emoji] += 1

    top_emojis = emoji_counter.most_common(10)
    return top_emojis

Al analizar los tests de eficiencia en tiempo y memoria podemos ver que el tiempo de ejecución es ligeramente mas alto que en Q2 Time, pero se encuentra una reducción en el pico de memoria, de un promedio de 600MiB a 550MiB.

In [83]:
# Ejecutar la función y mostrar el resultado
top_emojis = q2_memory(file_path)
print(top_emojis)
# Tests de eficiencia de tiempo y memoria
start_time = time.time()
peak_memory_memory = max(memory_usage((q2_memory, (file_path,))))
end_time = time.time()

print(f"q2_memory - Execution Time: {end_time - start_time} seconds")
print(f"q2_memory - Peak Memory Usage: {peak_memory_memory} MiB")

[('🙏', 7286), ('😂', 3072), ('😡', 378), ('😁', 280), ('😊', 259), ('😢', 225), ('🙄', 219), ('🙌', 215), ('😀', 213), ('😜', 202)]
q2_memory - Execution Time: 3.8695602416992188 seconds
q2_memory - Peak Memory Usage: 551.01953125 MiB


## Q3 Time and Memory solucion

La solución de Q3 Time y Q3 Memory comparte mucho del desarrollo de Q2, para variar las soluciones elegí irme con Pandas + REGEX para Q3 Time, con REGEX se puede encontrar las menciones a distintos usuarios para luego guardar los usernames en un dataframe, se reutiliza el .explode que ya se habia explorado en Q2 y se ordena por usuarios mas mencionados.

### Q3 Time solucion con Pandas

In [98]:
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    # Read the entire file into a Pandas DataFrame
    df = pd.read_json(file_path, lines=True)

    # Regular expression to extract mentions
    mention_pattern = re.compile(r'@(\w+)')

    # Extract mentions and explode the DataFrame
    df['mentions'] = df['content'].str.findall(mention_pattern)
    df_exploded = df.explode('mentions')

    # Count and get the top 10 mentions
    top_mentions = df_exploded['mentions'].value_counts().head(10)

    # Convert to list of tuples
    top_mentions_output = list(top_mentions.items())

    return top_mentions_output

El analisis de los tests de Q3 muestra un tiempo promedio de ejecución de 9.5s y un pico de uso de memoria de 3400MiB

In [99]:
#Executing q3_time and printing the result
top_mentions = q3_time(file_path)
print(top_mentions)
# Timing and memory testing for q1_memory
start_time = time.time()
peak_memory_memory = max(memory_usage((q3_time, (file_path,))))
end_time = time.time()

print(f"q3_time - Execution Time: {end_time - start_time} seconds")
print(f"q3_time - Peak Memory Usage: {peak_memory_memory} MiB")

[('narendramodi', 2261), ('Kisanektamorcha', 1836), ('RakeshTikaitBKU', 1639), ('PMOIndia', 1422), ('RahulGandhi', 1125), ('GretaThunberg', 1046), ('RaviSinghKA', 1015), ('rihanna', 972), ('UNHumanRights', 962), ('meenaharris', 925)]
q3_time - Execution Time: 9.334634780883789 seconds
q3_time - Peak Memory Usage: 3332.78125 MiB


### Q3 Memory solucion, usa el metodo de lectura linea por linea del JSON usado anteriormente

Para la solución de Q3 Memory se usa una combinación de REGEX y de lectura fila a fila del JSON de tweets, con una estructura similar a Q2 Memory donde se busca con .findall las menciones dentro de la columna de content, esta solución reduce en un 45% el pico de consumo de memoria y como se mencionó en Q1 ya que la cantidad de tweets procesados no es lo suficientemente alta como para que una lectura linea a linea sea mas lenta que una carga inicial de los datos completos el tiempo de ejecución también se reduce, de 9s en promedio a 4s.

In [93]:
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    mention_counter = Counter()
    mention_pattern = re.compile(r'@(\w+)')

    # Process file line by line
    with open(file_path, 'r') as file:
        for line in file:
            tweet = json.loads(line)
            mentions = mention_pattern.findall(tweet.get('content', ''))
            for mention in mentions:
                mention_counter[mention] += 1

    # Get the top 10 mentions
    top_mentions = mention_counter.most_common(10)

    return top_mentions

In [95]:
#Executing q3_memory and printing the result
top_mentions = q3_memory(file_path)
print(top_mentions)
# Timing and memory testing for q1_memory
start_time = time.time()
peak_memory_memory = max(memory_usage((q3_memory, (file_path,))))
end_time = time.time()

print(f"q3_memory - Execution Time: {end_time - start_time} seconds")
print(f"q3_memory - Peak Memory Usage: {peak_memory_memory} MiB")

[('narendramodi', 2261), ('Kisanektamorcha', 1836), ('RakeshTikaitBKU', 1639), ('PMOIndia', 1422), ('RahulGandhi', 1125), ('GretaThunberg', 1046), ('RaviSinghKA', 1015), ('rihanna', 972), ('UNHumanRights', 962), ('meenaharris', 925)]
q3_memory - Execution Time: 3.8050408363342285 seconds
q3_memory - Peak Memory Usage: 1906.46484375 MiB
