# Building an ETL Pipeline

As the second part of the Gather predict, you will build a pipeline of Python functions that does the following:

1. A function that connects to Twitter and scrapes "Eskom_SA" tweets.
2. A function that cleans/processes the scraped tweets, creating a dataframe with two new columns (municipality and hashtags) using the following function:
    - Hashtag Remover from the Analyse functions
3. A function that connects to your SQL database and uploads the tweets into the table where you store them.
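The three steps above form a classic extract, transform, load (ETL) chain, where each stage consumes the previous stage's output. The sketch below shows only that wiring, using hypothetical stand-in functions (`extract`, `transform`, `load`, `run_pipeline`) and a plain list in place of Twitter and the SQL table; in the notebook itself the stages are `twitter_df`, `extract_municipality_hashtags` and `pyodbc_twitter`.

```python
from typing import Dict, List

# Hypothetical stand-ins for the three stages; the real notebook uses
# twitter_df, extract_municipality_hashtags and pyodbc_twitter instead.

def extract() -> List[Dict]:
    # Stage 1 (extract): would normally scrape tweets from Twitter
    return [
        {"Date": "2020-08-10", "Tweets": "Power is out again #LoadShedding"},
        {"Date": "2020-08-11", "Tweets": "Thanks for the update @Eskom_SA"},
    ]

def transform(rows: List[Dict]) -> List[Dict]:
    # Stage 2 (transform): add a lower-cased list of hashtags to each row
    out = []
    for row in rows:
        tags = [word.lower() for word in row["Tweets"].split() if word.startswith("#")]
        out.append({**row, "hashtags": tags})
    return out

def load(rows: List[Dict], table: List[Dict]) -> None:
    # Stage 3 (load): would normally INSERT into the SQL table; a list stands in here
    table.extend(rows)

def run_pipeline(table: List[Dict]) -> None:
    # Each stage consumes the output of the one before it
    load(transform(extract()), table)

table: List[Dict] = []
run_pipeline(table)
print(len(table), table[0]["hashtags"])  # 2 ['#loadshedding']
```

Keeping each stage a separate function means any one of them can be swapped out or tested on its own.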

```python
# General:
import tweepy           # To consume Twitter's API
import pandas as pd     # To handle data
import numpy as np      # For numerical computation
import pyodbc           # To connect to the SQL Server database
# For displaying dataframes in the notebook:
from IPython.display import display
```


# Consumer and Access details

Fill in the consumer and access credentials you received when applying for access to the Twitter API.
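Because these keys grant access to a Twitter account, one common alternative to typing them into the notebook is to read them from environment variables. The sketch below assumes hypothetical variable names (`TWITTER_CONSUMER_KEY` and so on) and a hypothetical helper, `load_twitter_credentials`; neither is part of the original notebook.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment-variable names; choose whatever suits your setup.
CREDENTIAL_VARS = {
    "consumer_key": "TWITTER_CONSUMER_KEY",
    "consumer_secret": "TWITTER_CONSUMER_SECRET",
    "access_token": "TWITTER_ACCESS_TOKEN",
    "access_secret": "TWITTER_ACCESS_SECRET",
}

def load_twitter_credentials(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read the four Twitter credentials from `env`, failing loudly if any is missing or empty."""
    missing = [var for var in CREDENTIAL_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError("Missing credentials: " + ", ".join(missing))
    return {name: env[var] for name, var in CREDENTIAL_VARS.items()}
```

After exporting the variables in your shell, `creds = load_twitter_credentials()` returns a dictionary whose four values can be passed positionally to `twitter_df` in place of the constants.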

```python
# Consumer:
CONSUMER_KEY    = 'YOUR_CONSUMER_KEY'
CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET'

# Access:
ACCESS_TOKEN  = 'YOUR_ACCESS_TOKEN'
ACCESS_SECRET = 'YOUR_ACCESS_SECRET'
```

```python
# API setup:
def twitter_setup():
    """
    Utility function to set up the Twitter API
    using the consumer and access keys defined above.
    """

    # Authenticate using the consumer and access keys:
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # Return an authenticated API object (timeout is in seconds):
    api = tweepy.API(auth, timeout=1000)
    return api
```

```python
twitter_setup()
```

```
<tweepy.api.API at 0x1e0af0b3308>
```

# Function 1: Scraping tweets

Write a function which:
- Scrapes _"Eskom_SA"_ tweets from Twitter. 

Function Specifications:
- The function should return a dataframe of the scraped tweets containing only the "_Tweets_" and "_Date_" columns.
- It takes in the `consumer key`, `consumer secret`, `access token` and `access secret`.

NOTE:
The dataframe should have the same column names as those in your SQL Database table where you store the tweets.
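A mismatch between the dataframe's columns and the table's columns usually only surfaces when the INSERT fails, so it can help to check them up front. The helper below, `compare_columns`, is a hypothetical sketch (not part of the original notebook) that reports which table columns the dataframe lacks and which dataframe columns the table lacks. With pandas you would pass `df.columns` as the first argument. The comparison is case-sensitive, which is stricter than SQL Server usually is.

```python
from typing import Iterable, List, Tuple

def compare_columns(df_columns: Iterable[str],
                    table_columns: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Return (missing, extra): table columns absent from the dataframe,
    and dataframe columns absent from the table."""
    df_set, table_set = set(df_columns), set(table_columns)
    missing = sorted(table_set - df_set)
    extra = sorted(df_set - table_set)
    return missing, extra

# Hypothetical example: the table has Tweets and Date, but the dataframe has a typo
print(compare_columns(["Date", "Tweet"], ["Tweets", "Date"]))  # (['Tweets'], ['Tweet'])
```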

```python
def twitter_df(consumer_key, consumer_secret, access_token, access_secret):
    """
    Use the Twitter API to scrape tweets mentioning "@Eskom_SA".

    Args:
        consumer_key (str): Twitter consumer key
        consumer_secret (str): Twitter consumer secret
        access_token (str): Twitter access token
        access_secret (str): Twitter access secret

    Returns:
        pandas.DataFrame: a dataframe with two columns, 'Date' and 'Tweets'
    """

    # Authenticate with the credentials passed in, not the global constants
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_secret)
    api = tweepy.API(auth, timeout=1000)

    tweets = []  # will hold one (created_at, text) tuple per tweet

    # tweepy.Cursor pages through the search results and yields one tweet at a time:
    #   q     - the search query
    #   since - the earliest date to search from
    #   items - the maximum number of tweets to fetch (retweets included)
    try:
        for tweet in tweepy.Cursor(api.search, q='@Eskom_SA', since='2020-08-10').items(200):
            # Skip retweets so each tweet is stored only once
            if tweet.retweeted or 'RT @' in tweet.text:
                continue
            tweets.append((tweet.created_at, tweet.text))
    except tweepy.TweepError as e:
        # API errors (e.g. rate limits) are raised while iterating the cursor;
        # report them and keep the tweets collected so far
        print(e.reason)

    # Convert the list of tuples to a DataFrame with the columns Date and Tweets
    return pd.DataFrame(tweets, columns=['Date', 'Tweets'])
```
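A note on the retweet filter: in the Twitter API's Tweet object, `retweeted` indicates whether the *authenticating user* has retweeted a tweet, not whether the tweet is itself a retweet; native retweets instead carry a `retweeted_status` attribute. The sketch below shows a more explicit check, using a hypothetical helper `is_retweet` and `SimpleNamespace` objects as stand-ins for tweepy's `Status` objects so it runs without network access. It also narrows the text check to tweets that *start* with "RT @", a deliberate change from the substring test used above.

```python
from datetime import datetime
from types import SimpleNamespace

def is_retweet(status) -> bool:
    """True for native retweets (which carry retweeted_status) and manual 'RT @' retweets."""
    return hasattr(status, "retweeted_status") or status.text.startswith("RT @")

# Hypothetical stand-ins for tweepy Status objects
statuses = [
    SimpleNamespace(created_at=datetime(2020, 8, 10, 9, 0),
                    text="@Eskom_SA no power in Soweto"),
    SimpleNamespace(created_at=datetime(2020, 8, 10, 9, 5),
                    text="RT @Eskom_SA: Stage 2 tonight",
                    retweeted_status=SimpleNamespace()),
]

# Keep (created_at, text) tuples for original tweets only
rows = [(s.created_at, s.text) for s in statuses if not is_retweet(s)]
print(len(rows))  # 1
```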

```python
df = twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)  # call the function and store the resulting dataframe in df
df.shape
```

```
(100, 2)
```

# Function 2: Extracting hashtags and municipalities

Write a function which:
- Uses the function you wrote in the Analyse section to extract the hashtags and municipalities into their own columns in a new dataframe.

Function Specifications:
- The function should take in the pandas dataframe you created in Function 1 and return a new pandas dataframe. 

```python
def extract_municipality_hashtags(df):
    """
    Return a new dataframe with four columns: the tweets, the date each tweet was posted,
    the municipality mentioned in the tweet, and the hashtags used in the tweet.
    """
    df = df.copy()  # work on a copy so the input dataframe is left unchanged

    # Twitter handle of each municipality's account, mapped to the municipality name
    mun_dict = {'@CityofCTAlerts' : 'Cape Town',
                '@CityPowerJhb' : 'Johannesburg',
                '@eThekwiniM' : 'eThekwini' ,
                '@EMMInfo' : 'Ekurhuleni',
                '@centlecutility' : 'Mangaung',
                '@NMBmunicipality' : 'Nelson Mandela Bay',
                '@CityTshwane' : 'Tshwane'}

    handles = list(mun_dict.keys())

    # Extract the first municipality handle in each tweet (NaN if none), then map it to a name
    df['municipality'] = df['Tweets'].str.extract('({})'.format('|'.join(handles)), expand=False)
    df['municipality'] = df['municipality'].map(mun_dict)

    # Find every hashtag ('#' up to the next whitespace), lower-cased; NaN if the tweet has none
    df['hashtags'] = df['Tweets'].str.findall(r'#.*?(?=\s|$)')
    df['hashtags'] = df['hashtags'].apply(lambda tags: np.nan if len(tags) == 0 else [tag.lower() for tag in tags])

    return df
```
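To see what the two extraction steps produce, the sketch below applies the same pandas calls (`str.extract` plus `map`, and `str.findall`) to two made-up tweets. It uses a two-handle subset of the municipality mapping for brevity; the sample data is hypothetical.

```python
import numpy as np
import pandas as pd

# Two made-up tweets standing in for the scraped data
sample = pd.DataFrame({
    "Date": ["2020-08-10", "2020-08-11"],
    "Tweets": ["@CityPowerJhb still no power #LoadShedding #Soweto",
               "Thanks @Eskom_SA for the update"],
})

# Subset of the full handle-to-municipality mapping
mun_dict = {"@CityPowerJhb": "Johannesburg", "@CityTshwane": "Tshwane"}

# 1. Capture the first matching handle, then translate it to a municipality name (NaN if none)
pattern = "({})".format("|".join(mun_dict))
sample["municipality"] = sample["Tweets"].str.extract(pattern, expand=False).map(mun_dict)

# 2. Collect every '#...' token up to the next whitespace, lower-cased; NaN when there are none
sample["hashtags"] = sample["Tweets"].str.findall(r"#.*?(?=\s|$)").apply(
    lambda tags: [t.lower() for t in tags] if tags else np.nan)

print(sample[["municipality", "hashtags"]])
```

The first tweet gets "Johannesburg" and `['#loadshedding', '#soweto']`; the second mentions no municipality handle and no hashtags, so both new columns are NaN.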

```python
df = extract_municipality_hashtags(df)
```

```python
# Open a connection to the SQL Server database using Windows (trusted) authentication;
# no user name is needed because the current Windows login is used
connection = pyodbc.connect(driver='{SQL Server}',
                            server='LAPTOP-V2T8356G',
                            database='gather_eskom',
                            trusted_connection='yes',
                            autocommit=True)
twitter_table = 'gather_eskom.dbo.twitter_table'
cur = connection.cursor()
df = df[['Tweets', 'Date']]  # keep only the Tweets and Date columns, which match the SQL table
```
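An alternative to passing keyword arguments is to build the ODBC connection string yourself, which makes the authentication mode explicit. The helper `build_sqlserver_conn_str` below is a hypothetical sketch that assumes Windows authentication and the legacy `{SQL Server}` driver used above; the server and database names are the notebook's own.

```python
from typing import Dict

def build_sqlserver_conn_str(server: str, database: str,
                             driver: str = "SQL Server") -> str:
    """Build an ODBC connection string that uses Windows (trusted) authentication."""
    parts: Dict[str, str] = {
        "DRIVER": "{" + driver + "}",   # driver names are wrapped in braces
        "SERVER": server,
        "DATABASE": database,
        "Trusted_Connection": "yes",    # log in as the current Windows user
    }
    return ";".join(f"{key}={value}" for key, value in parts.items()) + ";"

print(build_sqlserver_conn_str("LAPTOP-V2T8356G", "gather_eskom"))
# DRIVER={SQL Server};SERVER=LAPTOP-V2T8356G;DATABASE=gather_eskom;Trusted_Connection=yes;
```

The resulting string can then be passed as the first argument to `pyodbc.connect`, together with `autocommit=True` if desired.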

# Function 3: Updating SQL Database with pyODBC

Write a function which:
- Connects to and updates your SQL database.

Function Specifications:
- The function should take in a pandas dataframe created in Function 2. 
- Connect to your SQL database.
- Update the table you store your tweets in.
- Not return any output.

```python
def pyodbc_twitter(connection, df, twitter_table):
    """
    Insert the "Eskom_SA" tweets dataframe into the SQL Server twitter table.

    Args:
        connection (pyodbc.Connection): an open connection to the SQL database
        df (pandas.DataFrame): the scraped tweets dataframe returned by Function 2
        twitter_table (str): the SQL Server table to insert the scraped tweets into

    Returns:
        None
    """
    cursor = connection.cursor()

    # Parameterised query: pyodbc fills in each ? with a value,
    # so tweet text is never spliced directly into the SQL
    insert_query = 'INSERT INTO ' + twitter_table + ' (Tweets, [Date]) VALUES (?, ?);'

    # Select the columns explicitly so any extra columns (municipality, hashtags) are ignored
    for tweet_text, tweet_date in df[['Tweets', 'Date']].itertuples(index=False, name=None):
        cursor.execute(insert_query, tweet_text, tweet_date)

    connection.commit()  # commit once, after all rows have been inserted
    cursor.close()
```
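The function above sends one INSERT per row. A batched alternative is `cursor.executemany`, which DB-API drivers such as pyodbc and Python's built-in `sqlite3` both provide, and both use `?` placeholders. The sketch below uses an in-memory `sqlite3` database purely as a stand-in for SQL Server so that it runs anywhere; the table layout and rows are hypothetical, and dates are stored as ISO strings for simplicity.

```python
import sqlite3
from datetime import datetime

# In-memory sqlite3 database standing in for the SQL Server table
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE twitter_table (Tweets TEXT, Date TEXT)")

rows = [
    ("@Eskom_SA no power in Soweto", datetime(2020, 8, 10, 9, 0).isoformat()),
    ("@Eskom_SA is load shedding on tonight?", datetime(2020, 8, 10, 9, 5).isoformat()),
]

# One executemany call runs the same parameterised INSERT for every row
cursor = demo_conn.cursor()
cursor.executemany("INSERT INTO twitter_table (Tweets, Date) VALUES (?, ?);", rows)
demo_conn.commit()

print(demo_conn.execute("SELECT COUNT(*) FROM twitter_table").fetchone()[0])  # 2
```

With pyodbc the rows could come from `list(df[['Tweets', 'Date']].itertuples(index=False, name=None))`, and setting `cursor.fast_executemany = True` before the call can speed up large batches with Microsoft's ODBC drivers.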

```python
pyodbc_twitter(connection, df, twitter_table)
```

### Trello board link

[Trello board](https://trello.com/b/eiaTeGUA/team-2gather)