# Data ETL

Basic data extraction, transformation, and loading into an Elasticsearch index for further preprocessing.

- [Tweet object metadata](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object)
- [User object metadata](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/user-object)

In [1]:
import os
from os.path import join
from sqlite3 import connect
import pandas as pd
import numpy as np

project_dir = join(os.getcwd(), os.pardir)
raw_dir = join(project_dir, 'data', 'raw')
interim_dir = join(project_dir, 'data', 'interim')

In [2]:
raw_fname = 'SAMPLE_DATA_QUERY_((cyclone amphan) OR amphan)_FROMDATE_2020-05-14_TODATE_2020-06-15.json'
df = pd.read_json(join(raw_dir, raw_fname), lines=True)

_____
### Exploring and Transforming the contents in the raw data

In [3]:
df.columns

Index(['created_at', 'id', 'id_str', 'text', 'source', 'truncated',
       'in_reply_to_status_id', 'in_reply_to_status_id_str',
       'in_reply_to_user_id', 'in_reply_to_user_id_str',
       'in_reply_to_screen_name', 'user', 'geo', 'coordinates', 'place',
       'contributors', 'retweeted_status', 'is_quote_status', 'quote_count',
       'reply_count', 'retweet_count', 'favorite_count', 'entities',
       'favorited', 'retweeted', 'filter_level', 'lang', 'matching_rules',
       'display_text_range', 'extended_tweet', 'possibly_sensitive',
       'extended_entities', 'quoted_status_id', 'quoted_status_id_str',
       'quoted_status', 'quoted_status_permalink'],
      dtype='object')

In [4]:
df.isna().sum(0) / df.shape[0] # fraction of missing values in each column

created_at                   0.000000
id                           0.000000
id_str                       0.000000
text                         0.000000
source                       0.000000
truncated                    0.000000
in_reply_to_status_id        0.954667
in_reply_to_status_id_str    0.954667
in_reply_to_user_id          0.943333
in_reply_to_user_id_str      0.943333
in_reply_to_screen_name      0.943333
user                         0.000000
geo                          0.998667
coordinates                  0.998667
place                        0.986000
contributors                 1.000000
retweeted_status             0.240000
is_quote_status              0.000000
quote_count                  0.000000
reply_count                  0.000000
retweet_count                0.000000
favorite_count               0.000000
entities                     0.000000
favorited                    0.000000
retweeted                    0.000000
filter_level                 0.000000
lang        

### Important information regarding each feature (Tweet objects): 
- "id" and "id_str" values differ, use "id_str"
- "text" does not always contain the full text of the tweet (truncated text ends with "...")
- "source" can possibly be used to detect device type (android/ios/pc/mac/ipad...)
- "truncated" indicates whether the text parameter was truncated. It goes with the "extended_tweet" parameter
- "extended_tweet" contains the full tweet if "truncated" is True
- "in_reply_to_status_id" ignore. Use "in_reply_to_status_id_str" instead
- "in_reply_to_user_id" ignore. Use "in_reply_to_user_id_str" instead
- "user" contains information about the user posting the tweet. It is subdivided into many other fields (user-related)
- "coordinates" is nearly always empty. It should probably be disregarded.
- "place" is slightly more common than "coordinates", although still very rare. Probably should be disregarded.
- "quoted_status_id" ignore. Use "quoted_status_id_str" instead, although this field can probably be dropped as well.
- "retweeted_status" indicates whether it's a retweet. Must be kept.
- "entities" contains the results of Twitter's own text parsing (hashtags, URLs, user mentions)
- "favorited" should be dropped
- "retweeted" should be dropped
- "filter_level" can be dropped
- "lang" indicates a BCP 47 language identifier corresponding to the machine-detected language of the Tweet text
- "matching_rules" should be dropped
- "geo" should be dropped. Use the "coordinates" field instead
- "retweeted_status": retweets can be distinguished from typical Tweets by the existence of a retweeted_status attribute. This attribute contains a representation of the original Tweet that was retweeted.
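The note on "source" can be illustrated with a minimal sketch. It assumes the field holds an HTML anchor whose visible label names the client; the sample label `Twitter for Android`, the helper names `source_label` and `device_type`, and the device buckets are all hypothetical choices made for illustration, not part of the dataset description.

```python
import re

# Captures the visible label of the anchor tag stored in "source".
_SOURCE_LABEL = re.compile(r'>([^<]*)</a>')

def source_label(source_html):
    """Return the client label inside the anchor tag, or None if absent."""
    if not isinstance(source_html, str):
        return None
    match = _SOURCE_LABEL.search(source_html)
    return match.group(1).strip() if match else None

def device_type(label):
    """Map a client label to a coarse device bucket (hypothetical buckets)."""
    if label is None:
        return 'unknown'
    lowered = label.lower()
    for keyword, bucket in (('android', 'android'), ('iphone', 'ios'),
                            ('ipad', 'ios'), ('web', 'web')):
        if keyword in lowered:
            return bucket
    return 'other'

# Assumed sample value; the real labels should be checked against the data.
example = ('<a href="http://twitter.com/download/android" '
           'rel="nofollow">Twitter for Android</a>')
print(device_type(source_label(example)))
```

On the real data this could be applied with something like `df['source'].map(source_label)`, after inspecting the distinct labels to decide which buckets are worth keeping.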

### Important information regarding each feature (User objects): 
- "id" and "id_str" values differ, use "id_str"
- "name" is the name of the user, as they’ve defined it. Not necessarily a person’s name.
- "screen_name" is the handle, or alias that this user identifies themselves with. screen_names are unique but subject to change. Use id_str as a user identifier whenever possible.
- "location" user-defined location of the user. About 37% missing values in the sample.
- "derived" contains its own object: a collection of enrichment metadata derived for the user, mostly related to user location. Absent from the sample dataset.
- "url" user's associated website. Can be dropped (76% missing values).
- "description" is the user-defined UTF-8 string describing their account. Can be used for feature extraction.
- "protected" when true, indicates that this user has chosen to protect their Tweets. Confirm that all users have False. Can be dropped afterwards.
- "verified" indicates that the user has a verified account. 
- "friends_count" number of users this account is following.
- "listed_count" number of public lists that this user is a member of.
- "statuses_count" number of Tweets (including retweets) issued by the user.
- "default_profile" can be dropped.
- "profile_background_tile" can be dropped. (deprecated)
- Many features contain 100% missing values, mostly due to feature deprecation. Should be dropped.
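Two of the checks in the list above (columns that are entirely missing, and confirming that no user is protected) might be sketched as follows. The helper names and the toy frame are hypothetical; the real `df_users` has many more columns.

```python
import pandas as pd

def fully_missing_columns(frame):
    """Return the names of columns in which every value is missing."""
    return frame.columns[frame.isna().all()].tolist()

def all_unprotected(frame):
    """Return True when no user in the frame has protected Tweets."""
    return not frame['protected'].any()

# Toy stand-in for df_users, for illustration only.
toy_users = pd.DataFrame({
    'id_str': ['1', '2', '3'],
    'protected': [False, False, False],
    'utc_offset': [None, None, None],
    'location': ['Kolkata', None, None],
})

print(fully_missing_columns(toy_users))
print(all_unprotected(toy_users))
```

Running the second check before dropping "protected" makes the assumption explicit instead of relying on a visual scan of the column.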

In [5]:
df_users = pd.DataFrame(df['user'].tolist())

# "entities" and "extended_entities" are dropped because their benefit is
# unclear. They could be preprocessed properly later, or at least the
# useful parts could be extracted from them.
df_tweets = df.drop(columns=[
    'id', 'in_reply_to_status_id', 'in_reply_to_user_id', 'user',
    'coordinates', 'place', 'quoted_status_id', 'favorited',
    'retweeted', 'retweeted_status', 'matching_rules', 'geo', 
    'filter_level', 'display_text_range', 'contributors',
    'quoted_status', 'quoted_status_permalink',
    'in_reply_to_screen_name', 'text', 'extended_tweet', 'truncated',
    'entities', 'extended_entities'
])
df_tweets['user_id_str'] = df['user'].apply(lambda x: x['id_str'])
df_tweets['full_text'] = df.apply(
    lambda row: 
        row['text'] 
        if not row['truncated'] 
        else row['extended_tweet']['full_text'], 
    axis=1
)

def get_retweet_id(row):
    """returns: is_retweet, original_tweet_id_str"""
    if isinstance(row['retweeted_status'], dict):
        return True, row['retweeted_status']['id_str']
    else:
        return False, np.nan

df_tweets['is_retweet'], df_tweets['original_tweet_id_str'] = zip(*df.apply(get_retweet_id, axis=1))
df_tweets['is_reply'] = ~df['in_reply_to_status_id'].isna()

In [6]:
df_users.isna().sum(0) / df_users.shape[0]

id                                    0.000000
id_str                                0.000000
name                                  0.000000
screen_name                           0.000000
location                              0.371333
url                                   0.755667
description                           0.188000
translator_type                       0.000000
protected                             0.000000
verified                              0.000000
followers_count                       0.000000
friends_count                         0.000000
listed_count                          0.000000
favourites_count                      0.000000
statuses_count                        0.000000
created_at                            0.000000
utc_offset                            1.000000
time_zone                             1.000000
geo_enabled                           0.000000
lang                                  1.000000
contributors_enabled                  0.000000
is_translator

In [7]:
print('Sanity check: dropping duplicate rows VS dropping duplicate user IDs')
print(df_users.drop_duplicates().shape)
print(df_users.drop_duplicates(subset='id_str').shape)

# After some research, I found this inconsistency comes from the features 'favourites_count' and 'statuses_count'
# Two users probably had different figures when (re)tweeting posts that match the original search query

Sanity check: dropping duplicate rows VS dropping duplicate user IDs
(2432, 39)
(2430, 39)
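The gap between the two shapes can be traced to specific columns with a small helper. This is a sketch under assumptions: `varying_columns` is a hypothetical name, the toy frame only mimics the situation described in the comment above, and the approach needs hashable cell values (columns holding dicts or lists would have to be excluded first).

```python
import pandas as pd

def varying_columns(users, key='id_str'):
    """List columns that hold more than one distinct value for a repeated key."""
    # Keep only rows whose key occurs more than once.
    repeated = users[users.duplicated(subset=key, keep=False)]
    # Count distinct values per key and column, treating NaN as a value.
    distinct = repeated.groupby(key).nunique(dropna=False)
    return distinct.columns[(distinct > 1).any()].tolist()

# Toy stand-in: user '1' appears twice with different counters.
toy = pd.DataFrame({
    'id_str': ['1', '1', '2'],
    'name': ['a', 'a', 'b'],
    'favourites_count': [10, 11, 5],
    'statuses_count': [100, 101, 7],
})
print(varying_columns(toy))
```

Counters such as these change between two tweets by the same user, which is why deduplicating on "id_str" rather than on whole rows gives the smaller count.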


In [8]:
# drop columns with 100% missing values
all_missing = df_users.columns[(df_users.isna().sum(0) / df_users.shape[0]) == 1].tolist()

df_users = df_users.drop(columns=[
    'id', 'url', 'protected', 'default_profile', 'profile_image_url', 
    'profile_image_url_https', 'profile_banner_url', 'profile_background_image_url',
    'profile_background_image_url_https', 'profile_background_tile', 'profile_link_color',
    'profile_sidebar_fill_color', 'profile_text_color', 'profile_use_background_image',
    'default_profile_image', 'translator_type', 'contributors_enabled', 'is_translator',
    'profile_background_color', 'profile_sidebar_border_color'
]+all_missing).drop_duplicates(subset='id_str', keep='first')

_____
### Visualizing transformed data

In [9]:
df_users.head()

Unnamed: 0,id_str,name,screen_name,location,description,verified,followers_count,friends_count,listed_count,favourites_count,statuses_count,created_at,geo_enabled
0,901828520,JillurRahaman(Bapi),Bapigharami,"Hasnabad, Taki","Secretary,District YouthCongressN24PGS(Rural)....",False,39,358,0,1990,1854,Wed Oct 24 13:28:37 +0000 2012,False
1,921596113949417472,NORTH 24 PGS DYC(Rural),GharamiBapi,"Basirhat,Hasnabad, Taki","Secretary,District youth congress N24PGS.Ex Pr...",False,47,274,0,2287,1989,Sat Oct 21 04:36:50 +0000 2017,False
2,1256085380106842113,Thiên Nhi,ThienNhi0211,,Waanjai MewGulf ❤❤❤\nHậu Cung MewGulf ❤Waanjai...,False,88,623,0,1197,10753,Fri May 01 04:58:01 +0000 2020,False
3,1245207861828837379,Arvind Xalxo,ArvindXalxo16,,i am a farmer,False,7,83,0,22,76,Wed Apr 01 04:34:34 +0000 2020,False
4,631712939,Kiku Phoenix Princes,kikuphoenix,,support : @Msuppasit \n#MewSuppasit \n#Mewlions,False,42,34,0,33788,52672,Tue Jul 10 05:37:42 +0000 2012,False


In [10]:
df_tweets.head()

Unnamed: 0,created_at,id_str,source,in_reply_to_status_id_str,in_reply_to_user_id_str,is_quote_status,quote_count,reply_count,retweet_count,favorite_count,lang,possibly_sensitive,quoted_status_id_str,user_id_str,full_text,is_retweet,original_tweet_id_str,is_reply
0,2020-06-14 23:51:02+00:00,1272315644495581184,"<a href=""http://twitter.com/download/android"" ...",,,False,0,0,0,0,en,,,901828520,RT @IYCWestBengal: Today #WBPYC President @Sha...,True,1272197734913544193,False
1,2020-06-14 23:50:39+00:00,1272315545027637248,"<a href=""http://twitter.com/download/android"" ...",,,False,0,0,0,0,en,,,921596113949417472,RT @IYCWestBengal: Today #WBPYC President @Sha...,True,1272197734913544193,False
2,2020-06-14 23:43:52+00:00,1272313840907112448,"<a href=""http://twitter.com/download/android"" ...",,,False,0,0,0,0,en,,,1256085380106842113,RT @indianmewlions: We are happy to announce t...,True,1268786300645228544,False
3,2020-06-14 23:42:44+00:00,1272313554926858240,"<a href=""http://twitter.com/download/android"" ...",,,False,0,0,0,0,en,,,1245207861828837379,RT @SanjoyLohar16: THEY NEED HELP !!!\n\nFor m...,True,1270897138445111296,False
4,2020-06-14 23:41:23+00:00,1272313215536357376,"<a href=""http://twitter.com/download/android"" ...",,,False,0,0,0,0,en,,,631712939,RT @indianmewlions: We are happy to announce t...,True,1268786300645228544,False


In [11]:
df_merged = pd.merge(df_users,df_tweets,left_on="id_str",right_on="user_id_str")

In [12]:
df_merged = df_merged.rename(columns={
    'id_str_x': 'user_id',
    'id_str_y': 'tweet_id',
    'created_at_x': 'user_created_at',
    'created_at_y': 'tweet_created_at',
})
date_format = '%a %b %d %H:%M:%S %z %Y'
df_merged['user_created_at'] = pd.to_datetime(df_merged['user_created_at'], format=date_format, errors='ignore')
# df_merged['tweet_created_at'] = pd.to_datetime(df_merged['tweet_created_at'], errors='coerce')
df_merged = df_merged.where(df_merged.notnull(), None)

df_merged.columns
df_merged.dtypes


user_id                                   object
name                                      object
screen_name                               object
location                                  object
description                               object
verified                                    bool
followers_count                            int64
friends_count                              int64
listed_count                               int64
favourites_count                           int64
statuses_count                             int64
user_created_at              datetime64[ns, UTC]
geo_enabled                                 bool
tweet_created_at             datetime64[ns, UTC]
tweet_id                                   int64
source                                    object
in_reply_to_status_id_str                 object
in_reply_to_user_id_str                   object
is_quote_status                             bool
quote_count                                int64
reply_count         
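The `date_format` string used in the cell above can be checked in isolation with the standard library. This is only a sketch: `parse_twitter_date` is a hypothetical helper, and the sample value is the "created_at" string of the first user shown earlier. Note that `%a` and `%b` depend on the locale, so this assumes English day and month names.

```python
from datetime import datetime, timezone

# Same format string as date_format in the notebook cell.
DATE_FORMAT = '%a %b %d %H:%M:%S %z %Y'

def parse_twitter_date(value):
    """Parse a user 'created_at' string into a timezone-aware datetime."""
    return datetime.strptime(value, DATE_FORMAT)

parsed = parse_twitter_date('Wed Oct 24 13:28:37 +0000 2012')
print(parsed.isoformat())
```

Because the format includes `%z`, the result is timezone-aware, which is consistent with the `datetime64[ns, UTC]` dtype reported for "user_created_at".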

In [13]:
df_merged.head()

Unnamed: 0,user_id,name,screen_name,location,description,verified,followers_count,friends_count,listed_count,favourites_count,...,retweet_count,favorite_count,lang,possibly_sensitive,quoted_status_id_str,user_id_str,full_text,is_retweet,original_tweet_id_str,is_reply
0,901828520,JillurRahaman(Bapi),Bapigharami,"Hasnabad, Taki","Secretary,District YouthCongressN24PGS(Rural)....",False,39,358,0,1990,...,0,0,en,,,901828520,RT @IYCWestBengal: Today #WBPYC President @Sha...,True,1272197734913544193,False
1,921596113949417472,NORTH 24 PGS DYC(Rural),GharamiBapi,"Basirhat,Hasnabad, Taki","Secretary,District youth congress N24PGS.Ex Pr...",False,47,274,0,2287,...,0,0,en,,,921596113949417472,RT @IYCWestBengal: Today #WBPYC President @Sha...,True,1272197734913544193,False
2,1256085380106842113,Thiên Nhi,ThienNhi0211,,Waanjai MewGulf ❤❤❤\nHậu Cung MewGulf ❤Waanjai...,False,88,623,0,1197,...,0,0,en,,,1256085380106842113,RT @indianmewlions: We are happy to announce t...,True,1268786300645228544,False
3,1245207861828837379,Arvind Xalxo,ArvindXalxo16,,i am a farmer,False,7,83,0,22,...,0,0,en,,,1245207861828837379,RT @SanjoyLohar16: THEY NEED HELP !!!\n\nFor m...,True,1270897138445111296,False
4,631712939,Kiku Phoenix Princes,kikuphoenix,,support : @Msuppasit \n#MewSuppasit \n#Mewlions,False,42,34,0,33788,...,0,0,en,,,631712939,RT @indianmewlions: We are happy to announce t...,True,1268786300645228544,False


_____
### Loading data into database

In [14]:
import json
from pprint import pprint

def rec_to_actions(df):
    for record in df.to_dict(orient="records"):
        # action line: the index name must be a quoted JSON string
        yield '{"index": {"_index": "twitter"}}'
        # source line: the document itself
        yield record
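An alternative to hand-written action lines is to build one action dict per record, in the shape that `elasticsearch.helpers.bulk` accepts (`_index`, `_id`, `_source`). The sketch below only builds the actions and does not contact a cluster; `to_bulk_actions` is a hypothetical helper, and using "tweet_id" as the document id is an assumption taken from the commented-out `es.index` call in the next cell.

```python
def to_bulk_actions(records, index='twitter', id_field='tweet_id'):
    """Yield one bulk action dict per record."""
    for record in records:
        yield {
            '_index': index,           # target index name
            '_id': record[id_field],   # reuse the tweet id as document id
            '_source': record,         # the document body
        }

# Toy records standing in for df_merged.to_dict(orient="records").
sample_records = [
    {'tweet_id': 1, 'full_text': 'first'},
    {'tweet_id': 2, 'full_text': 'second'},
]
actions = list(to_bulk_actions(sample_records))
print(actions[0])
```

With a live client, the generator could be passed as `helpers.bulk(es, to_bulk_actions(records))`. Setting `_id` explicitly means that reloading the same tweets overwrites existing documents instead of duplicating them.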

In [15]:
from elasticsearch import Elasticsearch,helpers
es = Elasticsearch(['192.168.99.100','localhost']) # two candidate hosts, default port 9200
# print(es.indices.get_alias("*"))
# try:
#     r = es.bulk(rec_to_actions(df_merged)) # return a dict
# except Exception as e:
#     print(e)

# i = 0
# for record in df_merged.to_dict(orient="records"):
#     try:
#         res = es.index(index="twitter",id=record['tweet_id'], body=record)
        
#     except Exception as e:
#         i += 1
#         print(e)
#         pprint(record)
        
# print(i)
# print(r["errors"])

In [16]:
from src.data._load_es import load_es
from src.data_transform import merge_dataframes 

df_merge = merge_dataframes(df_users,df_tweets)
df_merge.head()

Unnamed: 0,user_id,name,screen_name,location,description,verified,followers_count,friends_count,listed_count,favourites_count,...,retweet_count,favorite_count,lang,possibly_sensitive,quoted_status_id_str,user_id_str,full_text,is_retweet,original_tweet_id_str,is_reply
0,901828520,JillurRahaman(Bapi),Bapigharami,"Hasnabad, Taki","Secretary,District YouthCongressN24PGS(Rural)....",False,39,358,0,1990,...,0,0,en,,,901828520,RT @IYCWestBengal: Today #WBPYC President @Sha...,True,1272197734913544193,False
1,921596113949417472,NORTH 24 PGS DYC(Rural),GharamiBapi,"Basirhat,Hasnabad, Taki","Secretary,District youth congress N24PGS.Ex Pr...",False,47,274,0,2287,...,0,0,en,,,921596113949417472,RT @IYCWestBengal: Today #WBPYC President @Sha...,True,1272197734913544193,False
2,1256085380106842113,Thiên Nhi,ThienNhi0211,,Waanjai MewGulf ❤❤❤\nHậu Cung MewGulf ❤Waanjai...,False,88,623,0,1197,...,0,0,en,,,1256085380106842113,RT @indianmewlions: We are happy to announce t...,True,1268786300645228544,False
3,1245207861828837379,Arvind Xalxo,ArvindXalxo16,,i am a farmer,False,7,83,0,22,...,0,0,en,,,1245207861828837379,RT @SanjoyLohar16: THEY NEED HELP !!!\n\nFor m...,True,1270897138445111296,False
4,631712939,Kiku Phoenix Princes,kikuphoenix,,support : @Msuppasit \n#MewSuppasit \n#Mewlions,False,42,34,0,33788,...,0,0,en,,,631712939,RT @indianmewlions: We are happy to announce t...,True,1268786300645228544,False


In [17]:
load_es(df_merge)

2020-07-27 23:54:00,508 - elasticsearch - INFO - POST http://192.168.99.100:9200/twitter/_bulk [status:200 request:0.109s]
2020-07-27 23:54:00,685 - elasticsearch - INFO - POST http://192.168.99.100:9200/twitter/_bulk [status:200 request:0.087s]
2020-07-27 23:54:00,853 - elasticsearch - INFO - POST http://192.168.99.100:9200/twitter/_bulk [status:200 request:0.088s]
2020-07-27 23:54:01,020 - elasticsearch - INFO - POST http://192.168.99.100:9200/twitter/_bulk [status:200 request:0.086s]
2020-07-27 23:54:01,204 - elasticsearch - INFO - POST http://192.168.99.100:9200/twitter/_bulk [status:200 request:0.101s]
2020-07-27 23:54:01,382 - elasticsearch - INFO - POST http://192.168.99.100:9200/twitter/_bulk [status:200 request:0.099s]


3000 []
