# Building an ETL Pipeline

This Pipeline consists of the following functions

1. A function that connects to Twitter and scrapes the tweets of the "Eskom_SA" account.
<br>
<br>
2. A function that cleans/processes the scraped tweets and creates a dataframe with two new columns: 'Municipalities Involved' and 'Hashtags'. <br>
<br>
3. A function that connects to your SQL database and uploads the tweets into the table in which you store them.

In [11]:
### Importing Packages

# General:

import tweepy           
import pandas as pd     
import numpy as np      
import json
import re
from IPython.display import display
import pyodbc


# Consumer and Access details

Enter the Consumer and Access details you should have received when applying for access to the Twitter API.

In [2]:
# Consumer:
# Placeholder strings - replace them with the consumer key/secret of your Twitter app.
CONSUMER_KEY    = 'ENTER CONSUMER KEY'
CONSUMER_SECRET = 'ENTER CONSUMER SECRET'

# Access:
# Placeholder strings - replace them with your access token/secret.
ACCESS_TOKEN  = 'ENTER ACCESS TOKEN'
ACCESS_SECRET = 'ENTER ACCESS SECRET'

In [3]:
# API's setup:
def twitter_setup():
    """
    Build an authenticated tweepy API client.

    Reads the module-level CONSUMER_KEY / CONSUMER_SECRET and
    ACCESS_TOKEN / ACCESS_SECRET values.

    Returns :
        tweepy.API object created with the OAuth handler and timeout=1000
    """
    # The OAuth handler takes the consumer pair first, then the access pair.
    oauth_handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    oauth_handler.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # Hand the authenticated client straight back to the caller.
    return tweepy.API(oauth_handler, timeout=1000)

# Function 1:

Function Specifications:
- The function returns a dataframe of the scraped tweets containing just the "_Tweets_" and "_Date_" columns.
- It takes in the ```consumer key,  consumer secret code, access token``` and ```access secret code```.

NOTE:
The dataframe should have the same column names as those in your SQL Database table where you store the tweets.

In [8]:
def twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET):
    """
    Fetch recent tweets of the Eskom_SA handle (retweets excluded) together
    with their creation datetime.

    The request asks for count=200 with include_rts=False, so at most 200
    tweets are returned.

    Args :
        CONSUMER_KEY (str)
        CONSUMER_SECRET(str)
        ACCESS_TOKEN(str)
        ACCESS_SECRET(str)

    Returns :
        data : Pandas Dataframe , columns 'Tweets' & 'Date'

    """
    # Authenticate with the credentials that were actually passed in, so the
    # arguments are honoured instead of being silently ignored in favour of
    # whatever the module-level constants happen to hold.
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    extractor = tweepy.API(auth, timeout=1000)

    tweets = extractor.user_timeline(screen_name="Eskom_SA",
                                     count=200,
                                     include_rts=False)

    texts = [tweet.text for tweet in tweets]
    dates = [tweet.created_at for tweet in tweets]

    # column_stack keeps the two lists side by side as the two dataframe columns.
    data = pd.DataFrame(data=np.column_stack([texts, dates]),
                        columns=['Tweets', 'Date'])
    return data

# Function 2: Extracting the hashtags and the municipalities

Function Specifications:
- The function should take in the pandas dataframe you created in Function 1 and return a new pandas dataframe. 

