# Latam Challenge Diego Sisalima

El siguiente cuaderno tiene como objetivo resolver un challenge propuesto realizando a través de un enfoque que optimice la memoria y otro enfoque optimizando el tiempo de ejecución. 

## Background

La base de datos a analizar consta de una base en texto plano con registros json separados por salto de linea, el tamaño actual del archivo de aproximadamente 400Mb no presenta un riesgo al tratar de ser analizado en una computadora personal ya que actualmente usualmente estas cuentan con memoria suficiente para realizar el análisis en este tipo de archivos.

Siendo estrictamente pragmaticos, haré uso de mi computador personal para la experimentación del challenge por las razones mencionadas y para ahorrar el gasto de recursos innesarios (para no matar moscas a escopetazos como se suele decir), debo sin embargo aclarar que de ser otro escenario donde los tamaños de los archivos son consirablemente mayores es altamente recomendable tratarlos con otro tipo de herramientas como las que se disponen en la nube como por ejemplo un cluster de Spark con DataProc en GCP o Amazon Glue en AWS que nos permiten distribuir de manera automatica los datos a través de varios nodos en memoria o en disco, mismo principio que se aplica en este challenge pero a través de archivos como explicaré mas adelante dentro de la misma maquina local.

## Desarrollo

En análisis de datos lo haremos usando la libreria pandas que no permitirá hacer agrupaciones y las transformaciones necesarias. Al realizar pruebas de ejecución se pudo verficar que el cuello de botella en cuanto a tiempo y memoria se concentraba principalmente en la lectura y parseamiento del archivo json, por esta razón me he concentrado en atacar este cuello de botella al optimizar el codigo en tiempo y en memoria

Las librerías a usar se las describe en el siguiente bloque de codigo, a excepción de la librería pandas y emoji, el resto de librerías son estandasip  que vienen en el paquete de Python por default.

Version de Python: 3.9.12 <br>
pandas==1.5.3<br>
emoji==2.8.0 <br>

Especial mención a la librería typing para la validación de tipo de datos que tambien viene por defecto en las versiones de Python

In [1]:
# Importamos librerías
from typing import List, Tuple
from datetime import datetime
import emoji
import pandas as pd
import json
import os

In [2]:
#Definimos el nombre de nuestro archivo y de comandos magic para medir tiempo y memoria
file_path = "farmers-protest-tweets-2021-2-4.json"
%load_ext memory_profiler
%load_ext line_profiler

## 1. Las top 10 fechas donde hay más tweets. Mencionar el usuario (username) que más publicaciones tiene
por cada uno de esos días. Debe incluir las siguientes funciones:

Como se ha podido notar, el cuello de botella principal es el tiempo de lectura y parseamiento de datos, sin embargo no es necesario parsear todos los campos para solucionar nuestro problema por lo que para ahorrar tiempo de ejecución haremos solo un parseo de los campos necesarios leyendo en archivo en bruto y evitando usar el comando de lectura de pandas para json.

In [7]:
def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
    # Lectura del archivo y obteniendo los datos que necesitamos
    file1 = open(file_path, 'r')
    Lines = file1.readlines()
    data = []

    # Cargamos en memoria las lineas del archivo y vamos analizando linea a linea los campos que necesitamos
    for line in Lines:
        json_value = json.loads(line)
        user = json_value.get("user").get("username")
        date = datetime.strptime(json_value.get("date")[:10], "%Y-%m-%d").date()
        data_id = json_value.get("id")
        data.append({"date":date, "user":user, "id":data_id})
    
    tweets_data = pd.DataFrame(data)
    
    # Aqui vamos a agrupar para obtener el top 10 days
    top_10_days = tweets_data.groupby(["date"]).count()
    top_10_days = top_10_days.sort_values("id", ascending=False).head(10)
    top_10_days = list(top_10_days.index)
    
    # Filtramos los datos para el top 10 de dias
    tweets_data = tweets_data.loc[tweets_data["date"].isin(top_10_days)]

    # Agrupamos por el user y tomamos el de mayor numero de tweets por dia
    tweets_data = tweets_data.groupby(["date","user"]).count()
    tweets_data = tweets_data.sort_values(["date","id"], ascending=False)
    tweets_data = tweets_data.reset_index().groupby("date").first()
    
    # Transformamos en lista de tuplas para cumplir el tipo de dato de salida
    tweets_data = [tuple(i) for i in tweets_data[["user"]].itertuples()]
    return tweets_data

