# Building an ETL Pipeline

As the second part of the predict for Gather, you will need to build a pipeline of functions in Python that does the following:

1. A function that connects to Twitter and scrapes "Eskom_SA" tweets.
2. A function that cleans/processes the scraped tweets and returns a dataframe with two new columns (municipalities and hashtags), using the following function:
   - Hashtag Remover from the Analyse functions
3. A function that connects to your SQL database and uploads the tweets into the table where you store them.
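The three steps above form a small extract, transform, load (ETL) chain. A minimal sketch of how the stages compose, assuming each stage is a plain callable; `run_pipeline`, `fake_extract`, `fake_transform` and the list-based loader are hypothetical stand-ins for the Twitter scraper, the hashtag/municipality extractor and the SQL upload, so the sketch runs without Twitter or SQL Server.

```python
from typing import Callable

import pandas as pd


def run_pipeline(extract: Callable[[], pd.DataFrame],
                 transform: Callable[[pd.DataFrame], pd.DataFrame],
                 load: Callable[[pd.DataFrame], None]) -> pd.DataFrame:
    """Run extract -> transform -> load and return the transformed frame."""
    raw = extract()          # e.g. scrape tweets into a DataFrame
    clean = transform(raw)   # e.g. add 'Municipality' and 'Hashtags' columns
    load(clean)              # e.g. insert the rows into a SQL table
    return clean


# Hypothetical stand-ins for the real stages.
def fake_extract() -> pd.DataFrame:
    return pd.DataFrame({'Tweets': ['#POWERALERT stage 1', 'No loadshedding today'],
                         'Date': ['2020-02-17 05:13:15', '2020-02-24 08:47:56']})


def fake_transform(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()  # leave the extracted frame untouched
    out['Hashtags'] = out['Tweets'].str.findall(r'#\w+')
    return out


loaded = []  # collects whatever the load stage receives
result = run_pipeline(fake_extract, fake_transform, loaded.append)
```

Keeping each stage as a separate function means any one of them (for example the loader) can be swapped for a test double without touching the others.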

In [14]:
# General:
import tweepy           # To consume Twitter's API
import pandas as pd     # To handle data
import numpy as np      # For numerical computation
# For displaying DataFrames in the notebook:
from IPython.display import display
# For connecting to the SQL Server database over ODBC:
import pyodbc


# Consumer and Access details

Fill in the consumer and access details you should have received when applying for Twitter API access.

In [15]:
# Consumer:
CONSUMER_KEY    = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'

# Access:
ACCESS_TOKEN  = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
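Hard-coding keys in a notebook makes them easy to leak when the notebook is shared or committed. A minimal sketch of reading them from environment variables instead; the helper `load_twitter_credentials` and the variable names such as `TWITTER_CONSUMER_KEY` are hypothetical choices, not something tweepy itself looks for.

```python
import os


def load_twitter_credentials(environ=os.environ):
    """Return (consumer_key, consumer_secret, access_token, access_secret).

    The environment variable names below are hypothetical conventions;
    rename them to match your own setup.
    """
    names = ('TWITTER_CONSUMER_KEY', 'TWITTER_CONSUMER_SECRET',
             'TWITTER_ACCESS_TOKEN', 'TWITTER_ACCESS_SECRET')
    # Treat unset and empty variables alike so a blank export is caught early.
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise KeyError('Missing environment variables: ' + ', '.join(missing))
    return tuple(environ[name] for name in names)


# Usage (assumes the variables were exported before starting Jupyter):
# CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET = load_twitter_credentials()
```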

In [16]:
# API setup:
def twitter_setup(consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET,
                  access_token=ACCESS_TOKEN, access_secret=ACCESS_SECRET):
    """
    Utility function to set up Twitter's API with the consumer and access keys
    from Twitter. The keys default to the constants defined in the cell above.
    """

    # Authentication and access using keys:
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)

    # Return API with authentication:
    api = tweepy.API(auth, timeout=1000)
    return api

# Function 1:

Write a function which:
- Scrapes _"Eskom_SA"_ tweets from Twitter.

Function Specifications:
- The function should return a dataframe of the scraped tweets containing only the "_Tweets_" and "_Date_" columns.
- It should take in the `consumer key`, `consumer secret`, `access token` and `access secret`.

NOTE:
The dataframe should have the same column names as those in the SQL database table where you store the tweets.

In [17]:
def twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET):
    '''
    Inputs:
    This function takes in CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN and ACCESS_SECRET, passes them to
    twitter_setup() to get an authenticated Twitter API, and uses that API to scrape "Eskom_SA" tweets.

    Returns:
    A dataframe with 'Tweets' and 'Date' columns, populated with up to 190 Eskom_SA tweets from February 2020.
    Both columns are strings.
    '''

    # Extract tweets with the user_timeline API method, which scrapes a specific user ("Eskom_SA" in our case).
    # We request 190 tweets with IDs up to and including max_id = 1231863420103864320 (a February 2020 tweet);
    # fixing max_id keeps the results consistent, so the function returns the same tweets every time it runs.
    api = twitter_setup(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)
    tweets = api.user_timeline(screen_name="Eskom_SA", count=190, max_id=1231863420103864320)

    # Extract the tweet text (text) and the tweet date (created_at) as lists, assign them to
    # 'Tweets' and 'Date' respectively, and build a pandas dataframe from the resulting dictionary.
    df = pd.DataFrame(data={'Tweets': [tweet.text for tweet in tweets],
                            'Date': [str(tweet.created_at) for tweet in tweets]})

    # Return the dataframe.
    return df
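`user_timeline` returns at most 200 tweets per request, so collecting more means paging backwards: each new request sets `max_id` to one less than the oldest tweet ID already seen, since `max_id` is inclusive. A minimal sketch of that loop; `fetch_all` and `fetch_page` are hypothetical names, with `fetch_page` standing in for a call such as `api.user_timeline(screen_name="Eskom_SA", count=..., max_id=...)`, and the fake timeline exists only so the logic runs offline.

```python
from types import SimpleNamespace


def fetch_all(fetch_page, max_id, total, page_size=200):
    """Collect up to `total` tweets, newest first, by paging with max_id.

    fetch_page(count, max_id) must return tweets with IDs <= max_id,
    newest first, each having an `.id` attribute (like tweepy Status objects).
    """
    collected = []
    while len(collected) < total:
        page = fetch_page(count=min(page_size, total - len(collected)), max_id=max_id)
        if not page:                # no older tweets left
            break
        collected.extend(page)
        max_id = page[-1].id - 1    # next request starts just below the oldest tweet
    return collected


# Hypothetical offline timeline with IDs 500 down to 1 (higher ID = newer).
timeline = [SimpleNamespace(id=i) for i in range(500, 0, -1)]


def fake_fetch(count, max_id):
    return [t for t in timeline if t.id <= max_id][:count]


tweets = fetch_all(fake_fetch, max_id=450, total=300)
```

Here the first page covers IDs 450 to 251 and the second page IDs 250 to 151, without overlap between pages.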

# Function 2: Extracting hashtags and municipalities

Write a function which:
- Uses the function you wrote in the Analyse section to extract the hashtags and municipalities into their own columns in a new dataframe.

Function Specifications:
- The function should take in the pandas dataframe you created in Function 1 and return a new pandas dataframe. 

In [18]:
# Scrape the tweets with Function 1 and store the returned dataframe as df
df = twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)

In [19]:
df

|     | Tweets | Date |
|-----|--------|------|
| 0 | @_Veronique_L Pls call your service provider i... | 2020-02-24 08:48:20 |
| 1 | @AobakweT Eskom is currently not loadshedding.... | 2020-02-24 08:47:56 |
| 2 | @Aut771 Eskom is currently not loadshedding. P... | 2020-02-24 08:47:27 |
| 3 | @Makimofokeng4P Eskom is currently not loadshe... | 2020-02-24 08:46:34 |
| 4 | RT @Exposcience: #MondayMotivation Cape Town t... | 2020-02-24 08:36:30 |
| ... | ... | ... |
| 185 | @Nyatsi_Mpulwane @News24 @TimesLIVE @eNCA @IOL... | 2020-02-17 07:51:21 |
| 186 | @lekalakala30 @SABCNewsOnline @IOL @eNCA @ewnu... | 2020-02-17 07:48:42 |
| 187 | Reduce energy use in the office to help curb l... | 2020-02-17 06:58:00 |
| 188 | #POWERALERT 1\nDate: 17 February 2020\n\nLoads... | 2020-02-17 05:13:15 |


In [20]:
def extract_municipality_hashtags(df):
    """This function extracts hashtags and municipalities into their own columns of a copy of the given pandas DataFrame.

    Inputs:
    The function takes the twitter_df dataframe, df, which has 'Tweets' and 'Date' columns.

    Municipalities in the mun_dict dictionary are identified from the 'Tweets' column and singled out into a 'Municipality' column;
    if there isn't any municipality, nan will occupy that entry.

    Hashtags in the 'Tweets' column are identified and singled out into a 'Hashtags' column as lists;
    if there isn't any hashtag, nan will occupy that entry.

    Both new columns are converted to strings for SQL convenience.

    Returns:
    A new dataframe which includes the 'Municipality' and 'Hashtags' columns; the input dataframe is left unchanged.
    """

    # Work on a copy so the caller's dataframe is not modified
    df = df.copy()

    # Dictionary mapping official municipality Twitter handles to municipality names
    mun_dict = {
        '@CityofCTAlerts': 'Cape Town',
        '@CityPowerJhb': 'Johannesburg',
        '@eThekwiniM': 'eThekwini',
        '@EMMInfo': 'Ekurhuleni',
        '@centlecutility': 'Mangaung',
        '@NMBmunicipality': 'Nelson Mandela Bay',
        '@CityTshwane': 'Tshwane'}

    # Function to identify the first municipality handle in a string of data
    def tag(line):
        for word in line.split():
            if word in mun_dict:
                return mun_dict[word]  # return the municipality name of the first identified handle
        return np.nan  # return nan if no municipality is tagged

    # Function to identify hashtags in a line of data
    def hashtag(line):
        # Collect every word that begins with '#', in lowercase
        list_of_hashtags = [word.lower() for word in line.split() if word.startswith('#')]
        # Return nan if no hashtags were found, else the list of hashtags
        return list_of_hashtags if list_of_hashtags else np.nan

    # Make a 'Municipality' column by applying the tag function to each row of 'Tweets',
    # then convert the entries to strings for SQL convenience
    df['Municipality'] = df['Tweets'].apply(tag).apply(str)

    # Make a 'Hashtags' column by applying the hashtag function to each row of 'Tweets',
    # then convert the entries to strings for SQL convenience
    df['Hashtags'] = df['Tweets'].apply(hashtag).apply(str)

    # Return the modified copy
    return df


df = extract_municipality_hashtags(df)
df


|     | Tweets | Date | Municipality | Hashtags |
|-----|--------|------|--------------|----------|
| 0 | @_Veronique_L Pls call your service provider i... | 2020-02-24 08:48:20 | | |
| 1 | @AobakweT Eskom is currently not loadshedding.... | 2020-02-24 08:47:56 | | |
| 2 | @Aut771 Eskom is currently not loadshedding. P... | 2020-02-24 08:47:27 | | |
| 3 | @Makimofokeng4P Eskom is currently not loadshe... | 2020-02-24 08:46:34 | | |
| 4 | RT @Exposcience: #MondayMotivation Cape Town t... | 2020-02-24 08:36:30 | | ['#mondaymotivation', '#eskomexpoisf'] |
| ... | ... | ... | ... | ... |
| 185 | @Nyatsi_Mpulwane @News24 @TimesLIVE @eNCA @IOL... | 2020-02-17 07:51:21 | | |
| 186 | @lekalakala30 @SABCNewsOnline @IOL @eNCA @ewnu... | 2020-02-17 07:48:42 | | |
| 187 | Reduce energy use in the office to help curb l... | 2020-02-17 06:58:00 | | ['#useelectricitysmartly'] |
| 188 | #POWERALERT 1\nDate: 17 February 2020\n\nLoads... | 2020-02-17 05:13:15 | | ['#poweralert'] |
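The same two columns can also be built with pandas' vectorised string methods instead of splitting each tweet in Python. A minimal sketch, not an exact equivalent of the function above: `extract_vectorised` is a hypothetical name; the regex `#\w+` drops trailing punctuation that the split-based version keeps (for example `#loadshedding.`); the handle pattern also matches a handle followed by punctuation such as `@CityPowerJhb:`, which the split-based version misses; and the new columns are left as names, lists or NaN rather than strings, so apply `str` before uploading if your table stores text.

```python
import re

import numpy as np
import pandas as pd

# Same handle -> municipality mapping as in extract_municipality_hashtags.
mun_dict = {
    '@CityofCTAlerts': 'Cape Town',
    '@CityPowerJhb': 'Johannesburg',
    '@eThekwiniM': 'eThekwini',
    '@EMMInfo': 'Ekurhuleni',
    '@centlecutility': 'Mangaung',
    '@NMBmunicipality': 'Nelson Mandela Bay',
    '@CityTshwane': 'Tshwane'}

# One capture group over the escaped handles; (?!\w) stops a handle matching
# inside a longer one (e.g. '@CityTshwane' inside '@CityTshwaneX').
handle_pattern = '(' + '|'.join(re.escape(h) for h in mun_dict) + r')(?!\w)'


def extract_vectorised(df):
    """Return a copy of df with 'Municipality' and 'Hashtags' columns added."""
    out = df.copy()
    # First matching handle in each tweet, mapped to its municipality (NaN if none).
    out['Municipality'] = out['Tweets'].str.extract(handle_pattern, expand=False).map(mun_dict)
    # All '#word' tokens, lower-cased; empty lists become NaN to mirror the original.
    tags = out['Tweets'].str.lower().str.findall(r'#\w+')
    out['Hashtags'] = tags.apply(lambda t: t if t else np.nan)
    return out


sample = pd.DataFrame({'Tweets': ['RT @CityPowerJhb: #LoadShedding update #Stage2',
                                  'Eskom is currently not loadshedding.'],
                       'Date': ['2020-02-24 08:00:00', '2020-02-24 08:47:56']})
result = extract_vectorised(sample)
```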


# Function 3: Updating SQL Database with pyODBC

Write a function which:
- Connects to and updates your SQL database.

Function Specifications:
- The function should take in a pandas dataframe created in Function 2. 
- Connect to your SQL database.
- Update the table you store your tweets in.
- Not return any output.
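Because the INSERT statement names the dataframe's columns, they must match the columns of the SQL table (see the note under Function 1). A minimal sketch of checking this before opening a connection; `check_columns` is a hypothetical helper, and `EXPECTED_COLUMNS` is an assumption based on the Tweets/Date/Municipality/Hashtags table used in this notebook.

```python
import pandas as pd

# Assumed columns of the SQL table that stores the tweets.
EXPECTED_COLUMNS = ['Tweets', 'Date', 'Municipality', 'Hashtags']


def check_columns(df, expected=EXPECTED_COLUMNS):
    """Raise ValueError if df's columns differ from `expected`.

    Order is not checked, because the INSERT statement lists the column names.
    """
    missing = [c for c in expected if c not in df.columns]
    extra = [c for c in df.columns if c not in expected]
    if missing or extra:
        raise ValueError(f'Column mismatch: missing={missing}, extra={extra}')


check_columns(pd.DataFrame(columns=EXPECTED_COLUMNS))  # passes silently
```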

In [21]:
def pyodbc_twitter(connection, df, twitter_table):
    """This function connects to and updates a SQL database using a pandas dataframe.

    Input:
    The function takes in a connection string, a dataframe used to update the Twitter table, and the name of that table.
    The Twitter table must already exist in the database.
    The twitter_table input must be a string in the format 'Database.Schema.Table'.

    The dataframe, df, must have 4 columns, 'Tweets', 'Date', 'Municipality' and 'Hashtags', named exactly as in the Twitter table in SQL.

    The dataframe's string contents are inserted into the Twitter table.

    Output:
    None
    """

    # Make a list of tuples, one tuple per dataframe row
    list_of_rows = list(df.itertuples(index=False, name=None))

    # Build 'INSERT INTO twitter_table ([Tweets], [Date], [Municipality], [Hashtags]) VALUES (?, ?, ?, ?)'.
    # Each ? placeholder is filled from the corresponding entry of a list_of_rows tuple.
    columns = ', '.join('[' + col + ']' for col in df.columns)
    placeholders = ', '.join('?' * len(df.columns))
    query = 'INSERT INTO ' + twitter_table + ' (' + columns + ') VALUES (' + placeholders + ')'

    # Establish the connection as conx using the connection string input.
    # Using the connection in a with block commits the transaction if the block succeeds and rolls it
    # back on an error; it does not close the connection, so the finally clause closes it explicitly.
    conx = pyodbc.connect(connection)
    try:
        with conx:
            cursor = conx.cursor()  # create a cursor object
            cursor.executemany(query, list_of_rows)  # insert every row in list_of_rows
    finally:
        conx.close()
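pyodbc uses `?` placeholders, and so does Python's built-in `sqlite3` module, so the insert logic can be tried offline against an in-memory SQLite database before pointing it at SQL Server. A minimal sketch under that assumption; `insert_dataframe` is a hypothetical name, and the table is created on the fly here, whereas the function above expects it to exist already. Like the notebook's data, the sample holds only strings, which both drivers bind directly.

```python
import sqlite3

import pandas as pd


def insert_dataframe(conn, df, table):
    """Insert every row of df into `table` with one parameterised statement."""
    columns = ', '.join(f'[{c}]' for c in df.columns)   # brackets quote names such as Date
    placeholders = ', '.join('?' * len(df.columns))      # '?, ?, ?, ?' for four columns
    query = f'INSERT INTO {table} ({columns}) VALUES ({placeholders})'
    rows = list(df.itertuples(index=False, name=None))   # plain tuples, one per row
    cursor = conn.cursor()
    cursor.executemany(query, rows)
    conn.commit()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Twitter (Tweets TEXT, Date TEXT, Municipality TEXT, Hashtags TEXT)')
sample = pd.DataFrame({'Tweets': ['#POWERALERT 1'], 'Date': ['2020-02-17 05:13:15'],
                       'Municipality': ['nan'], 'Hashtags': ["['#poweralert']"]})
insert_dataframe(conn, sample, 'Twitter')
count = conn.execute('SELECT COUNT(*) FROM Twitter').fetchone()[0]
```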

In [22]:
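details = {
    'server': 'localhost',
    'port': 1433,  # default SQL Server TCP port
    'database': 'the_database',
    'username': 'sa',
    'password': 'sql_server_password'
}

# Adjacent string literals are joined without the stray indentation that
# backslash line continuations inside a single literal would keep.
# The SQL Server ODBC driver takes the port as part of SERVER (host,port).
connect_string = ('DRIVER={{ODBC Driver 17 for SQL Server}};'
                  'SERVER={server},{port};'
                  'DATABASE={database};'
                  'UID={username};'
                  'PWD={password}').format(**details)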
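If a value such as the password contains `;`, `{` or `}`, inserting it into the connection string as-is breaks the parsing. ODBC connection strings allow a value to be wrapped in braces, with any literal `}` inside it doubled, as documented for Microsoft's ODBC Driver for SQL Server. A minimal sketch of that quoting; `odbc_quote` and `build_connection_string` are hypothetical helpers, and the values are the placeholders from the cell above with a made-up password.

```python
def odbc_quote(value):
    """Wrap a connection-string value in braces, doubling any '}' inside it."""
    return '{' + str(value).replace('}', '}}') + '}'


def build_connection_string(attributes):
    """Join KEY=value pairs with ';', brace-quoting every value."""
    return ';'.join(f'{key}={odbc_quote(value)}' for key, value in attributes.items())


conn_str = build_connection_string({
    'DRIVER': 'ODBC Driver 17 for SQL Server',
    'SERVER': 'localhost',
    'DATABASE': 'the_database',
    'UID': 'sa',
    'PWD': 'p;ss}word',   # hypothetical password containing ';' and '}'
})
```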

In [23]:
pyodbc_twitter(connect_string, df, 'gather_eskom.dbo.Twitter')  # inserts the dataframe rows into the gather_eskom.dbo.Twitter table

