# Data Engineer Challenge - Miguel Guerrero

obs: Se importan las librerias dentro del segmento de codigo de cada solucion con el objetivo que sea utilizado unicamente ahi a pesar que puede ser reutilizado en otro segmento
esto con el objetivo de aportar claridad a las librerias necesarias para el funcionamiento de la solucion

- El archivo json se tiene en el directorio raiz de este archivo.

# 1 Las top 10 fechas donde hay más tweets. 
Mencionar el usuario (username) que más publicaciones tiene por cada uno de esos días.
- Enfoque: Tiempo de ejecucion optimizado

Para este caso se convierte el archivo json a un dataframe de pandas por los beneficios que este tiene en la optimizacion de las operaciones con datos.
Como se esta priorizando el tiempo de ejecucion por sobre de la memoria. Ademas luego de leer el JSON, se guarda el DF en formato Parquet para uso futuro.

In [40]:
import os
import pandas as pd
from datetime import datetime
from typing import List, Tuple

#Se define la funcion que nos devolvera la top 10 fechas con mas tweets y se menciona el usuario con mas tweets en estas fechas con enfoque en optimizacion del tiempo de ejecucion
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    
    #Se ajusta ruta de lectura de archivo json a .parquet para futuras lecturas
    parquet_file = file_path.replace('.json', '.parquet')
    
    #Se verifica si el archivo .parquet existe, si existe se lee, si no se crea
    if os.path.exists(parquet_file):
        df = pd.read_parquet(parquet_file)
    else:
        df = pd.read_json(file_path, lines=True)
        df.to_parquet(parquet_file, index=False)
    

    #Se convierte la columna date a datetime de pandas y extraemos solo la fecha
    df['date'] = pd.to_datetime(df['date']).dt.date
    
    #Se extrae el nombre de usuario de la columna user
    df['username'] = df['user'].apply(lambda x: x.get('username'))
    
    #Se elimina la columna user
    del df['user']

    #Se agrupa el DataFrame por fecha y nombre de usuario y se cuenta el número de tweets para cada combinación
    grouped = df.groupby(['date', 'username']).size().reset_index(name='count')

    #Se obtiene las 10 fechas con el mayor número de tweets.
    top_dates_df = df['date'].value_counts().nlargest(10).reset_index()

    # Se obtiene el usuario con mas tweets para cada una de las 10 fechas
    result = []
    for _, row in top_dates_df.iterrows():
        date = row['date']
        top_user_df = grouped[grouped['date'] == date].nlargest(1, 'count')
        top_user = top_user_df['username'].iloc[0]
        result.append((date, top_user))
    
    return result

#Se ejecuta la funcion
#q1_time('./farmers-protest-tweets-2021-2-4.json')


Enfoque: Memoria en uso optimizada

Para este se utiliza el archivo json y se lee linea por linea.


In [41]:
from datetime import datetime
from typing import List, Tuple
from collections import defaultdict
import ujson as json

#Se define la funcion que nos devolvera la top 10 fechas con mas tweets y se menciona el usuario con mas tweets en estas fechas con enfoque en optimizacion de la memoria en uso
def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    
    # Diccionario para mantener el conteo de tweets por fecha
    date_counts = defaultdict(int)
    # Diccionario para mantener el conteo de tweets de usuario por fecha
    user_date_counts = defaultdict(lambda: defaultdict(int))
    
    with open(file_path, "r") as file:
        for line in file:
            tweet = json.loads(line)
            # Se extrae la date del tweet y la convertimos a datetime.date
            tweet_date = datetime.strptime(tweet['date'].split("T")[0], '%Y-%m-%d').date()
            # Se actualiza el conteo de tweets para esa fecha
            date_counts[tweet_date] += 1
            # Se actualiza el conteo de tweets de usuario para esa fecha
            user_date_counts[tweet_date][tweet['user']['username']] += 1

    # Se ordena las fechas por numero de tweets de forma descendente y se guarda el top 10
    top_dates = sorted(date_counts, key=date_counts.get, reverse=True)[:10]
    # Para cada fecha, se obtiene el usuario con mas tweets
    result = []
    for date in top_dates:
        #Se obtiene el usuario con mas tweets para cada fecha
        top_user = max(user_date_counts[date], key=user_date_counts[date].get)
        result.append((date, top_user))
    
    return result

# Se ejecuta la funcion
#q1_memory('./farmers-protest-tweets-2021-2-4.json')

# 2. Los top 10 emojis más usados con su respectivo conteo.
Enfoque: Tiempo de ejecucion optimizado