In [9]:
#Respuesta de la Q1 optimizando tiempo
q1_time(file_path)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'),
 (datetime.date(2021, 2, 13), 'MaanDee08215437'),
 (datetime.date(2021, 2, 14), 'rebelpacifist'),
 (datetime.date(2021, 2, 15), 'jot__b'),
 (datetime.date(2021, 2, 16), 'jot__b'),
 (datetime.date(2021, 2, 17), 'RaaJVinderkaur'),
 (datetime.date(2021, 2, 18), 'neetuanjle_nitu'),
 (datetime.date(2021, 2, 19), 'Preetm91'),
 (datetime.date(2021, 2, 20), 'MangalJ23056160'),
 (datetime.date(2021, 2, 23), 'Surrypuria')]

In [8]:
%timeit q1_time(file_path)

18.2 s ± 3.76 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%memit q1_time(file_path)

peak memory: 1449.68 MiB, increment: 0.00 MiB


Observamos como para nuestro q1 se obtuvo un tiempo de ejecución promedio de 18.2 segundos y un uso de memoria de alrededor de 1449.68 MB

En el escenario de optimización de memoria utilizaremos el algoritmo de dividir y vencer. En el caso de la computación distruida como en clusteres spark como se mencionó antes se puede controlar esto a través del particionamiento de la inforación, esto permitirá a que cada nodo maneje un rango de datos mas pequeño, sin embargo para esto en fundamental el escalamiento horizontal, que en otras palabras significa agregar mas nodos al cluster de procesamiento.

En este caso, y que de igual forma aplicaremos a las funciones q2 y q3 de optimización de memoria, será trabajar con archivos, que nos servirán a manera de "nodos" para hacer un procesamiento previo. Dicho de otra manera, ya que queremos obtener un resumen por dia del numero de tweets y por usuario, leeremos linea a linea el archivo de origin y lo iremos distribuyendo en varios archivos, uno por cada fecha a partir de los cuales calcularemos los valores deseados.

In [3]:
def q1_memory(file_path):
    # Se leerá el archivo de datos origin y linea por linea se distibuye a un archivo de acuerdo a su fecha
    # A su vez, contaremos el numero de lineas que tiene cada archivo para asi obterner el numero de tweets por cada dia
    file_lines = {}

    # Hacemos la lectura del archivo linea a linea
    with open(file_path, 'r') as file:
        for line in file:
            json_value = json.loads(line)
            date = json_value.get("date")[:10]
            user = json_value.get("user").get("username")
            data_id = json_value.get("id")
            data = {"user":user, "id":data_id}
            with open(f"data_q1/{date}","a") as fwrite:
                fwrite.write(json.dumps(data)+"\n") # Guardamos el registro en su archivo correspondiente
                try:
                    file_lines[date] += 1 # Contamos el numero de lineas en un diccionario por dia
                except:
                    file_lines[date] = 0 # Si no existe se empieza a contar
    
    # Convertimos en dataframe a nuestro diccionario y obtenemos el top 10 de dias con mas tweets
    top_10_days = pd.DataFrame([{"date":i, "rows":file_lines[i]} for i in file_lines])
    top_10_days = top_10_days.sort_values("rows", ascending = False)[:10]
    top_10_days = list(top_10_days['date'])
    
    # Tendremos asi particionados nuestros datos, leeremos ahora si con pandas read json y agrupamos por usuario para tomar el primero
    result_list = []
    # Notese que solo leemos los archivos de los top 10 de días
    for i in top_10_days:
        data_tmp = pd.read_json(f'data_q1/{i}', orient='records', lines=True)
        data_tmp = data_tmp.groupby("user").count().sort_values("id", ascending = False)[:1]
        user = data_tmp.index.values[0]
        result_list.append((i, user))
    
    # Borramos los archivos auxiliares generados para alguna proxima ejecución
    directory_path = 'data_q1'
    file_list = os.listdir(directory_path)
    for file_name in file_list:
        file_path = os.path.join(directory_path, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)

    return result_list


