## Latam Challenge David Molina


Este cuaderno tiene como propósito abordar un challenge planteado mediante dos enfoques: uno que optimiza el uso de la memoria y otro que maximiza la eficiencia en el tiempo de ejecución.

La base de datos a analizar consiste en un archivo de texto plano con registros en formato JSON separados por saltos de línea. Con un tamaño aproximado de 400 MB, este archivo no representa un desafío significativo para su análisis en una computadora personal, ya que la mayoría de los equipos actuales cuentan con suficiente memoria para manejarlo sin inconvenientes.

Por lo antes mencionado, utilizaré mi computadora personal para la experimentación de este challenge, así como también evitar un uso innecesario de recursos. No obstante, en escenarios donde los archivos sean considerablemente más grandes, sería altamente recomendable emplear herramientas diseñadas para el procesamiento distribuido, como un clúster de Spark con DataProc en GCP o Amazon Glue en AWS. Estas soluciones permiten distribuir automáticamente los datos entre varios nodos en memoria o en disco, un principio que también se aplicará en este challenge, aunque utilizando archivos en una máquina local, como explicaré más adelante.

### Optimización

Para el análisis de datos, utilizaremos la biblioteca pandas, que nos permitirá realizar agrupaciones y aplicar las transformaciones necesarias. Durante las pruebas de ejecución, se identificó que el principal cuello de botella en términos de tiempo y uso de memoria ocurría en la lectura y procesamiento del archivo JSON. Por esta razón, la optimización del código se ha centrado en mejorar tanto el rendimiento como el consumo de memoria en esta etapa crítica.

A continuación, se detallan las librerías que se utilizarán en el desarrollo. Exceptuando pandas y emoji, todas las demás son librerías estándar que vienen incluidas en Python por defecto.

Versión de Python: 3.10.6
Paquetes:

pandas==2.2.3
emoji==2.14.1

In [5]:
# Importamos librerías
from datetime import datetime
from typing import List, Tuple
import emoji
import os
import pandas as pd
import json

#Definimos el nombre de nuestro archivo JSON
file_path = "farmers-protest-tweets-2021-2-4.json"

# Definimos los comandos magic para medir tiempo y memoria
%load_ext memory_profiler
%load_ext line_profiler

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler
The line_profiler extension is already loaded. To reload it, use:
  %reload_ext line_profiler


### 1. Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene

Se determinó que el principal factor limitante es el tiempo requerido para leer y transformar los datos. No obstante, dado que no es necesario procesar la totalidad de los campos para resolver nuestro problema, se decidió focalizar el análisis únicamente en aquellos elementos imprescindibles. Para lograr un ahorro considerable en el tiempo de ejecución, se optó por cargar el archivo en su forma original y evitar el uso del método de lectura JSON de pandas, lo que permitió optimizar el rendimiento del proceso.

In [10]:
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:

    # Abrimos el archivo y leemos todas las líneas
    with open(file_path, 'r') as archivo:
        lineas = archivo.readlines()
    
    registros = []
    # Extraemos la información relevante de cada línea
    for entrada in lineas:
        dato = json.loads(entrada)
        # Convertimos la fecha (se toman los primeros 10 caracteres) a objeto date
        dia = datetime.strptime(dato["date"][:10], "%Y-%m-%d").date()
        nombre_usuario = dato["user"]["username"]
        id_tweet = dato["id"]
        registros.append({"date": dia, "user": nombre_usuario, "id": id_tweet})
    
    # Convertimos la lista de diccionarios en un DataFrame
    df = pd.DataFrame(registros)
    
    # Agrupamos por fecha para contar los tweets y extraemos los 10 días con más actividad
    resumen_dias = df.groupby("date").count().sort_values("id", ascending=False).head(10)
    dias_destacados = list(resumen_dias.index)
    
    # Filtramos el DataFrame para incluir sólo los registros de los 10 días (top)
    df_filtrado = df[df["date"].isin(dias_destacados)]
    
    # Agrupamos por fecha y usuario para contar publicaciones, y seleccionar el usuario con mayor número de tweets en cada día
    df_agrupado = df_filtrado.groupby(["date", "user"]).count().reset_index()
    df_agrupado = df_agrupado.sort_values(["date", "id"], ascending=False)
    df_resultado = df_agrupado.groupby("date").first()
    
    # Convertimos el DataFrame resultante en una lista de tuplas (fecha, usuario)
    salida = [ (registro.Index, registro.user) for registro in df_resultado[["user"]].itertuples() ]
    return salida


In [76]:
# Ejemplo de ejecución:
print(q1_time(file_path))

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 19), 'Preetm91'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria')]


