# Building an ETL Pipeline

As the second part of the predict for Gather, you will need to build a pipeline of Python functions that does the following:

1. A function that connects to Twitter and scrapes "Eskom_SA" tweets.
<br>
<br>
2. A function that cleans and processes the scraped tweets, creating a dataframe with two new columns by using the following function: <br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; a) Hashtag Remover from Analyse Functions
<br>
<br>
3. Functions that connect to your SQL database and upload the tweets into the table where you store them.

In [1]:
# General:
import tweepy           # To consume Twitter's API
import pandas as pd     # To handle data
import numpy as np      # For numerical computation
import json
# For plotting and visualization:
from IPython.display import display
import pyodbc

## ETL Pipeline

The pipeline follows this procedure:
1. Raw Twitter data is extracted using the Twitter API. This information comes in the form of a tweet, a timestamp and the registered location of the user account.
2. A connection is initiated between the Jupyter Notebook and SQL Server.
3. Raw Twitter data is sent to SQL Server for backup.
4. The connection between the Notebook and SQL Server is terminated.
5. The existence of the raw Twitter data is verified in SQL Server.
6. The connection is re-established.
7. Raw data is extracted from SQL Server for processing.
8. Data is processed and sent to a separate SQL table that holds the processed Twitter data.
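
The sequence above can be tried end to end without Twitter or SQL Server access. The sketch below is an illustration only: it uses Python's built-in `sqlite3` module as a stand-in for SQL Server, two hard-coded sample tweets instead of the API, and hypothetical table names (`tweets_raw`, `tweets_processed`) that are not part of this notebook.

```python
import os
import sqlite3
import tempfile

# Step 1 (stand-in): sample rows instead of a live Twitter API call
raw_tweets = [
    ("Power is out again #loadshedding", "2020-03-11 11:37:49", "Johannesburg"),
    ("Thanks @Eskom_SA for the update", "2020-03-11 12:11:45", "Cape Town"),
]

# A file-based database survives closing and reopening the connection
db_path = os.path.join(tempfile.mkdtemp(), "etl_demo.db")

# Steps 2-4: connect, back up the raw data, commit and disconnect
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE tweets_raw (tweets TEXT, date TEXT, city TEXT)")
conn.executemany("INSERT INTO tweets_raw VALUES (?, ?, ?)", raw_tweets)
conn.commit()
conn.close()

# Steps 5-7: reconnect, confirm the rows exist and read them back
conn = sqlite3.connect(db_path)
stored = conn.execute(
    "SELECT tweets, date, city FROM tweets_raw ORDER BY date"
).fetchall()

# Step 8: process (here: collect lower-cased hashtags) and store the result
processed = [
    (text, date, " ".join(w.lower() for w in text.split() if w.startswith("#")))
    for text, date, _city in stored
]
conn.execute("CREATE TABLE tweets_processed (tweets TEXT, date TEXT, hashtags TEXT)")
conn.executemany("INSERT INTO tweets_processed VALUES (?, ?, ?)", processed)
conn.commit()
processed_rows = conn.execute(
    "SELECT hashtags FROM tweets_processed ORDER BY date"
).fetchall()
conn.close()
```

Closing and reopening the connection between the two halves mirrors steps 4 to 6, which exist so that the raw backup can be checked independently before processing starts.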

# Consumer and Access details

Fill in the Consumer and Access details you should have received when applying for access to the Twitter API.

In [2]:
# Consumer:
MY_CONSUMER_KEY    = ''
MY_CONSUMER_SECRET = ''

# Access:
MY_ACCESS_TOKEN  = ''
MY_ACCESS_SECRET = ''
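
If you would rather not type the keys into the notebook, one option is to read them from environment variables. This is a minimal sketch, not part of the assignment; the function name `load_twitter_credentials` and the four variable names are hypothetical and can be changed to whatever you export in your shell.

```python
import os

def load_twitter_credentials(env=os.environ):
    """Collect the four Twitter credentials from a mapping of environment variables."""
    # Hypothetical variable names; adjust them to match your own setup
    names = {
        "consumer_key": "TWITTER_CONSUMER_KEY",
        "consumer_secret": "TWITTER_CONSUMER_SECRET",
        "access_token": "TWITTER_ACCESS_TOKEN",
        "access_secret": "TWITTER_ACCESS_SECRET",
    }
    # Fail early with a clear message if any value is absent or empty
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[var] for key, var in names.items()}

# Demonstration with an explicit mapping instead of the real environment
demo_env = {
    "TWITTER_CONSUMER_KEY": "ck",
    "TWITTER_CONSUMER_SECRET": "cs",
    "TWITTER_ACCESS_TOKEN": "at",
    "TWITTER_ACCESS_SECRET": "as",
}
creds = load_twitter_credentials(demo_env)
```

The returned values can then be assigned to `MY_CONSUMER_KEY`, `MY_CONSUMER_SECRET`, `MY_ACCESS_TOKEN` and `MY_ACCESS_SECRET`.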

# Function 1:

Write a function which:
- Scrapes _"Eskom_SA"_ tweets from Twitter. 

Function Specifications:
- The function should return a dataframe with the scraped tweets with just the "_Tweets_" and "_Date_". 
- Will take in the ```consumer key,  consumer secret code, access token``` and ```access secret code```.

NOTE:
The dataframe should have the same column names as those in your SQL Database table where you store the tweets.
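
A quick way to check the note above is to compare the two sets of column names before uploading. The sketch below assumes the table columns are `tweets`, `date` and `city` (as they appear when the raw table is read back later in this notebook); the helper `missing_columns` and the `rename_map` are hypothetical names introduced for illustration.

```python
def missing_columns(df_columns, table_columns):
    """Return the table columns that the dataframe does not provide (case-sensitive)."""
    available = set(df_columns)
    return [col for col in table_columns if col not in available]

df_columns = ["Tweets", "Date", "Location"]
table_columns = ["tweets", "date", "city"]

# Every table column is reported because the names differ in case or wording
before = missing_columns(df_columns, table_columns)

# A mapping from dataframe names to table names resolves the mismatch
rename_map = {"Tweets": "tweets", "Date": "date", "Location": "city"}
renamed = [rename_map.get(col, col) for col in df_columns]
after = missing_columns(renamed, table_columns)
```

With a real dataframe, `df.columns` can be passed as `df_columns`, and `df.rename(columns=rename_map)` applies the same mapping.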

In [20]:
def twitter_df(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET ):
    """Returns a dataframe with scraped Eskom_SA tweets, date of the tweet and location from Twitter
    
    Parameters:
    -----------
    CONSUMER_KEY    = 'Twitter API Key'
    
    CONSUMER_SECRET = 'Twitter API Secret Key'

    ACCESS_TOKEN  = 'Twitter Access Token'
    
    ACCESS_SECRET = 'Twitter Access Token Secret'
    
    Examples:
    ---------
    
    >>> tweets_df = twitter_df('API_key', 'API_secret_key', 'access_token', 'access_secret_token')
    >>> tweets_df
    
    Tweets                               |                  Date |      Location |
    -------------------------------------|-----------------------|---------------|
    Some tweet @someone about #something |   2020-03-11 11:37:49 |  Johannesburg |
    Another tweet about #something       |   2020-03-11 12:11:45 |     Cape Town |
    """

    # Authentication and access using keys:
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
    
    api = tweepy.API(auth, timeout=1000)
    
    tweets = []
    dates = []
    location = []
    # Note: this call targets Tweepy 3.x; Tweepy 4 renamed API.search to API.search_tweets
    for tweet in api.search(q="@Eskom_SA -filter:retweets", lang="en", rpp=100, count=60):
        tweets.append(str(tweet.text))
        dates.append(str(tweet.created_at))
        location.append(str(tweet.user.location))

    # Assemble the three columns in one step
    result = pd.DataFrame({'Tweets': tweets, 'Date': dates, 'Location': location})
    return result

In [21]:
tweets_df = twitter_df(MY_CONSUMER_KEY, MY_CONSUMER_SECRET, MY_ACCESS_TOKEN, MY_ACCESS_SECRET )

In [22]:
conn = pyodbc.connect(driver='{SQL Server}',
                      host=r'EDSA-PGLBGKO\SQLEXPRESS',  # raw string keeps the backslash literal
                      database='gather_eskom',
                      uid='sa',
                      pwd='edsa@2020')

# Updating SQL Database with pyODBC

This function connects to the SQL database and updates the table of raw tweets.

Function Specifications:
- The function should take in the pandas dataframe created in Function 1. 
- Connect to your SQL database.
- Update the table you store your tweets in.
- Not return any output.
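
One way to write the rows is a parameterised `INSERT`, which leaves the quoting to the database driver so that apostrophes in a tweet need no manual removal. The sketch below uses the built-in `sqlite3` module as a stand-in because it accepts the same `?` placeholders as pyodbc; the function name `insert_rows` and the table `demo_tweets` are hypothetical.

```python
import sqlite3

def insert_rows(connection, rows, table):
    """Insert an iterable of (tweet, date, location) tuples into `table`."""
    # Table names cannot be bound as parameters, so only pass trusted names here
    sql = f"INSERT INTO {table} VALUES (?, ?, ?)"
    cursor = connection.cursor()
    cursor.executemany(sql, rows)
    # Commit on the connection that was passed in
    connection.commit()

conn_demo = sqlite3.connect(":memory:")
conn_demo.execute("CREATE TABLE demo_tweets (tweets TEXT, date TEXT, city TEXT)")
insert_rows(
    conn_demo,
    [("Eskom's update is late", "2020-03-11 11:37:49", "Durban")],
    "demo_tweets",
)
stored_rows = conn_demo.execute("SELECT tweets FROM demo_tweets").fetchall()
conn_demo.close()
```

With a dataframe, `df.itertuples(index=False, name=None)` yields plain tuples in column order that can be passed as `rows`.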

In [23]:
def pyodbc_twitter_raw(connection, df, twitter_table):
    """Extracts a dataframe containing tweets, connects and updates the data in your local SQL database

    Parameters:
    -----------
    connection: An open pyodbc connection to the SQL database

    df: DataFrame of tweets with their timestamp and location 

    twitter_table (str): Name of an already existing SQL table for the raw tweets 

    Examples:
    --------
    >>> conn = pyodbc.connect(driver='{SQL Server}',
                      host='your_server_name',
                      database='your_database_name', 
                      uid='your_user_name',
                      pwd='your_password')    
    
    >>> df = pd.read_sql_query('SELECT * FROM twitter_table', conn)
    >>> df
    
                                  tweet  |                  date |          city |
    -------------------------------------|-----------------------|---------------|
    Some tweet @someone about #something |   2020-03-11 11:37:49 |  Johannesburg |
    Another tweet about #something       |   2020-03-11 12:11:45 |     Cape Town |
    
    The following command will update the twitter_table in the SQL Server database:
    >>> pyodbc_twitter_raw(conn, df, 'my_sql_table')
    """
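    cursor = connection.cursor()
    
    # Table names cannot be bound as parameters, so twitter_table must be a trusted name
    insert_sql = f"INSERT INTO {twitter_table} VALUES (?, ?, ?)"
    
    for i in range(len(df.index)):
        
        # Positional access works even if the dataframe index is not 0..n-1
        tweet_text = df['Tweets'].iloc[i]
        tweet_date = df['Date'].iloc[i]
        tweet_location = df['Location'].iloc[i]
        
        # Bound parameters let the driver handle quoting, so apostrophes
        # in the text or location are stored as-is and cannot break the statement
        cursor.execute(insert_sql, tweet_text, tweet_date, tweet_location)
        
    # Commit on the connection that was passed in, not on the global conn
    connection.commit()
    return None 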

In [24]:
pyodbc_twitter_raw(conn, tweets_df, 'eskom_tweets_raw')

In [25]:
# Terminate connection to verify on SQL if data has been transferred
conn.close()

In [26]:
# Re-establish a connection
conn = pyodbc.connect(driver='{SQL Server}',
                      host=r'EDSA-PGLBGKO\SQLEXPRESS',  # raw string keeps the backslash literal
                      database='gather_eskom',
                      uid='sa',
                      pwd='edsa@2020')

In [27]:
# Extract raw data from SQL
sql_df = pd.read_sql_query('SELECT DISTINCT * FROM eskom_tweets_raw', conn)

In [29]:
sql_df.head()

Unnamed: 0,tweets,date,city
0,#EskomVSnersa Eskom has welcomed the court’s d...,2020-03-11 12:45:34,@CapricornFMNews
1,@autrenuit @Eskom_SA @News24 @TimesLIVE @eNCA ...,2020-03-11 11:06:04,
2,@CyrilRamaphosa o patla jo!!! Kore its approac...,2020-03-11 12:37:21,I am not sure about the name
3,@DavidPearmain1 @WyrldWyrm @DirtySoundRec @Esk...,2020-03-11 12:27:02,
4,@Eskom_SA you make things more difficult for ...,2020-03-11 11:06:39,"Port Elizabeth, South Africa"


In [30]:
# Creating a dataframe consisting of South Africa's cities and their respective provinces 
location_df = pd.read_excel('locations.xlsx')
location_df.head()

Unnamed: 0,Name,Province
0,Aalwynsfontein,Northern Cape
1,Aan de Doorns,Western Cape
2,Aberdeen,Eastern Cape
3,Aberfeldy,Free State
4,Abbotsdale,Western Cape


In [31]:
sql_df['Province'] = ''

In [32]:
# Match each tweet's free-text location against the known place names.
# Values are assigned with .loc because chained indexing (df[col][row] = ...)
# can silently fail to update the dataframe.
for j in sql_df.index:
    
    city = str(sql_df.loc[j, 'city']).lower()
    
    # Default used when the city is blank or no matching place name is found
    province = 'Not Specified'
    
    for name, prov in zip(location_df['Name'], location_df['Province']):
        if city and str(name).lower() in city:
            province = prov  # if several names match, the last one wins
    
    sql_df.loc[j, 'Province'] = province
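
The same matching rule can be written as a small function that handles one location at a time, which makes it easier to test on a few values. This is a sketch under the assumption that a case-insensitive substring match is what is wanted; `match_province` and the two-entry `places` list are hypothetical and stand in for the full `locations.xlsx` lookup.

```python
def match_province(city, places, default="Not Specified"):
    """Return the province of the last place name found inside `city`."""
    text = str(city).lower()
    province = default
    if not text:
        # Blank location: nothing to match against
        return province
    for name, prov in places:
        if str(name).lower() in text:
            province = prov
    return province

# Tiny stand-in for the (Name, Province) pairs of the locations table
places = [("Sandton", "Gauteng"), ("Durban", "KwaZulu-Natal")]

example_matches = [
    match_province("Sandton, South Africa", places),
    match_province("KwaZulu Natal, Durban", places),
    match_province("Around.", places),
    match_province("", places),
]
```

It could be applied to the whole column with `sql_df['city'].apply(match_province, places=list(zip(location_df['Name'], location_df['Province'])))`. Note that substring matching can produce false positives when a short place name happens to occur inside a longer word.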

In [15]:
sql_df.head()

Unnamed: 0,tweets,date,city,Province
0,@koko_matshela @KhandaniM @EFFSouthAfrica @Esk...,2020-03-11 11:34:58,Gauteng,Not Specified
1,@TerranceDJacob1 @koko_matshela @EFFSouthAfric...,2020-03-11 11:33:39,"Sandton, South Africa",Gauteng
2,"@Eskom_SA Nevermind your systems, you have a g...",2020-03-11 11:32:58,Around.,Not Specified
3,"@Eskom_SA Dear Eskom,\nWhen this will finish, ...",2020-03-11 11:31:18,,Not Specified
4,@SikonathiM @Eskom_SA So u will send this regr...,2020-03-11 11:29:38,"KwaZulu Natal, Durban",KwaZulu-Natal


In [16]:
province_df = pd.read_sql_query('SELECT * FROM province_map', conn)

In [17]:
# Remove "\r\n" that is present in some of the cells
province_list = []
for i in range(len(province_df)):
    province_list.append(str(province_df.iloc[i]['province']).replace("\r\n",""))

In [18]:
province_df['Province'] = province_list
province_df

Unnamed: 0,province_id,province,Province
0,1,Eastern Cape\r\n,Eastern Cape
1,2,Free State,Free State
2,3,Gauteng,Gauteng
3,4,Kwazulu Natal\r\n,Kwazulu Natal
4,5,Limpopo\r\n,Limpopo
5,6,Mpumalanga\r\n,Mpumalanga
6,7,North West\r\n,North West
7,8,Northern Cape\r\n,Northern Cape
8,9,Western Cape,Western Cape
9,10,Not Specified,Not Specified
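
The clean-up loop can also be expressed with pandas string methods. The sketch below is an alternative, not the notebook's own code: it uses `Series.str.strip()`, which removes whitespace (including `\r\n`) from both ends of each value, whereas the loop removes `\r\n` wherever it occurs. The dataframe `province_demo` is a hypothetical two-row sample.

```python
import pandas as pd

province_demo = pd.DataFrame({"province": ["Eastern Cape\r\n", "Free State"]})

# Vectorised clean-up: one call instead of a Python loop over the rows
province_demo["Province"] = province_demo["province"].str.strip()
cleaned = province_demo["Province"].tolist()
```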


In [19]:
# Merge the sql_df with province_df to match the identified user province
# to their respective province_id from the database.
sql_df = sql_df.merge(province_df)
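
By default `merge` performs an inner join on the shared column, so tweets whose province spelling has no exact counterpart are dropped without warning. The tables above show such a pair: `KwaZulu-Natal` in the tweet data and `Kwazulu Natal` in the province table. The sketch below, using two small hypothetical dataframes, shows how `how="left"` with `indicator=True` can reveal unmatched names before rows are lost.

```python
import pandas as pd

tweets = pd.DataFrame({
    "tweets": ["first tweet", "second tweet"],
    "Province": ["Gauteng", "KwaZulu-Natal"],
})
provinces = pd.DataFrame({
    "province_id": [3, 4],
    "Province": ["Gauteng", "Kwazulu Natal"],
})

# how="left" keeps every tweet; indicator=True adds a "_merge" column
checked = tweets.merge(provinces, on="Province", how="left", indicator=True)
unmatched = checked.loc[checked["_merge"] == "left_only", "Province"].tolist()

# For comparison, the default inner merge drops the unmatched row
inner = tweets.merge(provinces, on="Province")
```

Any names listed in `unmatched` can then be normalised (for example by mapping them to the spelling used in the province table) before the final merge.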

In [20]:
sql_df.head()

Unnamed: 0,tweets,date,city,Province,province_id,province
0,@koko_matshela @KhandaniM @EFFSouthAfrica @Esk...,2020-03-11 11:34:58,Gauteng,Not Specified,10,Not Specified
1,"@Eskom_SA Nevermind your systems, you have a g...",2020-03-11 11:32:58,Around.,Not Specified,10,Not Specified
2,"@Eskom_SA Dear Eskom,\nWhen this will finish, ...",2020-03-11 11:31:18,,Not Specified,10,Not Specified
3,@JrTebalelo @CityofCT @CityofCTAlerts @Eskom_S...,2020-03-11 11:29:16,South Africa,Not Specified,10,Not Specified
4,@koko_matshela @EFFSouthAfrica @Eskom_SA ?? ??...,2020-03-11 11:28:55,,Not Specified,10,Not Specified


In [21]:
sql_df.drop(['city', 'Province', 'province'], axis=1, inplace=True)

In [22]:
# Order tweets from oldest to latest in accordance with the timestamp (ascending is the default)
sql_df.sort_values(by='date', inplace=True)
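
Sorting reorders the rows but keeps their original index labels, which is why the output below starts with 25, 37, 24. Any later step that aligns on the index, such as `DataFrame.join`, pairs rows by label rather than by position. The sketch below, with a hypothetical two-row dataframe, shows the effect and how `reset_index(drop=True)` avoids it.

```python
import pandas as pd

df = pd.DataFrame({
    "tweets": ["late", "early"],
    "date": ["2020-03-11 12:00:00", "2020-03-11 11:00:00"],
})

sorted_df = df.sort_values(by="date")          # ascending: oldest first
labels_after_sort = sorted_df.index.tolist()   # labels travel with their rows

# A helper column computed in sorted order, with a fresh 0..n-1 index
extra = pd.DataFrame({"length": [len(t) for t in sorted_df["tweets"]]})

# join aligns on labels, so the lengths land on the wrong rows here
misaligned = sorted_df.join(extra)

# After resetting the index, labels and positions agree again
reset_df = sorted_df.reset_index(drop=True)
aligned = reset_df.join(extra)
```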

In [23]:
sql_df.head()

Unnamed: 0,tweets,date,province_id
25,@autrenuit @Eskom_SA @News24 @TimesLIVE @eNCA ...,2020-03-11 11:06:04,10
37,@Eskom_SA you make things more difficult for ...,2020-03-11 11:06:39,1
24,Hhayibo @Eskom_SA vele almost 5 hours without ...,2020-03-11 11:06:40,10
23,@Eskom_SA @News24 @TimesLIVE @eNCA @IOL @SABCN...,2020-03-11 11:08:27,10
22,@LisemaM1 @TerranceDJacob1 @koko_matshela @EFF...,2020-03-11 11:08:33,10


# Removing hashtags and the municipalities

Write a function which:
- Uses the function you wrote in the Analyse section to extract the hashtags and municipalities into their own columns in a new dataframe. 

Function Specifications:
- The function should take in the pandas dataframe you created in Function 1 and return a new pandas dataframe. 
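
As an illustration of the extraction step on a single tweet, the sketch below uses regular expressions instead of the whitespace splitting from the Analyse section, so it is an alternative technique rather than the required solution. The function `extract_tags` is hypothetical, and `MUNICIPALITY_HANDLES` is a shortened, assumed version of the handle-to-municipality mapping.

```python
import re

# Shortened example mapping of Twitter handles to municipalities
MUNICIPALITY_HANDLES = {
    "@CityofCTAlerts": "Cape Town",
    "@CityPowerJhb": "Johannesburg",
}

def extract_tags(text):
    """Return (municipalities, hashtags) found in one tweet."""
    # "#" followed by word characters; trailing punctuation is not captured
    hashtags = [tag.lower() for tag in re.findall(r"#\w+", text)]
    # "@" followed by word characters, then keep only known municipal handles
    mentions = re.findall(r"@\w+", text)
    municipalities = [
        MUNICIPALITY_HANDLES[m] for m in mentions if m in MUNICIPALITY_HANDLES
    ]
    return municipalities, hashtags

example = extract_tags("@CityPowerJhb: no power in #Soweto, #LoadShedding!")
```

Unlike splitting on whitespace, these patterns also match a `#` or `@` that appears in the middle of a word (for instance in an e-mail address), so the results can differ on unusual text.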

In [24]:
def extract_municipality_hashtags(df):
    
    """Returns a new dataframe with two columns appended, "municipality" and "hashtags". Information is extracted from
    twitter data that includes the municipality and the list of hashtags referred to in each tweet, respectively.
    Input must contain a column named "tweets".
    
    Parameters
    ----------
    df: dataframe of tweets 
    
    Returns
    -------
    df_new: modified dataframe 
    """
    
    mun_dict = {'@CityofCTAlerts' : 'Cape Town',
                '@CityPowerJhb' : 'Johannesburg',
                '@eThekwiniM' : 'eThekwini' ,
                '@EMMInfo' : 'Ekurhuleni',
                '@centlecutility' : 'Mangaung',
                '@NMBmunicipality' : 'Nelson Mandela Bay',
                '@CityTshwane' : 'Tshwane'}
    
    # Fail early; returning after only printing a message would raise an unrelated error
    if not isinstance(df, pd.DataFrame):
        raise TypeError("input must be a data frame.")
    
    municipality = []
    hashtags = []
    for tweet in df["tweets"]:
        # Municipality handles: remove ":" so that "@CityPowerJhb:" still matches its key
        words = str(tweet).replace(":", "").split()
        found = [mun_dict[w] for w in words if w in mun_dict]
        municipality.append(found if found else np.nan) # nan instead of an empty list
        
        # Hashtags: words starting with "#", converted to lower case
        tags = [w.lower() for w in str(tweet).split() if w.startswith("#")]
        hashtags.append(tags if tags else np.nan) # nan instead of an empty list
    
    # Assign the lists by position on a copy of the input. Joining on the index
    # would misalign rows once the dataframe has been sorted or filtered.
    df_new = df.copy()
    df_new["municipality"] = municipality
    df_new["hashtags"] = hashtags
    return df_new

In [25]:
processed_tweets_df = extract_municipality_hashtags(sql_df)

In [26]:
processed_tweets_df.head()

Unnamed: 0,tweets,date,province_id,municipality,hashtags
25,@autrenuit @Eskom_SA @News24 @TimesLIVE @eNCA ...,2020-03-11 11:06:04,10,,
37,@Eskom_SA you make things more difficult for ...,2020-03-11 11:06:39,1,,
24,Hhayibo @Eskom_SA vele almost 5 hours without ...,2020-03-11 11:06:40,10,,
23,@Eskom_SA @News24 @TimesLIVE @eNCA @IOL @SABCN...,2020-03-11 11:08:27,10,,
22,@LisemaM1 @TerranceDJacob1 @koko_matshela @EFF...,2020-03-11 11:08:33,10,,


# Updating SQL Database with pyODBC

This function connects to the SQL database and updates the table of processed tweets. 

Function Specifications:
- The function should take in a pandas dataframe created in Function 2. 
- Connect to your SQL database.
- Update the table you store your tweets in.
- Not return any output.
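
The `municipality` and `hashtags` columns hold either a list or `NaN`, neither of which maps directly onto a SQL text column. One possible approach, sketched below, is to store lists as JSON text and missing values as `NULL`. This is an assumption about how you might want to store them, not a requirement of the assignment, and `to_sql_text` is a hypothetical helper.

```python
import json
import math

def to_sql_text(value):
    """Serialise a list as JSON text; map NaN or None to None (SQL NULL)."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return json.dumps(value)

hashtags_text = to_sql_text(["#loadshedding", "#eskom"])
missing_text = to_sql_text(float("nan"))

# The stored text can be turned back into a list when it is read again
restored = json.loads(hashtags_text)
```

Passing `None` as a bound parameter stores a SQL `NULL`, which is usually easier to filter on than the literal string `nan`.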

In [28]:
def pyodbc_twitter_processed(connection, df, twitter_table):
    """Extracts a dataframe of processed tweets and connects and updates the data in your local SQL database

    Parameters:
    -----------
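    connection: An open pyodbc connection to the SQL database

    df: DataFrame of processed tweets with their timestamp, province_id, municipality and hashtags

    twitter_table (str): Name of an already existing SQL table for the processed tweets 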

    Examples:
    --------
    >>> conn = pyodbc.connect(driver='{SQL Server}',
                      host='your_server_name',
                      database='your_database_name', 
                      uid='your_user_name',
                      pwd='your_password')
    
    >>> df = pd.read_sql_query('SELECT * FROM twitter_table', conn)
    >>> df
    
    tweet                          |               date | province_id | municipality |     hashtags |
    -------------------------------|--------------------|-------------|--------------|--------------|
    Some tweet @CityPowerJhb       |2020-03-11 11:37:49 |           1 | Johannesburg |           nan|
    tweet about #loadshedding      |2020-03-11 12:11:45 |           9 |          nan | #loadshedding|
    
    The following command will update the twitter_table in the SQL Server database:
    >>> pyodbc_twitter_processed(conn, df, 'my_sql_table') 
    """
    cursor = connection.cursor()
    
    # Alter tables to accept the data transferred from Jupyter Notebook
    cursor.execute(
            f"""
            ALTER TABLE {twitter_table}
            ALTER COLUMN date VARCHAR(300)
            """
        )
    
    cursor.execute(
            f"""
            ALTER TABLE {twitter_table}
            ALTER COLUMN province_id VARCHAR(10)
            """
        )
    
    # Table names cannot be bound as parameters, so twitter_table must be a trusted name
    insert_sql = f"INSERT INTO {twitter_table} VALUES (?, ?, ?, ?, ?)"
    
    for i in range(len(df.index)):
        
        # Positional access, because a sorted dataframe is no longer in 0..n-1 label order
        tweet_text = str(df['tweets'].iloc[i])
        tweet_date = str(df['date'].iloc[i])
        tweet_province = str(df['province_id'].iloc[i])
        
        # str() of a list contains ' characters; remove them so a list is stored as [a, b]
        tweet_municipality = str(df['municipality'].iloc[i]).replace("'","")
        tweet_hashtags = str(df['hashtags'].iloc[i]).replace("'","")
        
        # Bound parameters let the driver handle quoting of the tweet text
        cursor.execute(insert_sql, tweet_text, tweet_date, tweet_province,
                       tweet_municipality, tweet_hashtags)
        
    # Alter SQL table back to its normal form of having an INT and DATETIME column.
    # This runs once, after all rows have been inserted, not inside the loop.
    cursor.execute(
            f"""
            ALTER TABLE {twitter_table}
            ALTER COLUMN date DATETIME
            """
        )
    
    cursor.execute(
            f"""
            ALTER TABLE {twitter_table}
            ALTER COLUMN province_id INT
            """
        )
        
    # Commit on the connection that was passed in, not on the global conn
    connection.commit()
    return None 

In [29]:
pyodbc_twitter_processed(conn, processed_tweets_df, 'eskom_tweets_new')

In [30]:
conn.close()