In [None]:
import itertools
import json
import os
import pickle
import re

import pandas as pd
from langdetect import detect
from sqlalchemy import create_engine

In [None]:
# FLAGS
# True: target the local Postgres database and the local cursor file;
# False: target the production database and the production cursor file.
LOCAL = True

# Files (written with pickle by the ingestion cell) that record how many input
# lines have already been consumed, one per target database.
LOCAL_CURSOR, PROD_CURSOR = 'local_tweet_cursor', 'prod_tweet_cursor'

In [None]:
# Create DB connection.
# Credentials must not be committed with the notebook, so connection URLs come
# from environment variables. The local database keeps its development default;
# production has no in-notebook default: set OUTBREAK_PROD_DB_URL before running
# with LOCAL = False.
# NOTE(review): a production password used to be hard-coded in this cell; treat it
# as leaked and rotate it.
LOCAL_DB_URL = 'postgresql+psycopg2://postgres:root@localhost:5432/outbreak'

if LOCAL:
    engine = create_engine(os.environ.get('OUTBREAK_LOCAL_DB_URL', LOCAL_DB_URL))
else:
    prod_db_url = os.environ.get('OUTBREAK_PROD_DB_URL')
    if not prod_db_url:
        raise RuntimeError('LOCAL is False but OUTBREAK_PROD_DB_URL is not set.')
    engine = create_engine(prod_db_url)

In [None]:
# Filters

def valid(tweet):
    """Return True if ``tweet`` should be ingested, i.e. it is (probably) English.

    * ``lang == 'en'``: accepted.
    * ``lang == 'und'`` (undetermined): the text is run through
      ``langdetect.detect`` and accepted when it reports ``'en'``. If detection
      raises (e.g. langdetect cannot classify the text, or there is no
      ``'text'`` key), the tweet is accepted anyway.
    * any other tag, or no ``'lang'`` key at all: rejected.

    Args:
        tweet: a decoded tweet (dict).

    Returns:
        bool: always an explicit True/False.
    """
    lang = tweet.get('lang')
    if lang == 'en':
        return True
    if lang != 'und':
        return False
    try:
        return detect(tweet['text']) == 'en'
    except Exception:
        # Catch Exception rather than using a bare ``except:`` so that
        # KeyboardInterrupt during the long ingestion loop is not swallowed.
        # Undetectable 'und' tweets are kept, as before.
        return True


In [None]:
# Query helpers

# Keys read from a tweet's ``user`` object. The INSERT statements name no
# columns, so this order presumably has to match the twitter_users column order.
# NOTE(review): Twitter's standard user key is spelled 'statuses_count'; the
# spelling below is kept on the assumption that it matches the input data -- confirm.
USER_FIELDS = [
    'id',
    'name',
    'screen_name',
    'description',
    'followers_count',
    'friends_count',
    'statues_count',
]

# Keys read from the tweet itself (same positional caveat, for the tweets table).
# NOTE(review): Twitter's standard key is 'created_at'; confirm 'create_at' is
# what the input data actually contains.
TWEET_FIELDS = [
    'id',
    'create_at',
    'text',
]

def create_tables(schema_path='Schemas/twitter_schema.sql'):
    """Execute the SQL schema script against the module-level ``engine``.

    Args:
        schema_path: path of the schema script; defaults to
            ``Schemas/twitter_schema.sql`` (relative to the working directory).
    """
    with open(schema_path, encoding='utf-8') as schema:
        # read() preserves the file's own line breaks. '\n'.join(readlines())
        # doubled every newline, because readlines() already keeps the '\n'.
        query = schema.read()
    # The file is closed before the (potentially slow) DDL runs.
    engine.execute(query)

def nuke_tables():
    """Delete every row from ``twitter_users`` and ``tweets``. Irreversible.

    ``TRUNCATE ... CASCADE`` also empties tables that hold a foreign key to
    ``twitter_users`` (PostgreSQL semantics).
    """
    engine.execute(
        "TRUNCATE twitter_users CASCADE; "
        "TRUNCATE tweets;"
    )

