# Data Engineer Challenge

In [33]:
import pandas as pd
import json

from pandas import json_normalize
from src.constants import EMOJI_PATTERN

from google.cloud import bigquery
from google.oauth2 import service_account

# Preparaci√≥n de los datos

Debido a que el archivo de origen conten√≠a registros malformados, fue necesario procesar el archivo l√≠nea por l√≠nea, en caso de que el archivo hubiese venido sin malformados se podr√≠a haber leido el archivo utilizando *pd.read_json*. Utilizar el m√©todo *json_normalize* adem√°s de crear el dataframe permite desempaquetar los json que est√°n contenidos dentro de algunos registros.

In [34]:
tweets_path = r'../src/resources/farmers-protest-tweets-2021-2-4.json'

In [35]:
tweets, bad_lines = [], []
with open(tweets_path, 'r') as file:
    for line in file:
        try:
          tweets.append(json.loads(line))
        except:
          bad_lines.append(line)

In [36]:
tweets_flat_df = json_normalize(tweets)

# Desaf√≠o

## Los top 10 tweets m√°s retweeted.
Se utiliza la el m√©todo *nlargest* para obtener los 10 registros con m√°s retweets, utilizando el √≠ndice se filtra el dataframe original para mostrar el contenido de los 10 tweets con mayor n√∫mero de retweets

In [37]:
index_top_ten_tweets = tweets_flat_df['retweetCount'].nlargest(10).index
top_ten_tweets = tweets_flat_df.iloc[index_top_ten_tweets]
top_ten_tweets[['user.username','content','retweetCount']]  

Unnamed: 0,user.username,content,retweetCount
111329,RakeshTikaitBKU,‡§Æ‡§ß‡•ç‡§Ø‡§™‡•ç‡§∞‡§¶‡•á‡§∂ ‡§Æ‡•á‡§Ç ‡§®‡§ø‡§ú‡•Ä ‡§µ‡•ç‡§Ø‡§æ‡§™‡§æ‡§∞‡•Ä 200 ‡§ï‡§∞‡•ã‡§°‡§º ‡§ï‡§æ ‡§ß‡§æ‡§® ...,7723
7645,dhruv_rathee,There's a #FarmersProtest happening in Germany...,6164
89780,rupikaur_,"disha ravi, a 21-year-old climate activist, ha...",4673
88911,amaanbali,Disha Ravi broke down in court room and told j...,3742
111556,jedijasmin_,Farmers are so sweet. Y‚Äôall have to see this @...,3332
64492,rupikaur_,india is targeting young women to silence diss...,3230
108072,RaviSinghKA,Bollywood has betrayed Panjab &amp; the farmer...,3182
60721,sherryontopp,‡§≤‡§π‡§∞‡•ã‡§Ç ‡§ï‡•ã ‡§ñ‡§º‡§æ‡§Æ‡•ã‡§∂ ‡§¶‡•á‡§ñ ‡§ï‡§∞ ‡§Ø‡•á ‡§®‡§æ ‡§∏‡§Æ‡§ù‡§®‡§æ ‡§ï‡§ø ‡§∏‡§Æ‡§Ç‡§¶‡§∞ ‡§Æ‡•á...,3057
29510,sherryontopp,"‡§π‡§æ‡§Å ‡§Æ‡•à‡§Ç ‡§ú‡§æ‡§®‡§§‡§æ ‡§π‡•Ç‡§Å ‡§ï‡§ø ‡§Æ‡•à‡§Ç ‡§∂‡§æ‡§Ø‡§∞ ‡§®‡§π‡•Ä‡§Ç, ‡§î‡§∞ ‡§ú‡§º‡•Å‡§≤‡•ç‡§Æ ...",3040
24160,sherryontopp,"‡§ï‡§≤‡§ø‡§Ø‡•Å‡§ó ‡§π‡•à ‡§∏‡§æ‡§π‡§¨ , ‡§Ø‡§π‡§æ‡§Å ‡§ù‡•Ç‡§†‡•á ‡§ï‡•ã ‡§∏‡•ç‡§µ‡•Ä‡§ï‡§æ‡§∞ ‡§ï‡§ø‡§Ø‡§æ ‡§ú‡§æ...",2622


## Los top 10 users en funci√≥n a la cantidad de tweets que emitieron.
Se agrupan los tweets por usuario y se realiza un conteo por usuario, se toman los 10 valores m√°s grandes.

