In [2]:
import json
import os
from time import strptime, strftime

import pandas as pd
import psycopg2
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [3]:
# Connection URL for the PostgreSQL database that receives the user rows.
# It is read from the POSTGRES_URL environment variable so that credentials do
# not have to live in the notebook; the fallback is the original local default.
POSTGRES_URL = os.environ.get(
    'POSTGRES_URL',
    'postgresql://postgres:root@localhost:5432/postgres',
)
engine = create_engine(POSTGRES_URL)
# Open connection on the engine, kept under the original name `con`.
con = engine.connect()

In [4]:
# MongoDB connection. The URI can be overridden through the MONGODB_URI
# environment variable; the fallback is the original local default.
MONGODB_URI = os.environ.get("MONGODB_URI", "mongodb://localhost:27017")
client = pymongo.MongoClient(MONGODB_URI)
# Handle on the "twitter" database.
db = client["twitter"]

In [5]:
# Top-level tweet keys that `filter_func` lets through.
mongodb_columns = [
    'id',
    'created_at',
    'text',
    'source',
    'quote_count',
    'reply_count',
    'retweet_count',
    'favorite_count',
    'entities',
]

# Keys that `load_to_postgres` keeps before writing a row to the `users` table.
postgres_columns = [
    'id',
    'name',
    'screen_name',
    'location',
    'url',
    'description',
    'protected',
    'verified',
    'followers_count',
    'friends_count',
    'listed_count',
    'favourites_count',
    'statuses_count',
    'created_at',
]

In [6]:
def filter_func(item):
    """Predicate for ``filter`` over ``dict.items()`` pairs.

    Parameters
    ----------
    item : tuple
        A ``(key, value)`` pair; only the key is inspected.

    Returns
    -------
    bool
        True when the key is listed in ``mongodb_columns``, False otherwise.
    """
    field, _ = item
    return field in mongodb_columns

In [7]:
def cleaning(data):
    """Build the document stored in MongoDB for a single tweet.

    Parameters
    ----------
    data : dict
        A tweet object. It must contain ``user`` (with an ``id``), ``id``,
        ``created_at`` and ``entities`` (with ``hashtags`` and
        ``user_mentions`` lists).

    Returns
    -------
    dict
        A new dict holding the keys accepted by ``filter_func``, with:
        ``id`` moved to ``_id``, ``user_id`` taken from ``data['user']['id']``,
        ``created_at`` re-formatted as ``YYYY-MM-DD HH:MM:SS``, and
        ``entities`` replaced by flat ``hashtags`` / ``user_mentions`` lists.

    Raises
    ------
    KeyError
        If one of the required keys listed above is missing.
    """
    doc = dict(pair for pair in data.items() if filter_func(pair))
    doc['user_id'] = data['user']['id']
    # The tweet id is stored under `_id`; the original `id` key is dropped.
    doc['_id'] = doc.pop('id')

    # Re-format the timestamp, e.g. "Wed Oct 10 20:19:24 +0000 2018"
    # becomes "2018-10-10 20:19:24" (the offset is not written to the output).
    parsed_time = strptime(doc['created_at'], "%a %b %d %H:%M:%S %z %Y")
    doc['created_at'] = strftime('%Y-%m-%d %H:%M:%S', parsed_time)

    # Keep only the hashtag texts and the mentioned screen names.
    entities = doc['entities']
    doc['hashtags'] = [tag['text'] for tag in entities['hashtags']]
    doc['user_mentions'] = [mention['screen_name'] for mention in entities['user_mentions']]

    del doc['entities']
    return doc

In [8]:
def load_to_postgres(data):
    """Append one user record to the ``users`` table.

    Parameters
    ----------
    data : dict
        A user object. Only the keys listed in ``postgres_columns`` are
        written; ``created_at`` must be present and is converted with
        ``pd.to_datetime`` before the insert.

    Returns
    -------
    None
    """
    user_row = pd.DataFrame([data]).filter(postgres_columns)
    user_row['created_at'] = pd.to_datetime(user_row['created_at'])
    try:
        user_row.to_sql(name='users', con=engine, if_exists='append', index=False)
    except sqlalchemy.exc.IntegrityError:
        # Best effort: rows rejected by a table constraint are skipped.
        # NOTE(review): presumably this is a duplicate user id hitting a
        # primary-key/unique constraint - confirm against the table definition.
        pass

In [9]:
def _insert_tweet(doc):
    """Insert ``doc`` into ``db.tweets``, ignoring documents whose ``_id`` already exists."""
    try:
        db.tweets.insert_one(doc)
    except pymongo.errors.DuplicateKeyError:
        # The same tweet can appear several times in a dump; keep the first copy.
        pass


def load_data(file_name):
    """Load a line-delimited JSON tweet dump into PostgreSQL and MongoDB.

    Each line of ``file_name`` is parsed as one JSON document. For every tweet
    the author (and, for retweets, the author of the original tweet) is passed
    to ``load_to_postgres``. The tweet itself (and, for retweets, the original
    tweet) is passed through ``cleaning`` and inserted into ``db.tweets``. A
    retweet additionally records the original tweet id under
    ``retweeted_status_id``.

    Lines that are not valid JSON, and records that are not objects with a
    ``user`` key, are skipped. Tweets already present in MongoDB are ignored.

    Parameters
    ----------
    file_name : str
        Path of the dump file, one JSON document per line.

    Returns
    -------
    None
    """
    # Explicit encoding so decoding does not depend on the platform default.
    with open(file_name, "r", encoding="utf-8") as f1:
        for line in f1:
            try:
                data = json.loads(line)
            except ValueError:
                # Blank or malformed line.
                continue

            # Records without a user object cannot be loaded as tweets; skip
            # them instead of aborting the whole run with a KeyError.
            if not isinstance(data, dict) or 'user' not in data:
                continue

            # Explicit lookup instead of try/except KeyError, so that a
            # KeyError raised inside load_to_postgres is no longer hidden.
            retweeted = data.get('retweeted_status')

            # loading data into postgresql
            load_to_postgres(data['user'])
            if retweeted:
                load_to_postgres(retweeted['user'])

            # loading data into mongodb
            doc = cleaning(data)
            if retweeted:
                doc['retweeted_status_id'] = retweeted['id']
                # The original tweet is stored before the retweet referencing it.
                _insert_tweet(cleaning(retweeted))

            _insert_tweet(doc)

In [10]:
# Run the full load for the "corona-out-3" dump file; the relative path is
# resolved against the notebook's current working directory.
load_data("corona-out-3")