# Data Engineer Challenge

## BigQuery Table
The table **tweets** was created manually on BigQuery following these steps:
1. Convert the JSON file to a Parquet file using the function **tweets_json_to_parquet** from the **utils** module.
2. Upload the Parquet file manually to a Google Cloud Storage bucket.
3. Create the table manually in the Google Cloud console, pointing it at the uploaded Parquet file so that the table schema is inherited from the Parquet schema.

## Execution

### Defining imports

In [None]:
from collections.abc import Iterable
from os.path import exists

import pandas as pd

from src.q1_memory import q1_memory
from src.q1_time import q1_time
from src.q2_memory import q2_memory
from src.q2_time import q2_time
from src.q3_memory import q3_memory
from src.q3_time import q3_time
from src.utils.gcp import download_file

### Defining the variables used for each function.

In [None]:
file_path = '/tmp/tweets_data.json'
bucket_name = 'latam-de-challenge'
source_blob = 'source-file/farmers-protest-tweets-2021-2-4.json'

project_id = 'latam-de-428219'
dataset = 'challenge_data'
table = 'tweets_tb'

### Downloads the file if it is not already present.

In [None]:
if not exists(file_path):
    download_file(bucket_name, source_blob, file_path)

### Defines a function to print results

In [None]:
def print_result(result: Iterable) -> None:
    for row in result:
        print(row)

### Execute functions below

**q1_memory**

This function executes the following steps:
1. Reads the JSON file line by line, extracting only the columns needed for the solution ('date', 'user').
2. Converts the 'date' column to datetime64 and keeps only the date part.
3. Applies a lambda function to the 'user' column, which is of struct type, to extract the 'username' field.
4. Counts the rows grouped by day and user to get the number of tweets per user per day.
5. Gets, for each day, the row of the user with the most tweets.
6. Sorts the values by total_tweets in descending order, drops duplicate dates and keeps the top 10.
7. Iterates over the top 10 rows and returns the result.
8. Catches exceptions for invalid column names, value errors and broader exceptions.
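
The steps above can be sketched with the standard library alone. This is not the pandas implementation in `src.q1_memory`; the function name `top_user_per_day` is hypothetical, and the sketch assumes each line is a JSON object with an ISO 8601 `date` string and a `user` object containing `username`.

```python
import json
from collections import Counter
from datetime import datetime


def top_user_per_day(lines, limit=10):
    """Return (date, username) pairs for the most active user of each day.

    Hypothetical helper: assumes every non-empty line is a JSON object with
    an ISO 8601 'date' string and a 'user' object holding 'username'.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        tweet = json.loads(line)
        # Keep only the date part of the timestamp.
        day = datetime.fromisoformat(tweet["date"]).date()
        counts[(day, tweet["user"]["username"])] += 1

    seen_days = set()
    result = []
    # most_common() yields (day, user) pairs by descending tweet count, so
    # the first pair seen for a day belongs to that day's most active user.
    for (day, username), _ in counts.most_common():
        if day in seen_days:
            continue
        seen_days.add(day)
        result.append((day, username))
        if len(result) == limit:
            break
    return result
```

Because the function accepts any iterable of lines, it can be fed an open file handle, for example `top_user_per_day(open(file_path, encoding="utf-8"))`, without loading the whole file into memory.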

In [None]:
result = q1_memory(file_path)
print_result(result)

**q2_memory**

This function executes the following steps:
1. Defines the regular expression for capturing emojis.
2. Reads the JSON file line by line, extracting only the column needed for the solution ('content').
3. Extracts the emojis from the 'content' column, aggregating them into a comma-separated string.
4. Splits the string of emojis into a list using the comma as the delimiter.
5. Counts the occurrences of each emoji, creating 2 new columns in the dataframe ('emoji', 'emoji_count').
6. Sorts the values by emoji_count in descending order and keeps the top 10 most used emojis.
7. Iterates over the top 10 emojis and returns the result.
8. Catches exceptions for invalid column names, value errors and broader exceptions.
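
A minimal sketch of the counting idea follows. The regular expression used by `q2_memory` is not shown in this notebook, so `EMOJI_PATTERN` below is an assumed, simplified pattern that covers only some common emoji and symbol blocks. The name `top_emojis` is hypothetical, and the sketch counts matches directly instead of going through a comma-separated string.

```python
import re
from collections import Counter

# Assumed, simplified pattern: matches single code points in a few common
# emoji and symbol blocks. Flags, skin-tone modifiers and ZWJ sequences are
# not treated as single emojis here.
EMOJI_PATTERN = re.compile("[\U0001F300-\U0001FAFF\u2600-\u27BF]")


def top_emojis(contents, limit=10):
    """Return the most frequent emojis as (emoji, count) pairs."""
    counts = Counter()
    for content in contents:
        if content:  # skip missing or empty tweet bodies
            counts.update(EMOJI_PATTERN.findall(content))
    return counts.most_common(limit)
```

A production-grade pattern would need to handle multi-code-point emojis; a dedicated library may be a better fit for that than a hand-written character class.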

In [None]:
result = q2_memory(file_path)
print_result(result)

**q3_memory**

This function executes the following steps:
1. Defines a private function **_get_mentioned_user** to extract the username from tweet mentions.
2. Reads the JSON file line by line, extracting only the column needed for the solution ('mentionedUsers').
3. Applies the function **_get_mentioned_user** to the column 'mentionedUsers'.
4. Counts the occurrences of each user, creating 2 new columns in the dataframe ('user', 'mentions_count').
5. Sorts the values by mentions_count in descending order and keeps the top 10 most mentioned users.
6. Iterates over the top 10 users and returns the result.
7. Catches exceptions for invalid column names, value errors and broader exceptions.
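
The same flow can be sketched without pandas. The function name `top_mentioned_users` is hypothetical, and the sketch assumes that 'mentionedUsers' is either null or a list of objects with a 'username' field; it is not the code in `src.q3_memory`.

```python
import json
from collections import Counter


def top_mentioned_users(lines, limit=10):
    """Return the most mentioned usernames as (username, count) pairs.

    Hypothetical helper: assumes 'mentionedUsers' is null or a list of
    objects that each carry a 'username' field.
    """
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        tweet = json.loads(line)
        # 'or []' covers both a missing key and an explicit null.
        for mention in tweet.get("mentionedUsers") or []:
            counts[mention["username"]] += 1
    return counts.most_common(limit)
```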

In [None]:
result = q3_memory(file_path)
print_result(result)

**q1_time / q2_time / q3_time**

The 3 functions for execution time follow the same steps described below:
1. Defines a query to be run on BigQuery.
2. Submits the request to BigQuery using the function **query_bigquery** from the utils module.
3. Iterates over the results from BigQuery and returns them.
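
As an illustration of these three steps, the sketch below builds a query and runs it with the `google-cloud-bigquery` client. The real queries and the body of **query_bigquery** are not shown in this notebook, so both function names here are hypothetical. The SQL assumes that `date` is a TIMESTAMP column and `user` is a STRUCT with a `username` field, which may not match the actual table schema.

```python
def build_top_user_query(project_id, dataset, table, limit=10):
    """Build a query for the most active user of each day (assumed schema)."""
    table_ref = f"`{project_id}.{dataset}.{table}`"
    return f"""
        SELECT tweet_date, username, total_tweets
        FROM (
            SELECT
                tweet_date,
                username,
                total_tweets,
                ROW_NUMBER() OVER (
                    PARTITION BY tweet_date ORDER BY total_tweets DESC
                ) AS rn
            FROM (
                -- Assumes `date` is a TIMESTAMP and `user` is a STRUCT.
                SELECT
                    DATE(`date`) AS tweet_date,
                    `user`.username AS username,
                    COUNT(*) AS total_tweets
                FROM {table_ref}
                GROUP BY tweet_date, username
            )
        )
        WHERE rn = 1
        ORDER BY total_tweets DESC
        LIMIT {int(limit)}
    """


def run_query(project_id, sql):
    """Submit the query and return the rows as a list of tuples."""
    # Imported lazily so the query builder works without the client library.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    return [tuple(row.values()) for row in client.query(sql).result()]
```

Keeping the query text separate from the submission step makes the SQL easy to inspect or unit test without BigQuery credentials.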

In [None]:
result = q1_time(project_id, dataset, table)
print_result(result)

In [None]:
result = q2_time(project_id, dataset, table)
print_result(result)

In [None]:
result = q3_time(project_id, dataset, table)
print_result(result)

## Improvements

* For more and bigger files, Apache Spark would have been a better approach, since it leverages massively parallel processing.
* The JSON file could be converted to a Parquet file to leverage compression and the columnar file format for filter pruning.
* More specific error handling can be implemented depending on the "client's" requirements.
* Add unit tests using the pytest library.
* Use a Python packaging and dependency management tool such as Poetry.