In [11]:
def q1_memory(file_path: str) -> list:

    # Diccionario para contar cuántos registros (tweets) hay por fecha.
    particiones = {}
    
    # Creamos un directorio temporal para almacenar los archivos particionados por fecha.
    dir_tmp = "tmp_q1"
    os.makedirs(dir_tmp, exist_ok=True)
    
    with open(file_path, 'r') as archivo_principal:
        # Procesamos cada línea individualmente
        for linea in archivo_principal:
            registro = json.loads(linea)
            fecha = registro.get("date")[:10]
            usuario = registro.get("user").get("username")
            id_tweet = registro.get("id")
            datos = {"usuario": usuario, "id": id_tweet}
            
            # Generamos la ruta del archivo temporal para la fecha actual.
            ruta_archivo_temp = os.path.join(dir_tmp, fecha)
            with open(ruta_archivo_temp, "a") as archivo_temp:
                archivo_temp.write(json.dumps(datos) + "\n")
            
            # Actualizamos el contador de registros para la fecha correspondiente.
            particiones[fecha] = particiones.get(fecha, 0) + 1
    
    # Convertimos el diccionario de particiones en un DataFrame para facilitar el ordenamiento.
    df_particiones = pd.DataFrame([{"fecha": f, "registros": c} for f, c in particiones.items()])
    # Seleccionamos los 10 días con mayor número de tweets.
    df_top10 = df_particiones.sort_values("registros", ascending=False).head(10)
    fechas_top = list(df_top10["fecha"])
    
    # Listamos para almacenar el resultado final: (fecha, usuario con más tweets ese día)
    salida = []
    for fecha in fechas_top:
        ruta_temp = os.path.join(dir_tmp, fecha)
        df_temp = pd.read_json(ruta_temp, lines=True)
        
        # Agrupamos por el campo 'usuario' y contar el número de tweets de cada uno.
        # Ordenamos de forma descendente para identificar al usuario con mayor tweets.
        df_usuario = df_temp.groupby("usuario").count().sort_values("id", ascending=False)
        usuario_max = df_usuario.index[0]
        
        salida.append((fecha, usuario_max))
    
    # Limpiamos el directorio temporal eliminando todos los archivos generados.
    for nombre_archivo in os.listdir(dir_tmp):
        ruta_temp = os.path.join(dir_tmp, nombre_archivo)
        if os.path.isfile(ruta_temp):
            os.remove(ruta_temp)
    
    return salida


In [12]:
# Ejemplo de ejecución:
print(q1_memory(file_path))

[('2021-02-12', 'RanbirS00614606'), ('2021-02-13', 'MaanDee08215437'), ('2021-02-17', 'RaaJVinderkaur'), ('2021-02-16', 'jot__b'), ('2021-02-14', 'rebelpacifist'), ('2021-02-18', 'neetuanjle_nitu'), ('2021-02-15', 'jot__b'), ('2021-02-20', 'MangalJ23056160'), ('2021-02-23', 'Surrypuria'), ('2021-02-19', 'Preetm91')]


In [13]:
%timeit q1_time(file_path)


2.69 s ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
%memit q1_time(file_path)

peak memory: 579.79 MiB, increment: 38.62 MiB


In [15]:
%timeit q1_memory(file_path)
%memit q1_memory(file_path)

4.34 s ± 265 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
peak memory: 547.50 MiB, increment: 0.00 MiB


| Función   | Tiempo   | Memoria   |
|-----------|---------|-----------|
| q1_time   | 2.69s   | 579.79 MiB |
| q1_memory | 4.34s | 547.50 MiB  |


En este ejercicio, se implementó el algoritmo de dividir y vencer para optimizar el uso de memoria durante la lectura del archivo. Es decir, en lugar de cargar el archivo completo en memoria, se procedió a fragmentarlo en partes más pequeñas, permitiendo que cada segmento se procese de forma independiente. Esta estrategia facilitó el manejo de grandes volúmenes de datos al evitar cuellos de botella y posibilitar un procesamiento más eficiente, similar a las técnicas utilizadas en entornos de computación distribuida.

Para este caso, y aplicable también a las funciones q2 y q3 de optimización de memoria, se optó por gestionar el procesamiento mediante archivos que actuaron como unidades independientes. En otras palabras, con la finalidad de generar un resumen diario del número de tweets por usuario, se leyó el archivo original de forma secuencial, dividiéndolo en distintos archivos separados por fecha, sobre los cuales se efectuaron los cálculos.

