## ETL Tweets - Batch load from JSON to Elasticsearch

### Install dependences

### Data load and overview

In [1]:
# required lib imports
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from datetime import datetime
import json
import itertools
from geopy.geocoders import Nominatim
geolocator = Nominatim(user_agent="test")

from kafka.producer import KafkaProducer
import time
from kafka.errors import KafkaTimeoutError

In [2]:
# load dataset / preview
df = pd.read_json('tweets.json')
df.head()

Unnamed: 0,created_at,hashtags,favorite_count,retweet_count,text,id,geo,lang
0,2021-03-21 00:55:13,[],55835,7346,COVID outbreak at Mar-a-Lago. Pivate plane fa...,1373392710560673794,,en
1,2021-03-20 17:44:58,[],35367,7222,Aonde foram parar os bilhões de reais enviados...,1373284436054319107,,pt
2,2021-03-20 17:19:12,[],140204,15558,Best wishes to Prime Minister @ImranKhanPTI fo...,1373277949462466560,,en
3,2021-03-21 23:26:47,[],0,0,yeah my second job got so many covid cases i h...,1373732844162408455,,en
4,2021-03-21 23:26:47,[],0,0,WR’s next event is our “Covid Anti-versary How...,1373732843164155907,,en


In [3]:
# general information
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 8 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   created_at      1000 non-null   datetime64[ns]
 1   hashtags        1000 non-null   object        
 2   favorite_count  1000 non-null   int64         
 3   retweet_count   1000 non-null   int64         
 4   text            1000 non-null   object        
 5   id              1000 non-null   int64         
 6   geo             3 non-null      object        
 7   lang            1000 non-null   object        
dtypes: datetime64[ns](1), int64(3), object(4)
memory usage: 62.6+ KB


In [4]:
# show records with geo not null
df.loc[
    df['geo'].notnull()
]

Unnamed: 0,created_at,hashtags,favorite_count,retweet_count,text,id,geo,lang
342,2021-03-20 17:17:46,"[{'text': 'cvspharmtech', 'indices': [112, 125...",0,0,Can you recommend anyone for this job? COVID 1...,1373277590945890306,"{'type': 'Point', 'coordinates': [37.2453843, ...",en
347,2021-03-20 17:17:45,[],0,0,Stop staring at the clock every day. It's time...,1373277588005679104,"{'type': 'Point', 'coordinates': [41.3374374, ...",en
649,2021-03-20 17:16:16,"[{'text': 'pharmacy', 'indices': [89, 98]}, {'...",0,0,"If you're looking for work in Whitehall, OH, c...",1373277212560912385,"{'type': 'Point', 'coordinates': [39.956025, -...",en


In [5]:
# top 10 languages
df['lang'].value_counts().nlargest(10)

en     523
pt     180
es     120
fr      33
in      31
it      30
und     20
tl      16
ja       8
de       5
Name: lang, dtype: int64

### Data Transformation
* created_at must to have ISO format ```(resolved on the bulk insert step)```
* geo is a nested dict with a list with longitude and latitude. It will converted to country ISO cca2 format ```(resolved on the bulk insert step)```
* hashtag is a nested list with dicts. The grain of information is diferent, so it needs to be separeted metric ```(resolved on nexts steps)```

In [6]:
# split Dataframes - with and without geo
df_with_geo = df.copy().loc[
                    df['geo'].notnull()
                ].reset_index()
df_without_geo = df.copy().loc[
                    df['geo'].isnull()
                ].reset_index()

