# import packages

In [46]:
import tweepy
import yaml
import json
import sqlite3
from datetime import datetime
import pandas as pd

# Read twitter Authentication Keys

In [47]:
# yaml file reader funtion
def read_yaml(file_path):
    with open(file_path, "r") as f:
        return yaml.safe_load(f)

# yaml config file path
file_path = "twitter_api_key_config.yaml"
# read from config file
api_credential = read_yaml(file_path)

# Create Twitter Authentication

In [48]:
# API authentication
auth = tweepy.OAuthHandler(api_credential["api_key"], \
                           api_credential["api_secret_token"])
auth.set_access_token(api_credential["access_token"], \
                      api_credential["access_token_secret"])
api = tweepy.API(auth, wait_on_rate_limit=True)

# Create the Database and Required Tables

In [49]:
# establish a database connection
conn = sqlite3.connect('tweet_example.db')
cur = conn.cursor()
create_tweet_info_table = """CREATE TABLE tweet_info(tweet_id BIGINT PRIMARY KEY, \
                                          user_id BIGINT, \
                                          tweet_lang TEXT, \
                                          tweet_time TEXT, \
                                          source TEXT, \
                                          tweet_text TEXT,\
                                          quote_count TEXT, \
                                          reply_count INT, \
                                          retweet_count INT,\
                                          tweet_favorite_count INT, \
                                          hashtags TEXT, \
                                          short_urls TEXT, \
                                          expanded_urls TEXT, \
                                          user_mentions TEXT);"""

create_user_info_table =  """CREATE TABLE user_info(user_id BIGINT PRIMARY KEY, \
                                          user_screen_name TEXT, \
                                          user_name TEXT, \
                                          user_language TEXT, \
                                          location TEXT, \
                                          profile_url TEXT, \
                                          description TEXT, \
                                          protected TEXT, \
                                          verified TEXT, \
                                          created_at TEXT, \
                                          friends_count BIGINT, \
                                          followers_count BIGINT,\
                                          favorites_count BIGINT, \
                                          statuses_count BIGINT);"""

cur.execute(create_tweet_info_table)
cur.execute(create_user_info_table)
conn.close()

# check if a table exists in the database

In [50]:
conn = sqlite3.connect('tweet_example.db')
cur = conn.cursor()

table_exist_query = ''' SELECT count(*) FROM sqlite_master WHERE type='table' AND name='tweet_info' '''
cur.execute(table_exist_query)
exist_result = cur.fetchone()

if exist_result[0]==1:
    print("tweet_info table exists.")
else:
    print("tweet_info table does not exist.")
conn.close()

tweet_info table exists.


# define a StreamListener Object

In [51]:
# override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, listen_time=60):
        super(MyStreamListener, self).__init__()
        self.counter = 0
        print("Initialized Tweepy StreamListener.")
        self.start_time = datetime.now()
        self.current_time = datetime.now()
        self.listen_time = listen_time
        self.unique_user_id_set = set([])
        # adding database connection code
        self.conn = sqlite3.connect('tweet_example.db')
        self.cur  = self.conn.cursor()
        
    def insert_data(self, data):
        tweet_object=json.loads(data) # convert "string-line" into json
        # check if json object has a key id. Otherwise continue to next.
        if 'id' in tweet_object.keys(): 
            
            # tweet object information
            tweet_id        = tweet_object['id']
            user_id         = tweet_object['user']['id']
            tweet_lang      = tweet_object['lang']
            tweet_time      = str(pd.to_datetime(tweet_object['created_at']))
            source          = tweet_object['source']
            tweet_text      = tweet_object['text']

            # tweet numeric information
            quote_count = tweet_object['quote_count']
            reply_count = tweet_object['reply_count']
            retweet_count = tweet_object['retweet_count']
            tweet_favorite_count = tweet_object['favorite_count']

            # meta-content information
            hashtags = [str(hashtag['text']) for hashtag in tweet_object['entities']['hashtags']]
            hashtags = ",".join(hashtags)
            short_urls = [str(url['url']) for url in tweet_object['entities']['urls']]
            short_urls = ",".join(short_urls)
            expanded_urls = []
            try:
                expanded_urls = [str(url['expanded_url']) for url in tweet_object['entities']['urls']]
            except:
                print('Error Message: No Expanded URL.')
            expanded_urls = ",".join(expanded_urls)

            # user interaction based informations    
            user_mentions = [str(user_mentions['id'])\
                        for user_mentions in tweet_object['entities']['user_mentions']]
            user_mentions = ",".join(user_mentions)

            tweet_info = (tweet_id, user_id, tweet_lang,\
                    tweet_time, source, tweet_text,\
                    quote_count, reply_count, retweet_count,\
                    tweet_favorite_count, hashtags, short_urls,\
                    expanded_urls, user_mentions)
            self.cur.execute("INSERT INTO tweet_info \
            VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", tweet_info)
            
            # user profile information

            if user_id in self.unique_user_id_set:
                pass
            else:
                self.unique_user_id_set.add(user_id)
                
                user_screen_name      = tweet_object['user']['screen_name']
                user_name             = tweet_object['user']['name']
                user_language         = tweet_object['user']['lang']  
                location              = tweet_object['user']['location']
                profile_url           = tweet_object['user']['url']
                description           = tweet_object['user']['description']
                protected             = tweet_object['user']['protected']
                verified              = tweet_object['user']['verified']
                created_at            = str(pd.to_datetime(tweet_object['user']['created_at']))
                friends_count         = tweet_object['user']['friends_count']
                followers_count       = tweet_object['user']['followers_count']
                favorites_count       = tweet_object['user']['favourites_count']
                statuses_count        = tweet_object['user']['statuses_count']

                user_information = (user_id, user_screen_name, user_name,\
                       user_language, location, profile_url,\
                       description, protected, verified, created_at,\
                       friends_count, followers_count,\
                       favorites_count, statuses_count)
                self.cur.execute(" INSERT INTO user_info VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", user_information)
            
            self.conn.commit()
        
    def on_data(self, data):
        self.current_time = datetime.now()
        time_elapsed = (self.current_time - self.start_time).total_seconds()
        if time_elapsed < self.listen_time:
            try:
                self.counter += 1
                """
                Changing the code here.
                Previously, we saved to Text file.
                Now, we will pass this to the Database insertor method.
                """
                # -- self.output_file.write(str(data))
                print(f"Tweet Processed: {self.counter}\n")
                self.insert_data(str(data))
                
            except Exception as e:
                print(f"On data Exception:{e}.")
        else:
            print(f"Stream listen time period ended. Total listen time: {self.listen_time} seconds.\n\n")
            print(f"Total Tweet processed: {self.counter}")
            self.conn.close()
            return False

    # handling Errors
    def on_error(self, status_code):
        print(f"status_code: {status_code}")
        if status_code == 420:
            #returning False in on_error disconnects the stream
            return False

