# DE Challenge - Ismael Umpierrez

## Preprocesamiento

Para ejecutar este archivo puede ser necesario instalar las siguientes dependencias:

In [None]:
%pip install gdown pandas google.cloud.storage google-cloud-bigquery memory_profiler

In [1]:
import gdown
import pandas as pd
import zipfile
from q1_time import q1_time
from q1_memory import q1_memory
from q1_bigquery import q1_time as q1_bigquery
from q2_time import q2_time
from q2_memory import q2_memory

from pandas.io import gbq
from google.cloud import storage, bigquery

Obtengo el archivo desde google drive y lo descomprimo

In [2]:
file_path = "farmers-protest-tweets-2021-2-4.json"
url = f'https://drive.google.com/uc?id=1ig2ngoXFTxP5Pa8muXo02mDTFexZzsis'
extracted_dir = 'working/'

In [2]:
# Descargo de GDrive
gdown.download(url, 'working/tweets.zip', quiet=True)

# Extraigo el contenido del archivo
with zipfile.ZipFile('working/tweets.zip', 'r') as zip_ref:
    zip_ref.extractall(extracted_dir)


Subo los datos del archivo a Google Cloud Storage para poder trabajar con él desde bigquery.

In [4]:
# Variables para conexión a Google Cloud Storage
project_id = "dechallenge"
bucket_name = "dechallenge-tweets"
keyfile_path = "..\creds\dechallenge-51f78ddf0bb6.json"  # JSON key file

storage_client = storage.Client.from_service_account_json(keyfile_path, project=project_id)
bucket = storage_client.bucket(bucket_name)

# Creo un blob con el nombre del archivo
destination_blob_name = file_path
blob = bucket.blob(destination_blob_name)

# Subo el archivo a Google Cloud Storage
blob.upload_from_filename(extracted_dir+file_path)

Subo el archivo que está en un blob de GCS a una tabla de bigquery.

In [5]:
# Crea un cliente de BigQuery
client = bigquery.Client.from_service_account_json(keyfile_path, project=project_id)

# Especifica el ID del proyecto y el ID del nuevo dataset
dataset_id = "tweets"

# Crea el dataset
dataset = bigquery.Dataset(f"{project_id}.{dataset_id}")
dataset_ref = client.dataset(dataset_id, project=project_id)

# Verifica si el dataset ya existe para no volver a crearlo.
if not client.get_dataset(dataset_ref, retry=bigquery.DEFAULT_RETRY):
    # Si el dataset no existe, crea el dataset
    dataset = bigquery.Dataset(dataset_ref)
    client.create_dataset(dataset)
    print(f"Dataset {project_id}.{dataset_id} creado con éxito.")
else:
    print(f"El dataset {project_id}.{dataset_id} ya existe.")

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    write_disposition="WRITE_TRUNCATE",
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    max_bad_records=10,
)
uri = "gs://dechallenge-tweets/farmers-protest-tweets-2021-2-4.json"