In [42]:
import os
import pandas as pd
import emoji
from datetime import datetime
from collections import Counter
from typing import List, Tuple

# Se define la funcion que nos devolvera la lista de los 10 emojis mas usados con enfoque en optimizacion del tiempo de ejecucion
def q2_time(file_path: str) -> List[Tuple[str, int]]:
    
    # Se ajusta ruta de lectura de archivo json a .parquet para futuras lecturas
    parquet_file = file_path.replace('.json', '.parquet')
    
    # Se verifica si el archivo .parquet existe, si existe se lee, si no se crea
    if os.path.exists(parquet_file):
        df = pd.read_parquet(parquet_file)
    else:
        df = pd.read_json(file_path, lines=True)
        df.to_parquet(parquet_file, index=False)
    
    # Se extraen todos los emojis de la columna 'content' y se cuenta su frecuencia
    all_emojis = []
    for content in df['content']:
        emojis_in_content = [entry['emoji'] for entry in emoji.emoji_list(content)]
        all_emojis.extend(emojis_in_content)
    # Se cuenta la frecuencia de cada emoji
    emoji_counts = Counter(all_emojis)

    # Se obteniene los top 10 emojis más utilizados
    top_emojis = emoji_counts.most_common(10)
    
    return top_emojis

#Se ejecuta la funcion
#q2_time('./farmers-protest-tweets-2021-2-4.json')

Enfoque: Memoria en uso optimizada

In [43]:
import ujson as json
from collections import Counter
from typing import List, Tuple
import emoji

#Se define la funcion que nos devolvera la lista de los 10 emojis mas usados con enfoque en optimizacion de la memoria en uso
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    
    # Se crea un counter para contar la frecuencia de cada emoji
    emoji_counts = Counter()
    
    # Se abre el archivo json y se itera sobre cada tweet
    with open(file_path, 'r') as file:
        for line in file:
            # Se convierte la linea en un diccionario
            tweet = json.loads(line)
            # Se extrae el contenido del tweet
            content = tweet.get('content', '')
            # Se extraen los emojis del contenido
            emojis_in_content = [entry['emoji'] for entry in emoji.emoji_list(content)]
                
            # Se actualiza el counter con los emojis encontrados
            emoji_counts.update(emojis_in_content)

    # Se obtienen los top 10 emojis más utilizados
    top_emojis = emoji_counts.most_common(10)
    
    return top_emojis

#Se ejecuta la funcion
#q2_memory("./farmers-protest-tweets-2021-2-4.json")

# 3. El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos. 
Enfoque: Tiempo de ejecucion optimizado

In [44]:
import os
import pandas as pd
from datetime import datetime
from collections import Counter
from typing import List, Tuple

#Se define la funcion que nos devolvera la lista de los 10 usuarios mas mencionados en tweets con enfoque en la optimizacion del tiempo de ejecucion
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    
    #Se ajusta ruta de lectura de archivo json a .parquet para futuras lecturas
    parquet_file = file_path.replace('.json', '.parquet')
    
    #Se verifica si el archivo .parquet existe, si existe se lee, si no se crea
    if os.path.exists(parquet_file):
        df = pd.read_parquet(parquet_file)
    else:
        df = pd.read_json(file_path, lines=True)
        df.to_parquet(parquet_file, index=False)
    
    
    #Se extraen todas las menciones por cada tweet
    df['mentions'] = df['content'].str.findall(r'@(\w+)')

    # Se aplana la lista de menciones, ya que cada tweet puede tener más de una mención
    mentions_flat = []
    for sublist in df['mentions'].dropna():
        for mention in sublist:
            mentions_flat.append(mention)
    # Se cuenta la frecuencia de cada mención en la lista aplanada
    mention_counts = Counter(mentions_flat)
    # Obtener los top 10 usuarios más mencionados
    top_mentions = mention_counts.most_common(10)
    
    return top_mentions

#Se ejecuta la funcion
#q3_time('./farmers-protest-tweets-2021-2-4.json')

Enfoque: Memoria en uso optimizada

In [45]:
import ujson as json
import re

#Se define la funcion que nos devolvera la lista de los 10 usuarios mas mencionados en tweets con enfoque en la optimizacion de la memoria en uso
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    
    # Se crea un counter para contar la frecuencia de cada mención
    mention_counts = Counter()
    
    # Se abre el archivo json y se itera sobre cada tweet
    with open(file_path, 'r') as file:
        for line in file:
            tweet = json.loads(line)
            content = tweet.get('content', '')
            # Se extraen las menciones del contenido
            mentions = re.findall(r'@(\w+)', content)
            # Se actualiza el counter con las menciones encontradas
            mention_counts.update(mentions)
            
    # Obtener los top 10 usuarios más mencionados
    top_mentions = mention_counts.most_common(10)
    
    return top_mentions

