GOAL: Implement a data pipeline, using your programming language of choice, that outputs a machine-learning ready dataset.

Steps: <br>
Connect to the Twitter streaming API and do a keyword search on Justin Bieber <br>
Filter out all tweets having to do with music <br>
Store the tweets into a database of your choosing <br>
Avoid duplicates<br>
Produce a count of all tweets consumed<br>
Produce a count of unique tweets<br>

### Import Statements

In [1]:
import requests
import os
import json
import pandas as pd

### Generate tokens from Twitter Developer API

In [2]:
#Bearer token from the Twitter Developer portal. Prefer exporting TOKEN in the
#environment; the placeholder below is only used when TOKEN is not already set,
#so a real token supplied by the environment is never overwritten.
os.environ.setdefault('TOKEN', 'bearer-token')

In [3]:
def auth():
    '''
    Read the bearer token from the TOKEN environment variable.

    Returns the token string, or None when TOKEN is not set.
    '''
    token = os.environ.get('TOKEN')
    return token

In [4]:
def create_headers(bearer_token):
    '''
    Build the HTTP headers for an authenticated Twitter API call.

    The token is sent in the Authorization header as "Bearer <token>";
    the returned dict is what gets passed to requests as `headers`.
    '''
    return {"Authorization": f"Bearer {bearer_token}"}

### Create URL and connect to endpoint

In [5]:
def create_url(keyword, max_results):
    '''
    Build the recent-search request for `keyword`.

    Returns a (search_url, query_params) tuple. query_params carries the search
    query, the page size (`max_results`), the requested expansions and tweet
    fields, and a 'next_token' entry initialised to None (no page token yet);
    connect_to_endpoint fills in the token for each subsequent page.
    '''
    #note: Since I have Essential access to Twitter API, I'm using recent search instead of full archive search
    #refer for url access levels: https://developer.twitter.com/en/docs/twitter-api/tweets/search/introduction
    #refer for details on Endpoint URL: https://developer.twitter.com/en/docs/twitter-api/tweets/search/api-reference/get-tweets-search-recent
    search_url = "https://api.twitter.com/2/tweets/search/recent" #The recent search endpoint returns Tweets from the last seven days that match a search query.

    query_params = {'query': keyword,
                    'max_results': max_results,
                    'expansions': 'author_id,in_reply_to_user_id,geo.place_id',
                    'tweet.fields': 'id,text,author_id,lang',
                    # None (not an empty dict) means "no page token": the token is a
                    # string, and requests omits None-valued params from the query string
                    'next_token': None}
    return (search_url, query_params)

In [6]:
def connect_to_endpoint(url, headers, params, next_token = None, timeout = 30):
    '''
    Send one GET request to the search endpoint and return the decoded JSON.

    url, headers, params: as built by create_url / create_headers.
    next_token: page token from the previous response's meta, or None for the
        first page.
    timeout: seconds to wait for the server before requests gives up
        (prevents the pagination loop from hanging forever).

    Prints the HTTP status code. Raises Exception(status_code, response_text)
    for any non-200 response.
    '''
    # Work on a copy so the caller's params dict (reused for every page) is not mutated.
    request_params = dict(params)
    request_params['next_token'] = next_token
    response = requests.get(url, headers=headers, params=request_params, timeout=timeout)
    print("Endpoint Response Code: " + str(response.status_code))
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

In [7]:
#Inputs for the request
keyword = "(Justin Bieber)"  #search query: tweets mentioning Justin Bieber
max_results = 100  #page size; the API accepts 10-100 per request at the current access level
bearer_token = auth()
headers = create_headers(bearer_token)

### Append results to csv 

This step extracts more than 100 results from the Twitter API by following the next_token in each JSON response. It keeps appending new results to the CSV file until next_token becomes None (or the page limit is reached).

In [8]:
import csv
# Create the output CSV if needed and make sure it starts with exactly one header row.
# The file is opened in append mode so rows from earlier runs are kept, but the
# header is only written while the file is still empty: writing it on every run
# inserts a duplicate header line that is later read back as a data row.
csv_path = "data.csv"
write_header = not os.path.exists(csv_path) or os.path.getsize(csv_path) == 0
with open(csv_path, "a", newline="", encoding='utf-8') as csvFile:
    if write_header:
        csv.writer(csvFile).writerow(['author id', 'tweet_id','lang', 'text'])

In [9]:
def append_to_csv(json_response, fileName):
    '''
    Append one CSV row per tweet in json_response['data'] to `fileName`.

    Each row is [author_id, tweet_id, lang, text], matching the header
    written when data.csv is created. A response without a 'data' key
    appends nothing.
    '''
    tweets = json_response.get('data', [])
    # Build all rows first so a tweet missing a field raises before anything is written.
    rows = [[tweet['author_id'], tweet['id'], tweet['lang'], tweet['text']]
            for tweet in tweets]
    # `with` closes the file even if writing fails.
    with open(fileName, "a", newline="", encoding='utf-8') as csvFile:
        csv.writer(csvFile).writerows(rows)

### Generate more responses based on next_token

To retrieve all tweets for the query, we keep sending requests with the new next_token until no next_token is returned.

In [10]:
import time
#Page through the recent-search results by following meta.next_token: the first
#request has no token, and each response's next_token is sent with the following
#request until the API stops returning one or the page limit is reached.
search_url, query_params = create_url(keyword, max_results)
max_pages = 21  #1 initial request + at most 20 follow-ups, so the tweet quota isn't exhausted
pause_seconds = 5  #pause between calls so the API is not spammed with requests
next_token = None
pages_fetched = 0
while pages_fetched < max_pages:
    json_response = connect_to_endpoint(search_url, headers, query_params, next_token)
    pages_fetched += 1
    meta = json_response.get('meta', {})
    #only pages that actually contain tweets are written to the CSV
    if (meta.get('result_count') or 0) > 0:
        append_to_csv(json_response, "data.csv")
    next_token = meta.get('next_token')
    if next_token is None:
        break
    time.sleep(pause_seconds)

Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200
Endpoint Response Code: 200


### Store results in Pandas DataFrame

In [12]:
#Read the raw tweets back. Twitter IDs are ~19-digit integers: reading them as
#strings means they can never be coerced to float (which pandas does to integer
#columns containing missing values) and silently lose precision.
twitter_response_dataframe = pd.read_csv("data.csv", dtype={'author id': str, 'tweet_id': str})

In [13]:
twitter_response_dataframe.shape[0]  #number of rows read from data.csv

14754

In [14]:
twitter_response_dataframe.iloc[:5]  #first five rows

Unnamed: 0,author id,tweet_id,lang,text
0,2834378727,1488070438039269377,en,RT @AwesomePrecio11: Dj Khaled ft Justin Biebe...
1,1357463130146820097,1488070020085301251,en,Dj Khaled ft Justin Bieber let it go #music #N...
2,31641350,1488067094394638339,en,#Music Essence by WizKid ft. Justin Bieber #No...
3,1265013369859383307,1488059532723036160,en,"#NowPlaying: Peaches (Remix) [feat. Ludacris, ..."
4,1357938465611071489,1488002270914654210,en,RT @HOODZRADIO: #Music Essence by WizKid ft. J...


To clean the data (remove duplicate tweets, reorder columns, drop redundant columns, etc.), the results saved from the JSON responses are loaded into a Pandas DataFrame.

In [15]:
#drop the lang column -- assumed to be "en" for every row of this dataset (re-check if the query changes)
twitter_response_dataframe = twitter_response_dataframe.drop(columns=['lang'])

In [16]:
#give the columns clearer, snake_case names
column_renames = {'text': 'tweet', 'author id': 'author_id'}
twitter_response_dataframe = twitter_response_dataframe.rename(columns=column_renames)

In [17]:
cols = twitter_response_dataframe.columns.tolist()
cols

['author_id', 'tweet_id', 'tweet']

In [18]:
#put tweet_id first, then author_id, then the tweet text
twitter_response_dataframe = twitter_response_dataframe.loc[:, ['tweet_id', 'author_id', 'tweet']]

In [19]:
len(twitter_response_dataframe.index)  #row count after the column clean-up

14754

### Get Rid of duplicate tweets

Note: some tweets look like duplicates of others (for example, the same text retweeted by a different author). Treating tweets with identical text as duplicates, the step below keeps only the first occurrence of each.

In [20]:
#treat rows with identical tweet text as duplicates and keep only the first occurrence
is_repeat = twitter_response_dataframe.duplicated(subset='tweet', keep='first')
twitter_response_dataframe = twitter_response_dataframe[~is_repeat]

In [21]:
twitter_response_dataframe.shape[0]  #row count after removing duplicate tweets

3399

In [22]:
twitter_response_dataframe['tweet_id'] = twitter_response_dataframe['tweet_id'].map(str)  #the DB column is VARCHAR

In [23]:
twitter_response_dataframe['author_id'] = twitter_response_dataframe['author_id'].map(str)  #the DB column is VARCHAR

In [24]:
len(twitter_response_dataframe.index)

3399

### Keep only tweets having to do with music (tweets containing "#music")

In [25]:
#keep only tweets whose text contains "#music" (case-insensitive)
mentions_music = twitter_response_dataframe['tweet'].str.contains("#music", case=False)
twitter_response_dataframe = twitter_response_dataframe.loc[mentions_music]

In [26]:
twitter_response_dataframe.shape[0]  #number of music-related tweets

86

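username: <redacted>
password: <redacted> (a real password was committed here; rotate it and supply credentials through environment variables instead of the notebook)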


### CONNECT TO DATABASE

Here I use a Postgres DB on AWS RDS and connect to it with the psycopg2 library. More details in README.md.

In [27]:
#!pip install psycopg2-binary

In [28]:
import psycopg2 as ps

In [29]:
twitter_response_dataframe.iloc[:5]  #first five cleaned rows

Unnamed: 0,tweet_id,author_id,tweet
0,1488070438039269377,2834378727,RT @AwesomePrecio11: Dj Khaled ft Justin Biebe...
1,1488070020085301251,1357463130146820097,Dj Khaled ft Justin Bieber let it go #music #N...
2,1488067094394638339,31641350,#Music Essence by WizKid ft. Justin Bieber #No...
3,1488059532723036160,1265013369859383307,"#NowPlaying: Peaches (Remix) [feat. Ludacris, ..."
4,1488002270914654210,1357938465611071489,RT @HOODZRADIO: #Music Essence by WizKid ft. J...


In [31]:
#DB connection settings. Credentials should not live in the notebook: each value
#is read from an environment variable when it is set, falling back to the
#previous hard-coded value otherwise.
host_name = os.environ.get('DB_HOST', 'database-1.cp72bttmlvq7.us-west-1.rds.amazonaws.com')
dbname = os.environ.get('DB_NAME', 'postgres')
port = os.environ.get('DB_PORT', '5432')
username = os.environ.get('DB_USER', 'x')
password = os.environ.get('DB_PASSWORD', 'xx')

In [32]:
def connect_to_db(host_name, dbname, port, username, password):
    '''
    Open and return a psycopg2 connection using the given credentials.

    Prints 'Connected!' on success. Any error raised by psycopg2.connect
    (e.g. psycopg2.OperationalError) propagates unchanged to the caller.
    '''
    conn = ps.connect(host=host_name, database=dbname, user=username,
                      password=password, port=port)
    print('Connected!')
    return conn

Note: The Cursor class in psycopg2 allows Python code to execute PostgreSQL commands in a database session.

In [33]:
def create_table(curs, tablename):
    '''
    Create the tweets table (tweet_id primary key, author_id, tweet text)
    unless a table with that name already exists. Nothing is committed here.
    '''
    ddl = ("""CREATE TABLE IF NOT EXISTS %s (
                tweet_id VARCHAR(255) NOT NULL PRIMARY KEY,
                author_id VARCHAR(255) NOT NULL,
                tweet TEXT NOT NULL
           )""")
    # AsIs splices the name in verbatim (no quotes); a normal parameter would be
    # rendered as a quoted string literal. AsIs does no escaping, so tablename
    # must be a trusted identifier.
    table_identifier = ps.extensions.AsIs(tablename)
    curs.execute(ddl, (table_identifier,))

In [34]:
conn = connect_to_db(host_name=host_name, dbname=dbname, port=port,
                     username=username, password=password)
curs = conn.cursor()

Connected!


#### Connection has been made successfully to the DB at this point 

In [35]:
tablename = "TWITTERSTREAMS"  #must match the TWITTERSTREAMS name hard-coded in the query helpers
create_table(curs, tablename=tablename)

Check whether the tweet id already exists in the DB; if it does, the row is skipped instead of being inserted again.

### Store tweets to the database

In [36]:
def check_if_tweet_id_exists(curs, tweet_id):
    '''
    Return True if a row with this tweet_id (the primary key) is already
    stored in TWITTERSTREAMS, False otherwise. Used to skip rows that
    would otherwise cause a primary-key violation on insert.
    '''
    lookup_sql = ("""SELECT tweet_id FROM TWITTERSTREAMS WHERE tweet_id = %s""")
    curs.execute(lookup_sql, [tweet_id])
    first_match = curs.fetchone()
    return first_match is not None

Keep only the rows whose tweet_id does not already exist in the table

In [37]:
def create_temp_df(curs,df):
    '''
    Return the rows of `df` whose tweet_id is not yet stored in the DB.

    Each tweet_id is looked up with check_if_tweet_id_exists, so rows that are
    already in the table are skipped and the later INSERT does not hit a
    primary-key violation. Rows repeating a tweet_id within `df` itself are
    also dropped (first occurrence kept) because they would collide with each
    other on insert.

    curs: open DB cursor. df: DataFrame with a 'tweet_id' column.
    Returns a new DataFrame (a copy) with the same columns as `df`.
    '''
    if df.empty:
        return df.copy()
    deduped = df.drop_duplicates(subset='tweet_id', keep='first')
    # DataFrame.append was removed in pandas 2.0 (and was quadratic in a loop):
    # build a boolean mask and select all new rows in one step instead.
    is_new = [not check_if_tweet_id_exists(curs, tweet_id)
              for tweet_id in deduped['tweet_id']]
    return deduped[is_new].copy()

In [38]:
def insert_into_table(curs,tweet_id, author_id, tweet):
    '''
    Insert one tweet row into TWITTERSTREAMS (not committed here).

    ON CONFLICT (tweet_id) DO NOTHING makes the insert idempotent: a tweet_id
    that is already stored is skipped instead of raising a unique-violation
    error that would abort the whole open transaction. Requires PostgreSQL 9.5+.
    '''
    insert_into_DB = ("""INSERT INTO TWITTERSTREAMS (tweet_id, author_id, tweet) VALUES (%s, %s, %s) ON CONFLICT (tweet_id) DO NOTHING;""")
    row_to_insert = (tweet_id, author_id, tweet)
    curs.execute(insert_into_DB, row_to_insert)


In [39]:
def dataFrame_to_DB(curs,df):
    '''
    Insert every row of `df` into the table, one INSERT per row, by passing
    its tweet_id, author_id and tweet values to insert_into_table.
    Nothing is committed here; the caller commits.
    '''
    rows = df.iterrows()
    for _index, record in rows:
        values = (record['tweet_id'], record['author_id'], record['tweet'])
        insert_into_table(curs, *values)

In [40]:
new_vid_df = create_temp_df(curs=curs, df=twitter_response_dataframe)  #rows not yet stored in the table

In [41]:
new_vid_df.shape[0]  #number of rows about to be inserted

86

In [42]:
#Insert the new rows as one transaction: commit only if every insert succeeded,
#otherwise roll back so no partial batch is left pending and the connection
#is usable again, then re-raise the original error.
try:
    dataFrame_to_DB(curs, new_vid_df)
except Exception:
    conn.rollback()
    raise
else:
    conn.commit() #sends a commit statement to the DB

In [43]:
#This can be used to roll back in case of errors while inserting/updating rows
# query = ("rollback")
# curs.execute(query)

In [44]:
#Open a fresh connection for the read-back queries; close the previous cursor
#and connection first so they are not leaked.
curs.close()
conn.close()
conn = connect_to_db(host_name, dbname, port, username, password)
query = ("""SELECT * FROM TWITTERSTREAMS""")
pd.read_sql_query(query, conn)

Connected!


Unnamed: 0,tweet_id,author_id,tweet
0,1488070438039269377,2834378727,RT @AwesomePrecio11: Dj Khaled ft Justin Biebe...
1,1488070020085301251,1357463130146820097,Dj Khaled ft Justin Bieber let it go #music #N...
2,1488067094394638339,31641350,#Music Essence by WizKid ft. Justin Bieber #No...
3,1488059532723036160,1265013369859383307,"#NowPlaying: Peaches (Remix) [feat. Ludacris, ..."
4,1488002270914654210,1357938465611071489,RT @HOODZRADIO: #Music Essence by WizKid ft. J...
...,...,...,...
81,1488006648681611265,1313496680004812806,Top 50 - Malaysia CLEAN 🇲🇾:\n\n🥇 Justin Bieber...
82,1487991553243111426,1313496680004812806,Top 50 - South Korea CLEAN 🇰🇷:\n\n🥇 V (4 days)...
83,1487984012064927747,1369044145352871936,Top 100 - Australia CLEAN 🇦🇺:\n\n🥇 The Kid LAR...
84,1487984006339735554,1313496680004812806,Top 50 - Australia CLEAN 🇦🇺:\n\n🥇 The Kid LARO...


In [45]:
#count of tweets stored in the table, i.e. after de-duplication and the #music filter.
#This is NOT the number of tweets consumed from the API -- that is the length of the
#dataframe read from data.csv before any cleaning.
query = ("""SELECT count(*) FROM TWITTERSTREAMS""")
pd.read_sql_query(query, conn)

Unnamed: 0,count
0,86


In [46]:
#number of distinct tweet texts stored in the table
query = """SELECT count(DISTINCT TWEET) FROM TWITTERSTREAMS"""
pd.read_sql_query(sql=query, con=conn)

Unnamed: 0,count
0,86


In [47]:
#twitter_response_dataframe.to_csv("output.csv")