### 2. Los top 10 emojis más usados con su respectivo conteo:
Para el análisis de emojis, aplicaremos una estrategia similar: limitaremos la lectura del archivo únicamente al campo relevante, en este caso, content. A partir de este campo, extraeremos los emojis utilizando la función analyze de la biblioteca emoji. Los resultados se almacenarán en una lista, que luego consolidaremos en una estructura global para todo el archivo. Finalmente, organizaremos los datos y realizaremos el conteo utilizando un DataFrame.

In [18]:
def q2_time(file_path: str) -> List[Tuple[str, int]]:

    # Abrimos el archivo y leer todas las líneas
    with open(file_path, 'r') as archivo:
        lineas = archivo.readlines()
    
    # Recopilamos el contenido de cada registro
    textos = []
    for linea in lineas:
        registro = json.loads(linea)
        textos.append(registro.get("content"))
    
    # Extraemos los emojis de cada texto y acumularlos en una lista
    todos_emojis = []
    for texto in textos:
        # Asumimos que emoji.analyze devuelve objetos con el atributo 'chars'
        todos_emojis.extend([item.chars for item in emoji.analyze(texto)])
    
    # Creamos un DataFrame con cada emoji y asignamos un contador inicial de 1
    df_emojis = pd.DataFrame({"emoji": todos_emojis})
    df_emojis["contador"] = 1
    
    # Agrupamos por emoji, sumamos los contadores y ordenamos de mayor a menor
    df_top = df_emojis.groupby("emoji").sum().sort_values("contador", ascending=False).head(10)
    
    # Reiniciamos índice para facilitar la conversión a lista de tuplas
    df_resultado = df_top.reset_index()
    # Convertimos a una lista de tuplas (emoji, cantidad)
    salida = [(fila["emoji"], fila["contador"]) for _, fila in df_resultado.iterrows()]
    
    return salida


In [19]:
# Ejemplo de ejecución:
print(q2_time(file_path))

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]


In [20]:
import os
import json
import pandas as pd
import emoji

def q2_memory(file_path: str) -> list:

    # Definimos el directorio temporal donde se guardará el archivo con los emojis extraídos.
    dir_aux = "tmp_q2"
    os.makedirs(dir_aux, exist_ok=True)  # Crea el directorio si no existe.

    ruta_aux = os.path.join(dir_aux, "emojis.txt")
    
    # Abrir el archivo de entrada y el archivo temporal para escritura.
    with open(file_path, 'r') as arch_in, open(ruta_aux, "a") as arch_out:
        for linea in arch_in:
            dato = json.loads(linea)
            # Extraer el contenido del campo 'content', si existe.
            texto = dato.get("content", "")
            # Obtenemos una lista de los emojis presentes en el texto.
            lista_emojis = [item["emoji"] for item in emoji.emoji_list(texto)]
            # Si hay emojis en la línea, escribimos en el archivo temporal.
            if lista_emojis:
                arch_out.write("\n".join(lista_emojis) + "\n")
    
    conteo_emojis = {}
    
    # Leemos el archivo temporal y contamos los emojis.
    with open(ruta_aux, "r") as aux:
        for linea in aux:
            car = linea.strip()  # Eliminamos espacios en blanco y saltos de línea.
            if car:  
                conteo_emojis[car] = conteo_emojis.get(car, 0) + 1  

    # Convertimos el diccionario de conteo a un DataFrame para facilitar el ordenamiento.
    df_emo = pd.DataFrame({"emoji": list(conteo_emojis.keys()),
                           "cuenta": list(conteo_emojis.values())})

    # Ordenamos los emojis por frecuencia de uso en orden descendente y tomar los 10 más usados.
    df_top = df_emo.sort_values("cuenta", ascending=False).head(10)

    # Convertimos el DataFrame en una lista de tuplas (emoji, conteo).
    salida = [tuple(x) for x in df_top.set_index("emoji").itertuples()]

    # Eliminamos el archivo temporal después de procesar los datos.
    os.remove(ruta_aux)

    return salida


In [22]:
print(q2_memory(file_path))

[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]


Para llevar a cabo la ejecución con optimización de memoria, utilizamos archivos locales como paso previo en el procesamiento. Extraeremos los emojis de nuestra fuente de datos de manera secuencial, línea por línea, y procederemos a almacenarlos en un archivo local para luego agrupar.

In [23]:
%timeit q2_time(file_path)
%memit q2_time(file_path)

9.94 s ± 193 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
peak memory: 571.87 MiB, increment: 6.12 MiB


In [33]:
%timeit q2_memory(file_path)
%memit q2_memory(file_path)

9.37 s ± 225 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
peak memory: 567.81 MiB, increment: 0.00 MiB