#Se ejecuta la funcion
#q3_memory("./farmers-protest-tweets-2021-2-4.json")

# Resultados

In [47]:
# Resultados de las funciones Q1
q1_time("./farmers-protest-tweets-2021-2-4.json")


[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

In [48]:
q1_memory("./farmers-protest-tweets-2021-2-4.json")

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria'),
 (datetime.date(2021, 2, 19), 'Preetm91')]

In [49]:
# Resultados de las funciones Q2
q2_time("./farmers-protest-tweets-2021-2-4.json")


[('🙏', 5049),
 ('😂', 3072),
 ('🚜', 2972),
 ('🌾', 2182),
 ('🇮🇳', 2086),
 ('🤣', 1668),
 ('✊', 1651),
 ('❤️', 1382),
 ('🙏🏻', 1317),
 ('💚', 1040)]

In [50]:
q2_memory("./farmers-protest-tweets-2021-2-4.json")

[('🙏', 5049),
 ('😂', 3072),
 ('🚜', 2972),
 ('🌾', 2182),
 ('🇮🇳', 2086),
 ('🤣', 1668),
 ('✊', 1651),
 ('❤️', 1382),
 ('🙏🏻', 1317),
 ('💚', 1040)]

In [51]:
# Resultados de las funciones Q3
q3_time("./farmers-protest-tweets-2021-2-4.json")


[('narendramodi', 2261),
 ('Kisanektamorcha', 1836),
 ('RakeshTikaitBKU', 1639),
 ('PMOIndia', 1422),
 ('RahulGandhi', 1125),
 ('GretaThunberg', 1046),
 ('RaviSinghKA', 1015),
 ('rihanna', 972),
 ('UNHumanRights', 962),
 ('meenaharris', 925)]

In [52]:
q3_memory("./farmers-protest-tweets-2021-2-4.json")

[('narendramodi', 2261),
 ('Kisanektamorcha', 1836),
 ('RakeshTikaitBKU', 1639),
 ('PMOIndia', 1422),
 ('RahulGandhi', 1125),
 ('GretaThunberg', 1046),
 ('RaviSinghKA', 1015),
 ('rihanna', 972),
 ('UNHumanRights', 962),
 ('meenaharris', 925)]

# Posibles mejoras generales orientadas al enfoque de tiempo de ejecucion optimo:

- Tener el archivo .parquet preparado con las columnas necesarias y de este modo restarle carga a la funcion y solo se enfoque a operar sobre el dataframe. Incluso de ser posible tener listo el archivo solo con las columnas necesarias.

Comentario: La solucion con archivo parquet en algunos casos en la primera ejecucion no es la mas rapida pero en futuras ejecuciones es la que hace un mejor uso del tiempo de ejecucion.



# Evaluacion de funciones

In [53]:
import cProfile
import pstats
from memory_profiler import profile

In [57]:
# Segmento para evaluar Q1

# Evaluacion de uso de memoria: 

%load_ext memory_profiler
print("Evaluacion de memoria q1_time")
%memit q1_time("./farmers-protest-tweets-2021-2-4.json")
print("Evaluacion de memoria q1_memory")
%memit q1_memory("./farmers-protest-tweets-2021-2-4.json")

profiler = cProfile.Profile()
profiler.enable()