# create a stream

In [52]:
myStreamListener = MyStreamListener(listen_time=30)
myStream = tweepy.Stream(api.auth, myStreamListener)

Initialized Tweepy StreamListener.


# start the streamer

In [53]:
keywords = ['ida']
try:
    print("Stream Filter")
    myStream.filter(track=keywords)
    print("DONE")
except Exception as e:
    print(f"error in stream filter {e}")

Stream Filter
Tweet Processed: 1

Tweet Processed: 2

Tweet Processed: 3

Tweet Processed: 4

Tweet Processed: 5

Tweet Processed: 6

Tweet Processed: 7

Tweet Processed: 8

Tweet Processed: 9

Tweet Processed: 10

Tweet Processed: 11

Tweet Processed: 12

Tweet Processed: 13

Tweet Processed: 14

Tweet Processed: 15

Tweet Processed: 16

Tweet Processed: 17

Tweet Processed: 18

Tweet Processed: 19

Stream listen time period ended. Total listen time: 30 seconds.


Total Tweet processed: 19
DONE


# fetch a single entry from the database

In [54]:
# establish database connection
conn = sqlite3.connect('tweet_example.db')
# get the cursor object
cur  = conn.cursor()

# execute the query 
cur.execute("SELECT * FROM tweet_info")
# fetch result
tweet_info_single_result = cur.fetchone()
print(tweet_info_single_result)

print("\n\n")
# execute thre query
cur.execute("SELECT * FROM user_info")
# # fetchr result
user_info_single_result = cur.fetchone()
print(user_info_single_result)

(1435987801779101697, 104854735, 'en', '2021-09-09 15:25:45+00:00', '<a href="https://space.sprinklr.com" rel="nofollow">Sprinklr - Entergy</a>', 'As of today, Orleans Parish is 98% restored after 10 days of restoration work in the wake of #Ida. 682,000 of the 9… https://t.co/YxK0BhrC4e', '0', 0, 0, 0, 'Ida', 'https://t.co/YxK0BhrC4e', 'https://twitter.com/i/web/status/1435987801779101697', '')



(104854735, 'EntergyNOLA', 'Entergy New Orleans', None, 'New Orleans', 'http://www.entergyneworleans.com', 'This page is monitored during business hours. For customer service, please call 1-800-368-3749. To report power outages & emergencies, 1-800-968-8243.', '0', '1', '2010-01-14 16:16:27+00:00', 155, 20093, 487, 6856)


# fetch all the entries from the database

In [55]:
cur.execute("SELECT * FROM tweet_info")
tweet_info_all_result = cur.fetchall()
print(f"No of tweet stored: {len(tweet_info_all_result)}\n")
print(tweet_info_all_result)