| Función   | Tiempo   | Memoria   |
|-----------|---------|-----------|
| q2_time   | 9.94s   | 571.87 MiB |
| q2_memory | 9.06s | 567.81 MiB  |

### 3. El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.
En este problema, se tomará de igual manera los campos estrictamente necesarios, en este caso mentionedUsers el cual contiene un listado de usernames

In [38]:
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    # Abrimos el archivo y leer todas las líneas
    with open(file_path, 'r') as f:
        registros = f.readlines()
    
    # Lista para almacenar cada mención encontrada
    lista_menciones = []
    
    # Recorremos cada registro y extraer las menciones
    for linea in registros:
        datos = json.loads(linea)
        usuarios = datos.get("mentionedUsers")
        if usuarios:
            # Agregamos cada nombre de usuario mencionado a la lista
            for usuario in usuarios:
                lista_menciones.append(usuario["username"])
    
    # Creamos un DataFrame con las menciones para agrupar y sumar los conteos
    df_menciones = pd.DataFrame({"usuario": lista_menciones})
    df_menciones["conteo"] = 1
    # Agrupamos por 'usuario' y sumar los contadores
    resumen = df_menciones.groupby("usuario").sum()
    # Ordenamos de mayor a menor cantidad y obtener los 10 primeros
    top_10 = resumen.sort_values("conteo", ascending=False).head(10)
    
    # Convertimos el DataFrame a una lista de tuplas (username, conteo)
    salida = [(fila.Index, fila.conteo) for fila in top_10.itertuples()]
    
    return salida


In [37]:
# Ejemplo de ejecución:
print(q3_time(file_path))

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


In [45]:
def q3_memory(file_path: str) -> list:
    
    menciones = []  # Lista para almacenar los nombres de usuarios mencionados.
    # Abrimos el archivo JSON y procesamos línea por línea.
    with open(file_path, "r") as arch:
        for linea in arch:
            dato = json.loads(linea)
            usuarios_mencionados = dato.get("mentionedUsers")

            if usuarios_mencionados:
                # Extraemos los nombres de usuario de las menciones y agregarlos a la lista.
                menciones.extend([u["username"] for u in usuarios_mencionados])
    
    # DataFrame con la lista de menciones.
    df_menc = pd.DataFrame({"usuario": menciones})
    # Agregamos una columna auxiliar para el conteo de menciones.
    df_menc["contador"] = 1
    # Agrupamos por usuario y para contar cuántas veces aparece cada uno.
    df_agrup = df_menc.groupby("usuario").sum()
    # Ordenamos los usuarios por el número de menciones en orden descendente y tomar el top 10.
    df_top = df_agrup.sort_values("contador", ascending=False).head(10)
    # Convertimos el DataFrame en una lista de tuplas (usuario, cantidad de menciones).
    salida = [tuple(x) for x in df_top.itertuples()]

    return salida


In [41]:
# Ejemplo de ejecución:
print(q3_memory(file_path))

[('narendramodi', 2265), ('Kisanektamorcha', 1840), ('RakeshTikaitBKU', 1644), ('PMOIndia', 1427), ('RahulGandhi', 1146), ('GretaThunberg', 1048), ('RaviSinghKA', 1019), ('rihanna', 986), ('UNHumanRights', 962), ('meenaharris', 926)]


In [43]:
%timeit q3_time(file_path)
%memit q3_time(file_path)

2.1 s ± 65.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
peak memory: 567.81 MiB, increment: 0.00 MiB


In [44]:
%timeit q3_memory(file_path)
%memit q3_memory(file_path)

2.12 s ± 98.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
peak memory: 567.81 MiB, increment: 0.00 MiB


| Función   | Tiempo   | Memoria   |
|-----------|---------|-----------|
| q3_time   | 2.1s   | 567.81 MiB |
| q3_memory | 2.12s | 567.81 MiB  |

### Conclusiones
Hemos logrado optimizar el proceso de ejecución al identificar el cuello de botella en la lectura y el parseo de la información, lo que nos permitió abordar este paso crucial y ajustar la optimización según el objetivo deseado.

Si bien la implementación se realizó en una máquina única, se aplicaron principios fundamentales de la computación distribuida, como el particionamiento y el procesamiento en lotes, que sientan las bases para una escalabilidad futura.

Si bien en ciertos ejercicios la diferencia no es tan grande, podemos notar que esto radica especificamente en la maquina local y la memoria asignada, así como también el tamaño del archivo.  

Para procesar archivos de mayor tamaño, es recomendable utilizar clusters de procesamiento, como Spark, que permiten distribuir los datos de manera óptima. Además, nuestros códigos pueden adaptarse fácilmente para trabajar con DataFrames en PySpark.