In [9]:
def extract_municipality_hashtags(df):
    """
    Add 'municipality' and 'hashtags' columns to the dataframe returned by twitter_df().

    The first known municipality handle found in each tweet is mapped to the
    municipality name through a lookup dictionary; tweets without a known
    handle get NaN. All hashtags in a tweet are collected, lower-cased and
    stored as the string form of the list (e.g. "['#a', '#b']"); tweets
    without hashtags go through the same string conversion of NaN.

    The dataframe is modified in place and also returned.

    Args :
        df : Pandas Dataframe with a 'Tweets' column of text

    Returns :

        df : the same Pandas Dataframe with the added 'municipality' (str)
             and 'hashtags' (str representation of a list of str) columns

    """
    # Every entry needs its own quoted key, quoted value and trailing comma;
    # otherwise adjacent string literals are silently concatenated.
    mun_dict = {
        '@CityofCTAlerts': 'Cape Town',
        '@CityPowerJhb': 'Johannesburg',
        '@eThekwiniM': 'eThekwini',
        '@EMMInfo': 'Ekurhuleni',
        '@centlecutility': 'Mangaung',
        '@NMBmunicipality': 'Nelson Mandela Bay',
        '@CityTshwane': 'Tshwane',
        '@VhembeDM': 'Vhembe',
        '@EhlanzeniM': 'Ehlanzeni',
        '@NCProvGov': 'NorthernCape_Gov',
        '@NorthWestZA': 'NorthWest',
    }

    # One capturing group that matches any of the handles; escaping keeps the
    # pattern correct even if a handle ever contains a regex metacharacter.
    handle_pattern = '({})'.format('|'.join(re.escape(handle) for handle in mun_dict))
    matched_handle = df['Tweets'].str.extract(handle_pattern, expand=False)
    df['municipality'] = matched_handle.map(mun_dict)

    # A hashtag is '#' followed by everything up to the next whitespace or end of text.
    found_tags = df['Tweets'].str.findall(r'#.*?(?=\s|$)')
    df['hashtags'] = found_tags.apply(
        lambda tags: [tag.lower() for tag in tags]
        if isinstance(tags, list) and tags
        else np.nan
    )
    # Stored as text so the column can be written to the SQL table.
    df['hashtags'] = df['hashtags'].astype(str)
    return df

In [13]:
# Scrape the tweets, then add the 'municipality' and 'hashtags' columns.
df_fresh_tweets = extract_municipality_hashtags(
    twitter_df(
        CONSUMER_KEY,
        CONSUMER_SECRET,
        ACCESS_TOKEN,
        ACCESS_SECRET,
    )
)

# Function 3: Updating SQL Database with pyODBC

Function Specifications:
- The function takes in a pandas dataframe created in Function 2. 
- Connect to your SQL database.
- Update the table you store your tweets in.
- Does not return any output.

In [14]:
###---Setting up connection---###
###---Extracting old table from SQL---###
from sqlalchemy import create_engine
import urllib

# Open the pyodbc connection used to read the table currently stored in SQL.
conn = pyodbc.connect(
    driver='{SQL Server}',
    host='ENTER_HOST',               ### Displayed when you log into SQL
    database='Eskom_Database',       ### Make sure name matches the one used in SQL
    trusted_connection='tcon',
    user='sa',
)

# Load every row of the existing tweets table into a dataframe.
twitter_table = pd.read_sql_query(sql='select * from dbo.twitter_table', con=conn)

In [15]:
def pyodbc_twitter(connection, df, twitter_table):
    """
    Merge the freshly scraped tweets into the stored tweets and write the
    combined result back to the 'twitter_table' table in SQL.

    The two dataframes are outer-merged on 'Tweets', 'Date', 'municipality'
    and 'hashtags', and the table is replaced with the merged result.

    Args :
        connection : A pyodbc connection between python and SQL Server.
                     Currently unused (the upload goes through a SQLAlchemy
                     engine); kept so existing callers keep working.
        df : Pandas Dataframe , Latests tweets and dates along with municipalities and hashtags columns
        twitter_table : Pandas Dataframe , Old table available in the SQL database yet to be updated

    Returns :
         None

    """
    # Imported here because a bare `import urllib` does not guarantee that the
    # `urllib.parse` submodule has been loaded.
    from urllib.parse import quote_plus

    params = quote_plus("DRIVER={SQL Server};SERVER=ENTER_HOST;DATABASE=Eskom_Database;UID=sa;trusted_connection='tcon'")     ### Enter your host on 'Server' in string format
    engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)

    key_columns = ['Tweets', 'Date', 'municipality', 'hashtags']
    try:
        # Merge the dataframe that was passed in, not a module-level global.
        merged = pd.merge(df, twitter_table, how='outer',
                          left_on=key_columns, right_on=key_columns)
        merged.to_sql('twitter_table', con=engine, if_exists='replace', index=False)
    finally:
        # Release the engine's pooled connections even if the upload fails.
        engine.dispose()
    return None


pyodbc_twitter(conn, df_fresh_tweets, twitter_table)