print(df_with_geo.info())
print("------------------")
print(df_without_geo.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   index           3 non-null      int64         
 1   created_at      3 non-null      datetime64[ns]
 2   hashtags        3 non-null      object        
 3   favorite_count  3 non-null      int64         
 4   retweet_count   3 non-null      int64         
 5   text            3 non-null      object        
 6   id              3 non-null      int64         
 7   geo             3 non-null      object        
 8   lang            3 non-null      object        
dtypes: datetime64[ns](1), int64(4), object(4)
memory usage: 344.0+ bytes
None
------------------
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 997 entries, 0 to 996
Data columns (total 9 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   index           9

In [7]:
# preparing the bulk data for index insert
bulk_data_with_geo = df_with_geo.to_dict('records')
bulk_data_without_geo = df_without_geo.to_dict('records')
bulk_data_complete = df.to_dict('records')

In [8]:
# get hashtags list
hastags_list = []
for tweet in bulk_data_complete:
    hastags_list.append([hashtag['text'].lower() for hashtag in tweet['hashtags']])

# flatten hashtags list
hastags_list = list(itertools.chain(*hastags_list))

len(hastags_list)

294

### Elasticsearch bulk insert

In [11]:
# create a elastic instance
es = Elasticsearch()

In [12]:
# bulk insert of tweets_index with geo information
bulk_data = [
        {
            "_index": "tweets_data",
            "_origin": "batch_tweets",
            "_source": {
                'created_at': tweet['created_at'].isoformat(), 
                'id_twitter': tweet['id'],
                'text_tweet': tweet['text'],
                'language': tweet['lang'],
                'hashtags': ", ".join([hashtags['text'] for hashtags in tweet['hashtags']]),
                'favorite_count': tweet['favorite_count'],
                'retweet_count': tweet['retweet_count'],
                #'geo_coordinates': ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates']) ])
                'country': geolocator.reverse(
                                ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates'])])
                            ).raw['address']['country'],
                'country_code': geolocator.reverse(
                                ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates'])])
                            ).raw['address']['country_code']
                
            }
        } for tweet in bulk_data_with_geo
    ]
bulk(es, bulk_data)

(3, [])

In [13]:
# bulk insert of tweets_index without geo information
bulk_data = [
        {
            "_index": "tweets_data",
            "_origin": "batch_tweets",
            "_source": {
                'created_at': tweet['created_at'].isoformat(),
                'id_twitter': tweet['id'],
                'text_tweet': tweet['text'],
                'language': tweet['lang'],
                'hashtags': ", ".join([hashtags['text'] for hashtags in tweet['hashtags']]),
                'favorite_count': tweet['favorite_count'],
                'retweet_count': tweet['retweet_count'],
                #'geo_coordinates': ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates']) ])
                'country': tweet['geo'],
                'country_code': tweet['geo']
            }
        } for tweet in bulk_data_without_geo
    ]
bulk(es, bulk_data)

(997, [])

In [14]:
# bulk insert of hashtags
bulk_data = [
        {
            "_index": "tweets_hashtags",
            "_origin": "batch_tweets",
            "_source": {
                'created_at': datetime.now(),
                'hashtag': hashtag
            }
        } for hashtag in hastags_list
    ]
bulk(es, bulk_data)

(294, [])

## Kafka Ingestion

In [9]:
kafka_data_with_geo = [
        { 
            'created_at': tweet['created_at'].isoformat(), 
            'id_twitter': tweet['id'],
            'text_tweet': tweet['text'],
            'language': tweet['lang'],
            'hashtags': ", ".join([hashtags['text'] for hashtags in tweet['hashtags']]),
            'favorite_count': tweet['favorite_count'],
            'retweet_count': tweet['retweet_count'],
            #'geo_coordinates': ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates']) ])
            'country': geolocator.reverse(
                            ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates'])])
                        ).raw['address']['country'],
            'country_code': geolocator.reverse(
                            ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates'])])
                        ).raw['address']['country_code']
        } for tweet in bulk_data_with_geo
    ]

In [10]:
# bulk insert of tweets_index without geo information
kafka_data_without_geo = [
        {
            'created_at': tweet['created_at'].isoformat(),
            'id_twitter': tweet['id'],
            'text_tweet': tweet['text'],
            'language': tweet['lang'],
            'hashtags': ", ".join([hashtags['text'] for hashtags in tweet['hashtags']]),
            'favorite_count': tweet['favorite_count'],
            'retweet_count': tweet['retweet_count'],
            #'geo_coordinates': ",".join([str(coordinates) for coordinates in (tweet['geo']['coordinates']) ])
            'country': tweet['geo'],
            'country_code': tweet['geo']
        } for tweet in bulk_data_without_geo
    ]

In [15]:
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))

try:
    for tweet in kafka_data_with_geo:
        producer.send('twitter', key=b'covid', value=tweet)
        #print('Sent:', tweet)
        time.sleep(1)
    
    for tweet in kafka_data_without_geo:
        producer.send('twitter', key=b'covid', value=tweet)
        #print('Sent:', tweet)
        time.sleep(1)
    
except KafkaTimeoutError:
    print("Timeout: not possible to send the data.")
finally:
    producer.close()