# Data Collection

## Contents
-  [Configure Postgres Server with Docker](#Configure-Postgres-Server-with-Docker)
-  [Application Token](#Application-Token)
-  [Collect Tweets](#Collect-Tweets)
-  [Retrieve Data from PostgreSQL Database](#Retrieve-Data-from-PostgreSQL-Database)
-  [Save Data](#Save-Data)

In [None]:
# !pip install python-twitter

In [None]:
# !pip install psycopg2-binary

In [1]:
import twitter
import json
import time
import psycopg2 as pg2
import numpy as np
import pandas as pd
from datetime import datetime
from psycopg2.extras import RealDictCursor, Json

In [2]:
%run ../sql_test.py
%run ../twitter_credentials.py

## Configure Postgres Server with Docker

Define helper functions to connect to the database and insert data programmatically:
-  **con_cur_to_db**: returns both a connection and a cursor object for the database
-  **execute_query**: runs a query against the database without creating a connection and cursor by hand each time
-  **insert_entry_json**: inserts JSON records into a table

In [3]:
def con_cur_to_db(dbname=DBNAME, dict_cur=None):
    con = pg2.connect(host=IP_ADDRESS,
                  dbname=dbname,
                  user=USER,
                  password=PASSWORD)
    if dict_cur:
        cur = con.cursor(cursor_factory=RealDictCursor)
    else:
        cur = con.cursor()
    return con, cur
    
def execute_query(query, dbname=DBNAME, dict_cur=None, command=False):
    con, cur = con_cur_to_db(dbname, dict_cur)
    cur.execute(query)
    if not command:
        data = cur.fetchall()
        con.close()
        return data
    con.commit() #sends to server
    con.close() #closes server connection

def insert_entry_json(data, tablename=None):
    con, cur = con_cur_to_db()
    for x in data:
        # pass the JSON value as a bound parameter so psycopg2 handles the quoting
        cur.execute(f'INSERT INTO {tablename} (data) VALUES (%s);', (Json(x),))
    con.commit()
    con.close()
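
Binding values through placeholders keeps them out of the SQL string, so quotes inside a tweet cannot break the statement. The sketch below illustrates the idea with the standard-library `sqlite3` module as a stand-in for Postgres. That swap is an assumption made only so the example runs without a server: `sqlite3` uses `?` placeholders where psycopg2 uses `%s`. The table `raw_tweets_demo` and the helper `insert_json_rows` are hypothetical names.

```python
import json
import sqlite3


def insert_json_rows(con, rows):
    """Insert each dict as a JSON string through a bound parameter."""
    with con:  # commits on success, rolls back on error
        con.executemany(
            "INSERT INTO raw_tweets_demo (data) VALUES (?);",
            [(json.dumps(row),) for row in rows],
        )


# In-memory database used purely for illustration
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE raw_tweets_demo (id INTEGER PRIMARY KEY, data TEXT);")

# The apostrophe in the first record needs no manual escaping
insert_json_rows(con, [{"text": "it's raining"}, {"text": "storm warning"}])

stored = [
    json.loads(data)
    for (data,) in con.execute("SELECT data FROM raw_tweets_demo ORDER BY id;")
]
```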

Create the `raw_tweets` table to hold the collected data.

In [5]:
query = '''CREATE TABLE raw_tweets
(id SERIAL,
data JSONB);'''

In [None]:
execute_query(query, command=True)

## Application Token

Define the API keys and instantiate the Twitter API client.

In [7]:
twitter_keys = {
    'consumer_key':        CONSUMER_KEY,
    'consumer_secret':     CONSUMER_SECRET,
    'access_token_key':    ACCESS_TOKEN,
    'access_token_secret': ACCESS_TOKEN_SECRET
}

api = twitter.Api(consumer_key         =   twitter_keys['consumer_key'],
                  consumer_secret      =   twitter_keys['consumer_secret'],
                  access_token_key     =   twitter_keys['access_token_key'],
                  access_token_secret  =   twitter_keys['access_token_secret'],
)

## Collect Tweets

Collect tweets and store them in the database:
-  `term`: term to search by
-  `geocode`: geolocation within which to search for tweets
-  `since`: reference date for the search, formatted `YYYY-MM-DD`
-  `count`: number of results returned per request (100 max)
-  `sql_db`: name of the table to save tweets to

In [8]:
from datetime import timedelta

def streamTweets(term, geocode, since, count, sql_db):
    # step back one day per request for a week, starting from `since`
    before = datetime.strptime(since, '%Y-%m-%d')
    for i in range(1,8):
        # timedelta handles month and year boundaries
        after = before - timedelta(days=1)
        
        results = api.GetSearch(
            term = term,
            geocode = geocode,
            since = after.strftime('%Y-%m-%d'),
            until = before.strftime('%Y-%m-%d'),
            count = count,
            return_json = True
        )

        insert_entry_json(data = results['statuses'], 
                          tablename = sql_db)
        # the next request covers the preceding day
        before = after
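
Stepping back one day at a time is easiest with `timedelta`, which handles month and year boundaries that manual arithmetic on the day field does not. The following is a minimal sketch of that date logic in isolation; `daily_windows` is a hypothetical helper, not part of the notebook or of `python-twitter`.

```python
from datetime import datetime, timedelta


def daily_windows(end_date, days=7):
    """Return (since, until) date-string pairs, newest window first."""
    until = datetime.strptime(end_date, '%Y-%m-%d')
    windows = []
    for _ in range(days):
        since = until - timedelta(days=1)
        windows.append((since.strftime('%Y-%m-%d'),
                        until.strftime('%Y-%m-%d')))
        until = since  # the next window ends where this one began
    return windows


# Crossing a month boundary: March 2 back to February 27
windows = daily_windows('2019-03-02', days=3)
```

Each pair could then be passed as the `since` and `until` arguments of a search request.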

Define a function that runs `streamTweets` in a loop to collect tweets programmatically:
-  Repeat the function 15 times by default, requesting up to 100 tweets (`count`) per search
-  Pause for 40 seconds between loops to avoid exceeding the rate limit

In [9]:
def tweet_repeater(term, geocode, since, sql_db, repeats=15, count=100):
    for i in range(repeats):
        streamTweets(term, geocode, since, count, sql_db)
        print(f'Loop {i+1} complete. Raw tweets pushed to {sql_db}.')
        time.sleep(40)
        
    print('All tweets pulled.')
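
A back-of-the-envelope check shows why a 40-second pause is plausible. The sketch assumes the standard search endpoint allows 180 requests per 15-minute window (an assumption to verify against the current Twitter documentation) and ignores the time spent inside each request, so it gives an upper bound. `requests_per_window` is a hypothetical helper.

```python
def requests_per_window(calls_per_loop, pause_seconds, window_seconds=15 * 60):
    """Upper bound on requests per window, ignoring time spent in the calls."""
    loops_per_window = window_seconds / pause_seconds
    return calls_per_loop * loops_per_window


# Assumed limit for the standard search endpoint; not taken from the notebook
ASSUMED_LIMIT = 180

# streamTweets issues 7 searches per loop, followed by a 40-second pause
estimate = requests_per_window(calls_per_loop=7, pause_seconds=40)
within_limit = estimate <= ASSUMED_LIMIT
```

Under these assumptions the estimate is 157.5 requests per window, which leaves a small margin below the assumed limit.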

Collect the most recent tweets:
-  that contain the term `storm` (terms were determined by natural disasters during the time of the search)
-  within a 15-mile radius of the location
-  starting from 2019-01-13
-  run the function 100 times, collecting up to 700 tweets (1 week x 100 tweets) each time
-  save into the `raw_tweets` table
-  sample output is displayed

We searched over two locations and used the following terms for each:

|Location|Latitude|Longitude|Search Terms|Since Date|
|---|---|---|---|---|
|Malibu, CA|34.0249999|-118.773830238|flood, mudslide, landslide, rain, storm|2019-01-06|
|Riverside, CA|33.9806|-117.3755|flood|2019-01-13|

In [10]:
tweet_repeater(term='storm',
               geocode='33.9806,-117.3755,15mi',
               since='2019-01-13',
               repeats=100, 
               count=100,
               sql_db='raw_tweets')

Loop 1 complete. Raw tweets pushed to raw_tweets.
All tweets pulled.


## Retrieve Data from PostgreSQL Database

Run `SELECT *` to inspect the data structure and find the information most relevant to us.

In [11]:
query = '''SELECT * FROM raw_tweets;'''
response = execute_query(query, dict_cur=True)
print(type(response))
print(type(response[0]))

<class 'list'>
<class 'psycopg2.extras.RealDictRow'>


### Text (Tweets)

Our data is stored as a list of nested dictionaries. We want to retrieve the text itself (`text`), nested under `data`, and put it in a dataframe (`df_text`).

In [12]:
query = """SELECT data ->> 'text' AS text
FROM raw_tweets;
"""
response = execute_query(query, dict_cur=True)
df_text = pd.DataFrame(response)

### Geo-Coordinates
We then want to retrieve the geo coordinates for each tweet in order to map their location and allocate resources there. This is stored in dataframe `df_geo`.

In [13]:
query = """SELECT data#>'{place,bounding_box,coordinates}'
FROM raw_tweets;
"""
response = execute_query(query, dict_cur=True)
df_geo = pd.DataFrame(response).dropna()
df_geo.head()

| |?column?|
|---|---|
|3|[[[-118.794237, 34.125821], [-118.715023, 34.1...|
|15|[[[-118.668404, 33.704538], [-118.155409, 33.7...|
|26|[[[-118.794237, 34.125821], [-118.715023, 34.1...|
|34|[[[-118.794237, 34.125821], [-118.715023, 34.1...|
|43|[[[-118.668404, 33.704538], [-118.155409, 33.7...|


Tweets from users who do not have location enabled return `NaN`, so we drop them.

In [14]:
df_geo.dropna(inplace = True)
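
To see why tweets without a location come back empty, it helps to mimic the path lookup in plain Python. The sketch below is a rough analogue of the `#>` operator for nested dictionaries only (the real operator also accepts array indices); `json_path` and both sample records are hypothetical.

```python
def json_path(doc, path):
    """Follow keys through nested dicts; return None if any step is missing."""
    current = doc
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


PATH = ('place', 'bounding_box', 'coordinates')

# Hypothetical records: one with a place attached, one without
with_place = {
    'text': 'storm rolling in',
    'place': {'bounding_box': {'coordinates': [[[-118.8, 34.0], [-118.6, 34.0],
                                                 [-118.6, 34.2], [-118.8, 34.2]]]}},
}
without_place = {'text': 'so much rain', 'place': None}

found = json_path(with_place, PATH)
missing = json_path(without_place, PATH)
```

The second lookup yields `None`, which pandas treats as a missing value when the rows are loaded into a dataframe.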

The bounding box for the geo-fence is a nested list of `[longitude, latitude]` corner pairs. We index into the list and average the latitudes and the longitudes of two diagonally opposite corners to approximate the location of a given tweet. These values are stored in the `lat` and `long` columns, respectively.

In [15]:
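latitude = []
longitude = []

for tweet in df_geo['?column?']:
    # corners 1 and 3 are diagonally opposite; each is [longitude, latitude]
    inside = tweet[0][1]
    outside = tweet[0][3]
    long = (inside[0] + outside[0])/2
    lat = (inside[1] + outside[1])/2
    latitude.append(lat)
    longitude.append(long)

# store the approximate coordinates alongside each tweet
df_geo['lat'] = latitude
df_geo['long'] = longitude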
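
The same averaging can be written as a small function that does not depend on the order of the corners. This is a sketch under the assumption that each corner is a `[longitude, latitude]` pair, as the sample output above suggests; `bbox_center` and the sample box are hypothetical.

```python
def bbox_center(bounding_box):
    """Return (lat, long) at the center of a bounding box.

    `bounding_box` is a list holding one ring of [longitude, latitude] corners.
    """
    ring = bounding_box[0]
    longs = [corner[0] for corner in ring]
    lats = [corner[1] for corner in ring]
    # Midpoint of the extremes, so the corner order does not matter
    return (min(lats) + max(lats)) / 2, (min(longs) + max(longs)) / 2


# Hypothetical box roughly the size of a small city
sample_box = [[[-118.8, 34.0], [-118.6, 34.0], [-118.6, 34.2], [-118.8, 34.2]]]
center_lat, center_long = bbox_center(sample_box)
```

Taking the minimum and maximum of each axis gives the same midpoint as averaging two opposite corners, without relying on which index holds which corner.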

We then drop the nested list stored in `?column?`.

In [16]:
df_geo.drop(columns=['?column?'], inplace=True)

### All Data

We merge our text and geo-coordinates data into one dataframe, `df`, joining on the row index so that only tweets with coordinates are kept.

In [17]:
df = pd.merge(df_text, df_geo, left_index=True, right_index=True)

Repeated Twitter queries for the same search term return overlapping sets of tweets. We drop duplicates to ensure every tweet is unique.

In [18]:
df.drop_duplicates(keep='first', inplace=True)
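
`drop_duplicates` compares whole rows, so two distinct tweets with identical text and coordinates collapse into one. An alternative is to deduplicate on the tweet ID before building the dataframe. The sketch below assumes each raw status dictionary carries an `id_str` field; `dedupe_statuses` and the sample records are hypothetical.

```python
def dedupe_statuses(statuses):
    """Keep the first status seen for each `id_str`, preserving order."""
    seen = set()
    unique = []
    for status in statuses:
        if status['id_str'] not in seen:
            seen.add(status['id_str'])
            unique.append(status)
    return unique


# Hypothetical statuses: the first and third are the same tweet fetched twice
statuses = [
    {'id_str': '1', 'text': 'storm warning'},
    {'id_str': '2', 'text': 'storm warning'},
    {'id_str': '1', 'text': 'storm warning'},
]
unique_statuses = dedupe_statuses(statuses)
```

Here the two tweets that share the same text but have different IDs both survive, while the repeated fetch of tweet `1` is dropped.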

## Save Data

In [19]:
df.to_csv('../data/raw_tweets')