In [38]:
tweets_flat_df.groupby('user.username')['content'].count().nlargest(10)

user.username
jot__b             1019
rebelpacifist       850
MaanDee08215437     830
Gurpreetd86         636
GurmVicky           597
shells_n_petals     576
preetysaini321      573
ish_kayy            515
KaurDosanjh1979     512
DigitalKisanBot     490
Name: content, dtype: int64

## Los top 10 d√≠as donde hay m√°s tweets.
Se transforma la columna date para manejarla como fecha.  El m√©todo *pd.Grouper* permite especificar que para agrupar los datos de la columna "date" es necesario fijarse √∫nicamente en el d√≠a, ignorando la infomaci√≥n de la hora. Posteriormente se hace un conteo de los tweets y se seleccionan los 10 d√≠as con mayor n√∫mero de tweets.

In [39]:
tweets_flat_df["date"] = pd.to_datetime(tweets_flat_df["date"])
tweets_flat_df.groupby(pd.Grouper(key='date', freq='D'))['content'].count().nlargest(10)

date
2021-02-12 00:00:00+00:00    12347
2021-02-13 00:00:00+00:00    11296
2021-02-17 00:00:00+00:00    11087
2021-02-16 00:00:00+00:00    10443
2021-02-14 00:00:00+00:00    10249
2021-02-18 00:00:00+00:00     9625
2021-02-15 00:00:00+00:00     9197
2021-02-20 00:00:00+00:00     8502
2021-02-23 00:00:00+00:00     8417
2021-02-19 00:00:00+00:00     8204
Name: content, dtype: int64

   ## Los top 10 hashtags m√°s usados.
   Se utiliza una expresi√≥n regular para cachar todos las cadenas que comiencen con # que puedan (o no) terminar en espacio o punto, utiliando el m√©todo *findall* es posible obtener m√°s de una coincidencia (hashtag) por tweet.
   
   El resultado de la primera l√≠nea devuelve un pd.Series donde cada registro contiene una lista. Se utiliza el m√©todo *explode* para convertir cada elemento de la lista en un registro y posteriormente hacer un conteo utilizando el m√©todo *value_counts* que devuelve una objeto pd.Series ordenado de forma ascendente. Se toman solo los 10 primero registros. 

In [40]:
hashtags_in_lst = tweets_flat_df["content"].str.findall(r'(#\w+)[\s|\.]?')
hashtags = hashtags_in_lst.explode()
hashtags.name = 'Hashtag'
hashtags.value_counts()[:10]

Hashtag
#FarmersProtest             112630
#ReleaseDetainedFarmers       5987
#FarmersMakeIndia             5263
#farmersprotest               5105
#MahapanchayatRevolution      4735
#RepealOnlyWayAhead           4511
#IndiaBeingSilenced           4332
#FarmersProtests              3661
#Pagdi_Sambhal_Jatta          3542
#DishaRavi                    3017
Name: count, dtype: int64

## Los top 10 emojis m√°s usados.
Se implementa la misma f√≥rmula de arriba modificando √∫nicamente la expresi√≥n regular utilizada. La expresi√≥n regular EMOJI_PATTERN se importa de un archivo de constantes, esta expresi√≥n utiliza los rangos unicode donde se encuentran los emojis.

In [41]:
emojis_in_lst = tweets_flat_df["content"].str.findall(EMOJI_PATTERN)
emojis = emojis_in_lst.explode()
emojis.name = 'Emojis'
emojis.value_counts()[:10]

Emojis
üôè    7286
üòÇ    3072
Ô∏è    3061
üöú    2972
‚úä    2411
üåæ    2363
üáÆ    2096
üá≥    2094
üèª    2080
‚ù§    1779
Name: count, dtype: int64

## Los top 10 users m√°s influyentes en funci√≥n de lo retweeted de sus tweets.
Se utiliza una funci√≥n de agregaci√≥n para sumar el n√∫mero de retweets por usuario. Se utiliza el m√©todo *nlargest* para seleccionar el top 10.

In [42]:
tweets_flat_df.groupby('user.username')['retweetCount'].sum('retweetCount').nlargest(10)

user.username
amaanbali          26354
saahilmenghani     23288
RaviSinghKA        22974
sherryontopp       19175
RakeshTikaitBKU    12001
rupikaur_          11420
news24tvchannel    10960
iMani_KaurRai      10636
Monica_Gill1        8593
bhupenderc19        7360
Name: retweetCount, dtype: int64