In [4]:
q1_memory(file_path)

[('2021-02-12', 'RanbirS00614606'),
 ('2021-02-13', 'MaanDee08215437'),
 ('2021-02-17', 'RaaJVinderkaur'),
 ('2021-02-16', 'jot__b'),
 ('2021-02-14', 'rebelpacifist'),
 ('2021-02-18', 'neetuanjle_nitu'),
 ('2021-02-15', 'jot__b'),
 ('2021-02-20', 'MangalJ23056160'),
 ('2021-02-23', 'Surrypuria'),
 ('2021-02-19', 'Preetm91')]

In [6]:
%timeit q1_memory(file_path)

1min 3s ± 11.6 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
%memit q1_memory(file_path)

peak memory: 155.22 MiB, increment: 2.24 MiB


Obtenidos una vez los datos comprobamos como la ejecución q1_memory hizo un descenco significado

| Funcion | Tiempo | Memoria |
|---------|--------|---------|
| q1_time | 18.2s | 1449.68 Mib |
| q1_memory | 1min 3s | 155.22 MiB |


## 2. Los top 10 emojis más usados con su respectivo conteo:

De igual manera para el conteo de emojis usaremos un enfoque similar, leeremos solos los campos necesarios en la lectura del archivo, en este caso el campo content, de cual obtendremos los emojis con la ayuda de la función analiza de la libreria emoji. Extraemos en una lista y la concatenamos en una lista global para todos el archivo para finalmente agruparlos usando un dataframe

In [3]:
def q2_time(file_path: str) -> List[Tuple[str, int]]:
    # En este caso haremos la lectura igual que en la primera pregunta pero extraeremos unicamente el campo content
    file_data = open(file_path, 'r')
    Lines = file_data.readlines()
    data = []

    for line in Lines:
        json_value = json.loads(line)
        data.append(json_value.get("content"))

    # Lista donde iremos concatenando los emojis de todo el archivo
    emoji_values = []
    
    for i in data:
        # Emoji analize retorna un tipo de dato con el valor del emoji y su posición, en este caso solo tomaremos el valor
        emoji_values += [value.chars for value in emoji.analyze(i)]

    #Vamos a crear un dataframe con los resultados
    data = pd.DataFrame({"emoji":emoji_values})
    data["counter"] = 1
    data = data.groupby('emoji').sum().sort_values("counter", ascending = False).head(10)
    emoji_list = [tuple(i) for i in data[["counter"]].itertuples()]
    file_data.close() #Liberamos memoria
    
    return emoji_list

In [4]:
q2_time(file_path)

[('🙏', 5049),
 ('😂', 3072),
 ('🚜', 2972),
 ('🌾', 2182),
 ('🇮🇳', 2086),
 ('🤣', 1668),
 ('✊', 1651),
 ('❤️', 1382),
 ('🙏🏻', 1317),
 ('💚', 1040)]

In [9]:
%timeit q2_time(file_path)

34.5 s ± 2.83 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
%memit resultado_q2_time = q2_time(file_path)

peak memory: 597.83 MiB, increment: 16.88 MiB


Para la ejecución con optimización de memoria nos apoyaremos de archivos en local para hacer un paso intermedio en el procesamiento, extraermos linea a linea los emojis de nuestra fuente de datos y los guardaremos en un archivo en local.

Para el agrupamiento iremos leyendo linea a linea el archivo de preprocesamiento y los iremos agrupando en un diccionario de datos el cual procesaremos con pandas para obetener la respuesta deseada.