# Evaluacion de tiempo de ejecucion: 
# Codigo a evaluar:
q1_memory("./farmers-protest-tweets-2021-2-4.json")
q1_time("./farmers-protest-tweets-2021-2-4.json")
profiler.disable()
profiler.dump_stats("output.pstats")
stats = pstats.Stats("output.pstats")
stats.sort_stats("cumulative")
# Se muestran resultados ordenados por tiempo de ejecucion, del mas lento al mas rapido
stats.print_stats(10)

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
Evaluacion de memoria q1_time
peak memory: 4171.46 MiB, increment: 136.88 MiB
Evaluacion de memoria q1_memory
peak memory: 4094.05 MiB, increment: 0.14 MiB
Fri Sep 29 16:30:17 2023    output.pstats

         3542602 function calls (3541904 primitive calls) in 9.036 seconds

   Ordered by: cumulative time
   List reduced from 898 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        3    0.000    0.000    9.037    3.012 C:\Users\migue\AppData\Roaming\Python\Python310\site-packages\IPython\core\interactiveshell.py:3472(run_code)
        3    0.000    0.000    9.037    3.012 {built-in method builtins.exec}
        1    1.187    1.187    6.335    6.335 C:\Users\migue\AppData\Local\Temp\ipykernel_7752\1890755831.py:7(q1_memory)
        1    0.148    0.148    2.701    2.701 C:\Users\migue\AppData\Local\Temp\ipykernel_7752\1554105092.py:1(<

<pstats.Stats at 0x2071c943e50>

In [56]:
# Segmento para evaluar Q2

# Evaluacion de uso de memoria: 

%load_ext memory_profiler
print("Evaluacion de memoria q2_time")
%memit q2_time("./farmers-protest-tweets-2021-2-4.json")
print("Evaluacion de memoria q2_memory")
%memit q2_memory("./farmers-protest-tweets-2021-2-4.json")

profiler = cProfile.Profile()
profiler.enable()

# Evaluacion de tiempo de ejecucion: 
# Codigo a evaluar:
q2_memory("./farmers-protest-tweets-2021-2-4.json")
q2_time("./farmers-protest-tweets-2021-2-4.json")
profiler.disable()
profiler.dump_stats("output.pstats")
stats = pstats.Stats("output.pstats")
stats.sort_stats("cumulative")
# Se muestran resultados ordenados por tiempo de ejecucion, del mas lento al mas rapido
stats.print_stats(10)

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
Evaluacion de memoria q2_time
peak memory: 4062.60 MiB, increment: 153.31 MiB
Evaluacion de memoria q2_memory
peak memory: 3993.86 MiB, increment: 0.01 MiB
Fri Sep 29 16:29:50 2023    output.pstats

         138721788 function calls (138721776 primitive calls) in 62.744 seconds

   Ordered by: cumulative time
   List reduced from 429 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        3    0.000    0.000   70.437   23.479 C:\Users\migue\AppData\Roaming\Python\Python310\site-packages\IPython\core\interactiveshell.py:3472(run_code)
        3    0.000    0.000   70.437   23.479 {built-in method builtins.exec}
   234814    0.154    0.000   62.800    0.000 c:\Users\migue\AppData\Local\Programs\Python\Python310\lib\site-packages\emoji\core.py:282(emoji_list)
   234814   15.049    0.000   62.647    0.000 c:\Users\migue\AppData\Local\Progr

<pstats.Stats at 0x2071c942410>

In [58]:
# Segmento para evaluar Q3

# Evaluacion de uso de memoria: 

%load_ext memory_profiler
print("Evaluacion de memoria q3_time")
%memit q3_time("./farmers-protest-tweets-2021-2-4.json")
print("Evaluacion de memoria q3_memory")
%memit q3_memory("./farmers-protest-tweets-2021-2-4.json")

profiler = cProfile.Profile()
profiler.enable()

# Evaluacion de tiempo de ejecucion: 
# Codigo a evaluar:
q3_memory("./farmers-protest-tweets-2021-2-4.json")
q3_time("./farmers-protest-tweets-2021-2-4.json")
profiler.disable()
profiler.dump_stats("output.pstats")
stats = pstats.Stats("output.pstats")
stats.sort_stats("cumulative")
# Se muestran resultados ordenados por tiempo de ejecucion, del mas lento al mas rapido
stats.print_stats(10)

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
Evaluacion de memoria q3_time
peak memory: 4196.01 MiB, increment: 148.43 MiB
Evaluacion de memoria q3_memory
peak memory: 4065.56 MiB, increment: 0.09 MiB
Fri Sep 29 16:30:49 2023    output.pstats

         1497835 function calls (1497807 primitive calls) in 7.248 seconds

   Ordered by: cumulative time
   List reduced from 541 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        3    0.000    0.000    7.248    2.416 C:\Users\migue\AppData\Roaming\Python\Python310\site-packages\IPython\core\interactiveshell.py:3472(run_code)
        3    0.000    0.000    7.248    2.416 {built-in method builtins.exec}
        1    1.027    1.027    4.720    4.720 C:\Users\migue\AppData\Local\Temp\ipykernel_7752\2932543489.py:5(q3_memory)
        1    0.005    0.005    2.528    2.528 C:\Users\migue\AppData\Local\Temp\ipykernel_7752\955787143.py:1(<m

<pstats.Stats at 0x2071c9421d0>