def insert_user(user, engine = engine):
    """Insert one user row into ``twitter_users``; rows that conflict are skipped.

    Values are taken from ``user`` in ``USER_FIELDS`` order and bound as query
    parameters (never formatted into the SQL string). NUL characters in the
    free-text fields (name, screen_name, description) are replaced by spaces;
    presumably because PostgreSQL text values cannot contain NUL.

    Args:
        user: the ``user`` object of a decoded tweet (dict).
        engine: SQLAlchemy engine to execute on. The default is the module-level
            ``engine`` as it was when this function was defined.

    Raises:
        KeyError: if ``user`` lacks one of ``USER_FIELDS``.
    """
    query = "INSERT INTO twitter_users VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    values = [user[field] for field in USER_FIELDS]
    # Positions 1-3 of USER_FIELDS are the free-text fields.
    for index, field in ((1, 'name'), (2, 'screen_name'), (3, 'description')):
        text = user[field]
        # A JSON null (e.g. an empty profile description) arrives as None; pass it
        # through so it is stored as SQL NULL instead of crashing the sanitizer.
        values[index] = None if text is None else text.replace('\x00', ' ')
    engine.execute(query, values)
    
def insert_tweet(tweet, engine = engine):
    """Insert one tweet row into ``tweets``; rows that conflict are skipped.

    Bound values, in order: the ``TWEET_FIELDS`` values (NUL characters in the
    text replaced by spaces), the author's user id, and the state name from
    ``tweet['geo_tag']['stateName']``, or None (SQL NULL) when it is absent.

    Args:
        tweet: a decoded tweet (dict).
        engine: SQLAlchemy engine to execute on. The default is the module-level
            ``engine`` as it was when this function was defined.

    Raises:
        KeyError: if ``tweet`` lacks one of ``TWEET_FIELDS`` or ``['user']['id']``.
    """
    query = "INSERT INTO tweets VALUES (%s, %s, %s, %s, %s) ON CONFLICT DO NOTHING"
    values = [tweet[field] for field in TWEET_FIELDS]
    # Position 2 of TWEET_FIELDS is the text; NUL characters are replaced, as in insert_user.
    values[2] = tweet['text'].replace('\x00', ' ')

    # Author's user id (the user row itself is written by insert_user).
    values.append(tweet['user']['id'])

    # State name, if the tweet carries a geo_tag. Bind None, not the string 'NULL':
    # a bound 'NULL' string is stored as the literal text "NULL", not a null value.
    geo_tag = tweet.get('geo_tag') or {}
    values.append(geo_tag.get('stateName'))

    engine.execute(query, values)

In [None]:
# nuke_tables()

In [None]:
# Ingest Data/zika.json into the database, resuming where the last run stopped.

start_count = 0          # input lines consumed so far; doubles as the resume cursor
foreign_tweet_count = 0  # tweets rejected by valid() during this run

CURSOR = LOCAL_CURSOR if LOCAL else PROD_CURSOR

# Resume from the line count pickled by a previous run's `finally` block below.
# Unpickling is only acceptable because this notebook writes the file itself;
# never point CURSOR at a file from an untrusted source.
try:
    with open(CURSOR, 'rb') as cursor_file:
        start_count = pickle.load(cursor_file)
except FileNotFoundError:
    # No cursor yet: nothing has been inserted, so start from the beginning.
    pass
except Exception as e:
    # Unreadable cursor: start over (ON CONFLICT DO NOTHING makes re-inserts harmless).
    print('Could not read cursor', CURSOR, '- starting from the beginning:', repr(e))


try:
    print("Creating tables...")
    create_tables()
    print("Created tables!")
    with open('Data/zika.json', 'r', encoding='utf-8') as tweets:
        print('Starting at tweet', str(start_count), '.')
        for line in itertools.islice(tweets, start_count, None):
            # Each line holds one JSON object after a single leading character,
            # presumably the '[' / ',' of a one-object-per-line JSON array -- confirm.
            payload = line[1:]
            # Skip lines with nothing after that character (e.g. a closing ']').
            if payload.strip():
                # No `encoding` argument: json.loads ignored it since Python 3.1
                # and rejects it since 3.9; the file is already decoded as UTF-8.
                tweet = json.loads(payload)
                if valid(tweet):
                    insert_user(tweet['user'])
                    insert_tweet(tweet)
                else:
                    foreign_tweet_count += 1
            start_count += 1
            if start_count % 1000000 == 0:
                print("Total tweets processed:", str(start_count))
except Exception as e:
    # Best effort: report where ingestion stopped. start_count was not advanced
    # past the failing line, so the cursor saved below resumes exactly there.
    print('Stopped at line', start_count, ':', repr(e))
finally:
    with open(CURSOR, 'wb') as cursor_file:
        pickle.dump(start_count, cursor_file)
        