In [7]:
def q2_memory(file_path: str) -> List[Tuple[str, int]]:
    # Para evitar consumir la memoria en la lectura del archivo iremos leyendo linea a linea el archivo
    # haremos una cuenta un agrupamiento previo e iremos almacenando en un archivo

    # Lectura linea a linea
    file1 = open(file_path,"r")
    fwrite = open("aux_mem_q2/emoji_data","a")
    file_read = open("aux_mem_q2/emoji_data","r")

    for line in file1:
        json_value = json.loads(line)
        # Leeremos solo el valor del content donde estan los emojis
        content = json_value.get("content")
        
        # Extraemos emojis y los agrupamos
        tmp_emoji_list = [value.chars for value in emoji.analyze(content)]
        tmp_emoji_list = '\n'.join(tmp_emoji_list)
        
        # Si existen emojis en la linea los guardaremos en el archivo previo
        if tmp_emoji_list:
            fwrite.write(tmp_emoji_list+"\n")

    file1.close()
    fwrite.close()
        
    # Ahora leeremos el archivo fila a fila y almacenaremos en un dictionario de datos sumando uno por cada ocurrencia
    emoji_values = {}

    for emoji_line in file_read:
        if emoji_line.replace('\n','') in emoji_values.keys():
            emoji_values[emoji_line.replace('\n','')] +=1 # Si existe se suma uno
        else:
            emoji_values[emoji_line.replace('\n','')] = 0 # Si no existe lo crea
    
    #Lo hacemos dataframe para agrupar y sacar los maximos
    emoji_values = pd.DataFrame({"emoji":list(emoji_values.keys()),"conteo":list(emoji_values.values())})
    emoji_values = emoji_values.sort_values('conteo', ascending=False).head(10)

    # #Lo volvemos listado de tuplas
    emoji_values = [tuple(i) for i in emoji_values.set_index('emoji').itertuples()]

    # Borrar el archivo generado
    os.remove('aux_mem_q2/emoji_data')
    file1.close()
    
    return emoji_values

In [8]:
q2_memory(file_path)

[('🙏', 5048),
 ('😂', 3071),
 ('🚜', 2971),
 ('🌾', 2181),
 ('🇮🇳', 2085),
 ('🤣', 1667),
 ('✊', 1650),
 ('❤️', 1381),
 ('🙏🏻', 1316),
 ('💚', 1039)]

In [12]:
%timeit q2_memory(file_path)

40.3 s ± 7.45 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [29]:
%memit q2_memory(file_path)

peak memory: 158.62 MiB, increment: 0.00 MiB


De igual manera si hacemos una tabla comparativa podemos observar de mejor manera como efectivamente se aplico la optmización de memoria y tiempo respectivamente


| Funcion | Tiempo | Memoria |
|---------|--------|---------|
| q2_time | 34.5s | 597.83.68 Mib |
| q2_memory | 40.3s | 158.62 MiB |

## 3. El top 10 histórico de usuarios (username) más influyentes en función del conteo de las menciones (@) que registra cada uno de ellos.

En este problema, se tomará de igual manera los campos estrictamente necesarios, en este caso mentionedUsers el cual contiene un listado de usernames 

In [13]:
def q3_time(file_path: str) -> List[Tuple[str, int]]:
    # De igual manera se hara una lectura del archivo linea a linea para tomar los campos estrictamente necesarios
    # La documentacion no lo especifica pero en un analsis del archivo se puede observar que existe el campo mentionedUsers del cual se tomara los usernames
    file1 = open(file_path, 'r')
    Lines = file1.readlines()
    list_mentioned_user = []
    
    for line in Lines:
        json_value = json.loads(line)
        mentioned_users = json_value.get("mentionedUsers")
        if mentioned_users:
            list_mentioned_user += [i["username"] for i in mentioned_users] #Se tomara las n veces que se mecione en el tweet, se puede tomar unicos transformando a set y luego a list, pero en este caso lo mantedre así
    
    # Transformamos a dataframe para ordenar ocurrencias
    mentioned_users = pd.DataFrame({"mentioned_user":list_mentioned_user})
    mentioned_users["conteo"] = 1
    mentioned_users = mentioned_users.groupby("mentioned_user").sum()
    mentioned_users = mentioned_users.sort_values("conteo", ascending = False).head(10)

    #Transformamos a tuplas
    mentioned_users = [tuple(i) for i in mentioned_users.itertuples()]
    
    return mentioned_users