# BigQuery
De forma alternativa podemos usar BigQuery para completar el desafio utilizando SQL para realizar sentencias. Para cargar el dataset es necesario:
* Crear o subir el archivo a un bucket existente
* Crear un dataset en BigQuery
* Crear una tabla en BigQuery importando los datos del bucket permitiendo un m√°ximo de 2 registros malformados
* Crear un service account para conectarnos a BigQuery con los permisos para ejecutar jobs y acceso a la tabla
* Copiar las credenciales para BQ en src/resources/

A continuaci√≥n se muestran algunos de los desaf√≠os resueltos en BQ


In [43]:
credentials = service_account.Credentials.from_service_account_file(
'resources/service_accout_key.json')

project_id = 'richi-latam-project-example-id'
client = bigquery.Client(credentials= credentials,project=project_id)

## Los top 10 tweets m√°s retweeted.

In [44]:
query_1 = client.query(
    """
    SELECT user.username, retweetCount 
    FROM `richi-latam-project-example-id.farmers.farmers-protest-tweets`
    ORDER BY retweetCount DESC
    LIMIT 10
    """)
#Me falt√≥ a√±adir la columna content xD

In [45]:
results = query_1.result()
for row in results:
	print(row[0:2])  

('RakeshTikaitBKU', 7723)
('dhruv_rathee', 6164)
('rupikaur_', 4673)
('amaanbali', 3742)
('jedijasmin_', 3332)
('rupikaur_', 3230)
('RaviSinghKA', 3182)
('sherryontopp', 3057)
('sherryontopp', 3040)
('sherryontopp', 2622)


## Los top 10 users en funci√≥n a la cantidad de tweets que emitieron.

In [46]:
query_2 = client.query(
    """
    SELECT user.username, COUNT(content) AS numTweets
    FROM `richi-latam-project-example-id.farmers.farmers-protest-tweets`
    GROUP BY user.username
    ORDER BY numTweets DESC
    LIMIT 10
    """)

In [47]:
results = query_2.result()
for row in results:
	print(row[0:2])  

('jot__b', 1019)
('rebelpacifist', 850)
('MaanDee08215437', 830)
('Gurpreetd86', 635)
('GurmVicky', 597)
('shells_n_petals', 576)
('preetysaini321', 573)
('ish_kayy', 515)
('KaurDosanjh1979', 512)
('DigitalKisanBot', 490)


## Los top 10 d√≠as donde hay m√°s tweets.

In [48]:
query_3 = client.query(
    """
    SELECT CAST(date AS DATE) AS datePeerDay,  COUNT(content) AS numTweets
    FROM `richi-latam-project-example-id.farmers.farmers-protest-tweets`
    GROUP BY datePeerDay
    ORDER BY numTweets DESC
    LIMIT 10
    """)

In [49]:
results = query_3.result()
for row in results:
	print(row[0:2])  

(datetime.date(2021, 2, 12), 12347)
(datetime.date(2021, 2, 13), 11296)
(datetime.date(2021, 2, 17), 11086)
(datetime.date(2021, 2, 16), 10443)
(datetime.date(2021, 2, 14), 10249)
(datetime.date(2021, 2, 18), 9625)
(datetime.date(2021, 2, 15), 9197)
(datetime.date(2021, 2, 20), 8502)
(datetime.date(2021, 2, 23), 8417)
(datetime.date(2021, 2, 19), 8204)


## Los top 10 users m√°s influyentes en funci√≥n de lo retweeted de sus tweets.

In [50]:
query_4 = client.query(
    """
    SELECT user.username, sum(retweetCount) AS numRetweets
    FROM `richi-latam-project-example-id.farmers.farmers-protest-tweets`
    GROUP BY user.username
    ORDER BY numRetweets DESC
    LIMIT 10
    """)

In [51]:
results = query_4.result()
for row in results:
	print(row[0:2])  

('amaanbali', 26354)
('saahilmenghani', 23288)
('RaviSinghKA', 22974)
('sherryontopp', 19175)
('RakeshTikaitBKU', 12001)
('rupikaur_', 11420)
('news24tvchannel', 10960)
('iMani_KaurRai', 10636)
('Monica_Gill1', 8593)
('bhupenderc19', 7360)