print("\n\n")

cur.execute("SELECT * FROM user_info")
user_info_all_result = cur.fetchall()
print(f"No of user info stored: {len(user_info_all_result)}\n")
print(user_info_all_result)

No of tweet stored: 19

[(1435987801779101697, 104854735, 'en', '2021-09-09 15:25:45+00:00', '<a href="https://space.sprinklr.com" rel="nofollow">Sprinklr - Entergy</a>', 'As of today, Orleans Parish is 98% restored after 10 days of restoration work in the wake of #Ida. 682,000 of the 9… https://t.co/YxK0BhrC4e', '0', 0, 0, 0, 'Ida', 'https://t.co/YxK0BhrC4e', 'https://twitter.com/i/web/status/1435987801779101697', ''), (1435987802462818312, 533604777, 'pt', '2021-09-09 15:25:46+00:00', '<a href="http://twitter.com/download/android" rel="nofollow">Twitter for Android</a>', 'RT @Flamengo: Amanhã tem Mengão na semifinal do Carioca Sub-20 (jogo de ida)! \n\nBotafogo x Flamengo, no Cefat! \n\nA FlaTV transmite ao vivo…', '0', 0, 0, 0, '', '', '', '59591856'), (1435987805730217993, 1548952142, 'en', '2021-09-09 15:25:46+00:00', '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', 'Could use a little extra after Hurricane Ida Amanda-Lindsley', '0', 0, 0, 0, '

# how to get the column names of the returned database table
We can access the column names through the cursor description

In [56]:
user_info_column_names = [description[0] for description in cur.description]
print(f"user_info_column_names: {user_info_column_names}")

user_info_column_names: ['user_id', 'user_screen_name', 'user_name', 'user_language', 'location', 'profile_url', 'description', 'protected', 'verified', 'created_at', 'friends_count', 'followers_count', 'favorites_count', 'statuses_count']


# fetch the data into a pandas dataframe

In [58]:
cur.execute("SELECT * FROM tweet_info")
tweet_info_all_result = cur.fetchall()
tweet_info_column_names = [description[0] for description in cur.description]
tweet_info_dataframe = pd.DataFrame(tweet_info_all_result, \
                                   columns=tweet_info_column_names)
print(f"No of tweet stored: {len(tweet_info_all_result)}, {tweet_info_dataframe.shape[0]}\n")



cur.execute("SELECT * FROM user_info")
user_info_all_result = cur.fetchall()
user_info_column_names = [description[0] for description in cur.description]
user_info_dataframe = pd.DataFrame(user_info_all_result, \
                                   columns=user_info_column_names)
print(f"No of user info stored: {len(user_info_all_result)}, {user_info_dataframe.shape[0]}\n")

No of tweet stored: 19, 19

No of user info stored: 19, 19



In [None]:
tweet_info_dataframe.head()

In [None]:
user_info_dataframe.head()

# fetch data for a few selected columns from the database 

In [60]:
# execute the query
cur.execute("SELECT user_id, user_screen_name, \
                    friends_count, followers_count,\
                    favorites_count, statuses_count \
                    FROM user_info")
# fetch the result
user_info_selected_result = cur.fetchall()
# fetch the column names
user_info_selected_column_names = [description[0] for description in cur.description]
print(user_info_selected_column_names)
# create a dataframe from the fetched dataset
user_info_selected_dataframe = pd.DataFrame(user_info_selected_result, \
                                   columns=user_info_selected_column_names)

['user_id', 'user_screen_name', 'friends_count', 'followers_count', 'favorites_count', 'statuses_count']


In [61]:
# show some results from the dataframe
user_info_selected_dataframe.head()

Unnamed: 0,user_id,user_screen_name,friends_count,followers_count,favorites_count,statuses_count
0,104854735,EntergyNOLA,155,20093,487,6856
1,533604777,vgsa_,1717,1294,210,883120
2,1548952142,Amanda_Show47,657,217,8347,13887
3,311804569,NOLA_EM,200,983,191,1078
4,1337899476703383557,Chilly_CFC,128,33,1470,963


# fetch filtered entries from the database

In [None]:
# we are fetching users who have a follower count > 10
cur.execute("SELECT user_id, followers_count, \
                    WHERE followers_count>10 \
                    FROM user_info")
user_info_filtered_result = cur.fetchall()
print(f"Returned entries: {len(user_info_filtered_result)}")
user_info_filtered_column_names = [description[0] for description in cur.description]
user_info_filtered_dataframe = pd.DataFrame(user_info_filtered_result, \
                                   columns=user_info_filtered_column_names)
print(f"# rows user_info_filtered_dataframe: {user_info_filtered_dataframe.shape[0]}")

In [None]:
user_info_filtered_dataframe.head()

# close the database connection

In [52]:
conn.close()