In [14]:
q3_time(file_path)

[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

In [16]:
%timeit q3_time(file_path)

6.73 s ± 1.13 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [15]:
%memit q3_time(file_path)

peak memory: 581.56 MiB, increment: 0.00 MiB


Para la optimización de memoria de crearemos una lista donde se iran almancenando los valores linea a linea de las menciones de los tweets

In [18]:
def q3_memory(file_path: str) -> List[Tuple[str, int]]:
    # De igual manera se hara una lectura del archivo linea a linea para tomar los campos estrictamente necesarios
    # Se toma el mismo campo mentionedUsers del cual se tomara los usernames
    file1 = open(file_path, 'r')
    list_mentioned_user = []
    
    #La lectura es linea a linea para no saturar memoria
    for line in file1:
        json_value = json.loads(line)
        mentioned_users = json_value.get("mentionedUsers")
        if mentioned_users:
            list_mentioned_user += [i["username"] for i in mentioned_users] #Se tomara las n veces que se mecione en el tweet, se puede tomar unicos transformando a set y luego a list, pero en este caso lo mantedre así
    
    # Transformamos a dataframe para ordenar ocurrencias
    mentioned_users = pd.DataFrame({"mentioned_user":list_mentioned_user})
    mentioned_users["conteo"] = 1
    mentioned_users = mentioned_users.groupby("mentioned_user").sum()
    mentioned_users = mentioned_users.sort_values("conteo", ascending = False).head(10)

    #Transformamos a tuplas
    mentioned_users = [tuple(i) for i in mentioned_users.itertuples()]

    file1.close()
    return mentioned_users

In [19]:
q3_memory(file_path)

[('narendramodi', 2265),
 ('Kisanektamorcha', 1840),
 ('RakeshTikaitBKU', 1644),
 ('PMOIndia', 1427),
 ('RahulGandhi', 1146),
 ('GretaThunberg', 1048),
 ('RaviSinghKA', 1019),
 ('rihanna', 986),
 ('UNHumanRights', 962),
 ('meenaharris', 926)]

In [50]:
%timeit q3_memory(file_path)

11.1 s ± 960 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
%memit q3_memory(file_path)

peak memory: 150.88 MiB, increment: 6.93 MiB


Tabla comparativa de tiempo de ejecución y uso de memoria por cada función


| Funcion | Tiempo | Memoria |
|---------|--------|---------|
| q3_time | 6.73 | 581.56.68 Mib |
| q3_memory | 11.1s  | 150.88 MiB |

## Conclusiones

Se ha logrado hacer una optimización de las ejecuciones, al indentificar el cuello de botella que es la lectura y parseo de la información hemos podido atacar este paso en las ejecuciones dependiendo del tipo de optimización que se deseaba.

Aunque nuestra implementación es una para una maquina unica, hemos aplicado algunos principios básicos que se hacen uso en la computación distribuida como es el particionamiento y el procesamiento en lotes.

En caso de requerir procesar archivos mas grandes debemos hacer uso de cluster de procesamiento como Spark para distribuir de manera optima los datos, nuestros codigos pueden ser facilmente replanteados para DataFrames PySpark o usando Spark SQL para el procesamiento

Me gustaría realizar una observación para el challenge, el esquema de la data presenta actualmente se encuentra desactualizada con respecto a la documentación oficial por lo que recomiendo la actualización de la data del challenge a uno con un esquema actual.