load_job = client.load_table_from_uri(
    uri,
    "dechallenge.tweets.farmers_protest_tweets",
    location="US",
    job_config=job_config,
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table("dechallenge.tweets.farmers_protest_tweets")
print("Loaded {} rows.".format(destination_table.num_rows))

# Ejecuta la consulta de BigQuery

El dataset dechallenge.tweets ya existe.
Loaded 117405 rows.


## Pregunta 1

Primero quiero ver cuanto demora una solución realizada con BigQuery, haciendo uso de las capacidades que nos brinda el procesamiento distribuido en la nube.

In [6]:
resultado = q1_bigquery(client)
# Imprime la lista de tuplas
print(resultado)

[(datetime.date(2021, 2, 12), 'RanbirS00614606'), (datetime.date(2021, 2, 13), 'MaanDee08215437'), (datetime.date(2021, 2, 17), 'RaaJVinderkaur'), (datetime.date(2021, 2, 16), 'jot__b'), (datetime.date(2021, 2, 14), 'rebelpacifist'), (datetime.date(2021, 2, 18), 'neetuanjle_nitu'), (datetime.date(2021, 2, 15), 'jot__b'), (datetime.date(2021, 2, 20), 'MangalJ23056160'), (datetime.date(2021, 2, 23), 'Surrypuria'), (datetime.date(2021, 2, 19), 'Preetm91')]


Como vemos el resultado es muy rápido, pero tenemos que tener en cuenta el overhead de subir los archivos al cloud storage y a una tabla de bigquery.

De todas maneras, para el problema 1 quería utilizar pandas de manera de tener un benchmark contra lo que comparar. Además quiero también ver cual es el uso de memoria de esta solución de "fuerza bruta" para tener una idea del impacto de la solución que optimiza memoria.

In [7]:
resultado = q1_time(extracted_dir+file_path)
print(resultado)

Filename: c:\Users\Caruso\source\Latam-DE-challenge\src\q1_time.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     6    142.0 MiB    142.0 MiB           1   @profile
     7                                         def q1_time(file_path: str) -> List[Tuple[datetime.date, str]]:
     8                                             # Lee el archivo json
     9   1609.9 MiB   1467.8 MiB           1       df_tweets_source = pd.read_json(file_path, lines=True)
    10                                         
    11                                             # Tengo que normalizar el objeto user para poder acceder a sus atributos, en particular me interesa el username.
    12                                             # Además, Transformo la columna 'date' a tipo datetime
    13   1611.7 MiB      7.2 MiB           2       df_tweets = df_tweets_source.assign(
    14   1599.9 MiB    -10.0 MiB           1               userName=pd.json_normalize(df_tweets_source['user']).userna

Se utilizan 1560 MiB de memoria utilizando esta técnica.

En el caso de la optimización de memoria, lo que hago es no cargar el conjunto de datos al mismo tiempo en memoria. En este caso lo cargo linea a linea sacando la información que necesito y manteniendola en un diccionario de dias que contiene un diccionario de tweets por usuario.

In [4]:
resultado = q1_memory(extracted_dir+file_path)
print(resultado)

Filename: c:\Users\Caruso\source\Latam-DE-challenge\src\q1_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     8    148.1 MiB    148.1 MiB           1   @profile
     9                                         def q1_memory(file_path: str) -> List[Tuple[datetime.date, str]]:
    10                                             # Utilizo un diccionario para mantener la cuenta de tweets por usuario por dia. Uso un array para guardar los dias con mas tweets al final.
    11    148.1 MiB      0.0 MiB           1       date_counts = {}
    12    148.1 MiB      0.0 MiB           1       top_dates = []
    13                                         
    14                                             # Leo el archivo json fila a fila.
    15    153.3 MiB     -0.1 MiB           2       with open(file_path, 'r') as jsonfile:
    16    153.4 MiB  -4710.2 MiB      117408           for row in jsonfile:
    17    153.4 MiB  -4735.4 MiB      117407               reader = json.l

Este enfoque utiliza muy poca memoria (153 MiB) pero es realmente lento, una buena mejora para obtener un buen balance entre velocidad y consumo de memoria sería utilizar batches de filas en lugar de ir una a una.
Cabe aclarar también que sin utilizar Memory Profiler el tiempo es mucho menor. Aprox 6 minutos.

## Pregunta 2

Para el caso en donde se optimiza el tiempo de ejecución, decidí concatenar todos los textos de la columna content y buscar en ella los emojis usando una expresión regular. Una vez identificados los emojis, se agrupan, cuentan y ordenan.

In [49]:
resultado = q2_time(extracted_dir+file_path)
print(resultado)


[('🙏', 5049), ('😂', 3072), ('🚜', 2972), ('🌾', 2182), ('🇮🇳', 2086), ('🤣', 1668), ('✊', 1651), ('❤️', 1382), ('🙏🏻', 1317), ('💚', 1040)]


En el caso de la optimización de memoria, sigo un principio parecido al del problema 1, en este caso no voy al extremo de ir fila a fila sino que cargo el dataframe de a 1000 rows, El uso de memoria es muy bajo como se puede ver.

In [3]:
resultado = q2_memory(extracted_dir+file_path)
print(resultado)


Filename: c:\Users\Caruso\source\Latam-DE-challenge\src\q2_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
     6    123.2 MiB    123.2 MiB           1   @profile
     7                                         def q2_memory(file_path: str) -> List[Tuple[str, int]]:
     8    123.2 MiB      0.0 MiB           1       batch_size = 1000
     9    123.8 MiB      0.7 MiB           1       extract_emoji = re.compile(emoji_rx)                   # Match a single emoji
    10                                         
    11                                             # Diccionario para mantener los emojis.
    12    123.8 MiB      0.0 MiB           1       emojis = {}
    13                                         
    14    123.8 MiB      0.0 MiB           1       json_reader = pd.read_json(file_path, lines=True, chunksize=batch_size)
    15    153.0 MiB   -175.1 MiB         119       for chunk in json_reader:
    16    153.0 MiB -227863.7 MiB      117525           for t

## Pregunta 3