In [1]:
from twikit import Client
from twikit import TwitterException 
from twikit import TooManyRequests
from twikit.utils import Endpoint
from twikit import BadRequest
from requests import ReadTimeout
from twikit import Unauthorized
from translate import Translator
from math import ceil
import time
import json
import requests
import random

In [9]:
# this API requires authentication
# authentication.txt is expected to hold the username, email and password
# on its first three lines (in that order)
# the context manager closes the file even if reading it fails
with open('authentication.txt', 'r') as f:
    auth = f.read()
auth_token = auth.split("\n")

# fail early with a readable message instead of a bare IndexError below
if len(auth_token) < 3:
    raise ValueError(
        "authentication.txt must contain the username, email and password on separate lines"
    )

# don't hardcode your email and password into something!!!
# the auth is in gitignore so I won't get hacked
username = str(auth_token[0])
email = str(auth_token[1])
password = str(auth_token[2])

# Initialize client
client = Client(language='en-US', http2=True)

In [3]:
def authentication(username, email, password):
    """
    Log the module-level `client` in and persist the resulting session cookies.

    Args:
        username: passed to client.login as auth_info_1.
        email: passed to client.login as auth_info_2.
        password: the account password.

    Returns:
        True when the login succeeded, False when the service rejected the
        request with BadRequest. Any other exception propagates to the caller.
    """
    try:
        # Login to the service with provided user credentials
        client.login(
            auth_info_1=username,
            auth_info_2=email,
            password=password)

    except BadRequest:
        print("Login unsuccessful. One or more login parameters is incorrect.")
        return False

    # get_all_tweets and process_tweets reload 'IGNOREcookies.json' on every
    # call, so the file has to be refreshed after each login; otherwise the
    # older saved cookies would be applied over this new session
    client.save_cookies('IGNOREcookies.json')

    print("Login successful!")
    return True

In [4]:
# log in once for this session; the function prints the outcome and the cell
# displays its True/False return value
authentication(username, email, password)

Login successful!


True

In [7]:
# Twitter LOVES to ban people when they log in repeatedly
# saving the cookies makes sure I don't get banned (often)

# NOTE(review): the value returned by get_cookies() is not used here
client.get_cookies()
# write the current session cookies to disk so later cells can reuse them
client.save_cookies('IGNOREcookies.json')
# read the file straight back and apply its contents to the client
with open('IGNOREcookies.json', 'r', encoding='UTF8') as f:
    client.set_cookies(json.load(f))

In [8]:
# housekeeping function
# each different method uses a different API endpoint
# each different API endpoint has a rate limit
# you can hit it a certain number of times per a time period (usually 15 minutes)
# this tells me how much time I have left if I've hit the rate limit

def get_limit_reset_time(endpoint: str):
    """
    Return how many whole seconds remain until `endpoint`'s rate limit resets.

    Sends a GET request to `endpoint` using the client's headers and cookies,
    reads the 'x-rate-limit-reset' response header as an integer timestamp and
    subtracts the current time from it, rounding the difference up.

    Raises:
        KeyError: if the response carries no 'x-rate-limit-reset' header.
    """
    response = requests.get(
        endpoint,
        headers=client._base_headers,
        cookies=client.get_cookies(),
    )
    reset_timestamp = int(response.headers['x-rate-limit-reset'])
    seconds_remaining = reset_timestamp - time.time()
    return ceil(seconds_remaining)

In [None]:
def get_rate_limit_search_tweet():
    """
    Probe the tweet search and report when its rate limit resets.

    Runs a fixed sample search. If it succeeds, the result is printed and None
    is returned. If it raises TooManyRequests, the remaining wait is printed
    and returned.

    Returns:
        Seconds until the search rate limit resets, or None when the probe
        was not rate limited.
    """
    try:
        print(client.search_tweet(
            'from:JoeBiden since:2020-01-01 until:2021-03-01', 'Latest', count=40
        ))
    except TooManyRequests:
        # ask the search endpoint itself; USER_TWEETS belongs to a different
        # call and would report that call's limit instead
        reset_time = get_limit_reset_time(Endpoint.SEARCH_TIMELINE)
        print(f'rate limit is reset after {reset_time} seconds.')
        return reset_time
    return None

In [None]:
def get_rate_limit_tweet_by_id():
    """
    Probe the tweet-by-id lookup and report when its rate limit resets.

    Fetches one fixed tweet. If that succeeds, the tweet is printed and None is
    returned. If it raises TooManyRequests, the remaining wait is printed and
    returned.

    Returns:
        Seconds until the lookup rate limit resets, or None when the probe
        was not rate limited.
    """
    try:
        print(client.get_tweet_by_id(1351951465674276869))
    except TooManyRequests:
        # ask the tweet detail endpoint; USER_TWEETS belongs to a different
        # call and would report that call's limit instead
        reset_time = get_limit_reset_time(Endpoint.TWEET_DETAIL)
        print(f'rate limit is reset after {reset_time} seconds.')
        return reset_time
    return None

In [None]:
# another housekeeping function
# if I'm suddenly getting 403 errors, I can use this to check if I've been banned
# sometimes I just have to go on the browser and reauthenticate

def check_user_status(user_id):
    """
    Report whether the account with `user_id` can still be looked up.

    Returns True when client.get_user_by_id succeeds and False when it fails
    with a TwitterException whose message starts with 'Invalid user id'.
    Any other TwitterException is re-raised.
    """
    try:
        client.get_user_by_id(user_id)
        return True
    except TwitterException as error:
        # only the "invalid id" failure means the account is gone
        if not str(error).startswith('Invalid user id'):
            raise error
        return False

# one-off check against a hard-coded user ID
# NOTE(review): presumably the ID of the account used for scraping - confirm
check_user_status(1547081484695216130)

True

In [None]:
# INPUT: the user handle, the first and last year of the date range
# OUTPUT: a list with the IDs of the scraped tweets (empty if none were found)
def get_all_tweets(handle, since, until, target_count=30, cooldown=18):
    """
    Collect tweet IDs posted by `handle` between two years.

    Searches 'Top' tweets from `{since}-01-01` until `{until}-01-31` and keeps
    requesting further result pages until a page comes back empty or at least
    `target_count` IDs have been gathered. The result can hold more than
    `target_count` IDs because whole pages are appended.

    Args:
        handle: account handle used in the `from:` search filter.
        since: first year of the range.
        until: last year of the range.
        target_count: stop paging once this many IDs have been collected.
        cooldown: seconds to sleep after every search request.

    Returns:
        List of tweet IDs. If a ReadTimeout occurs, the IDs collected up to
        that point are returned.
    """
    # created before the try block so the ReadTimeout path can always return it
    mass_tweets = []

    try:
        # load the cookies so you don't login a million times and get banned
        client.load_cookies('IGNOREcookies.json')

        since = f'{since}-01-01'
        until = f'{until}-01-31'

        # this will pull the first page of results
        tweets = client.search_tweet(
            f'from:{handle} since:{since} until:{until}', 'Top'
        )
        # this API provides a 'tweet' object, but we only want the id when we return
        mass_tweets += [tweet.id for tweet in tweets]

        # this endpoint has a rate limit of 50 hits per 15 minutes
        # 15 min = 900 seconds
        # 900//50 = 18
        # allows the program to be automated
        time.sleep(cooldown)

        # keep pulling pages until the target is hit or there are none left;
        # an empty first page ends the loop immediately, and an empty later
        # page stops us from pinging the API for no reason
        while len(tweets) > 0 and len(mass_tweets) < target_count:
            tweets = tweets.next()
            mass_tweets += [tweet.id for tweet in tweets]
            time.sleep(cooldown)  # cooldown

    except ReadTimeout:
        # keep whatever was collected before the timeout
        pass

    return mass_tweets

In [None]:
def process_tweets(handle, user_id, name, tweet_ids, since):
    """
    Fetch each tweet in `tweet_ids` and turn it into a row for storage.

    Non-English tweets are translated to English. Every row is a tuple of
    (tweet id, user id, name, handle, text, original language,
    'True'/'False' translated flag, 'en'/'null' translation language,
    creation datetime as string, since).

    Args:
        handle: account handle stored in each row.
        user_id: account ID, converted with int().
        name: display name stored in each row and used in log messages.
        tweet_ids: IDs to fetch with client.get_tweet_by_id.
        since: value stored in the last column of each row.

    Returns:
        List of row tuples. Tweets that raise IndexError are skipped. On a
        ReadTimeout the rows built so far are returned immediately.
    """
    # load the cookies so you don't login a million times and get banned
    client.load_cookies('IGNOREcookies.json')

    # initialize a list to store all tuples
    tweets = []

    # one translator is reused for every tweet instead of being rebuilt each time
    translator = Translator(to_lang='en')

    for tweet_id in tweet_ids:
        try:
            # using the IDs we pulled from above
            tweet = client.get_tweet_by_id(tweet_id)

            # we have international data
            # this will translate it and record that it was translated
            if tweet.lang != 'en':
                tweet_text = translator.translate(tweet.text)
                was_translated, translated_to = 'True', 'en'

            # otherwise we just keep the original text
            else:
                tweet_text = tweet.text
                was_translated, translated_to = 'False', 'null'

            # both branches now produce the same row type (a tuple)
            tweets.append((
                int(tweet.id), int(user_id), name, handle, str(tweet_text),
                str(tweet.lang), was_translated, translated_to,
                str(tweet.created_at_datetime), since,
            ))

        # it throws an Index Error if the tweet has been deleted/ is not available
        except IndexError:
            # report the ID: `tweet` is unbound or left over from the previous
            # iteration when the fetch itself failed
            print(f'Index Error: unable to process {tweet_id} from {name}')
        except ReadTimeout:
            print('Read timeout')
            return tweets

        # this endpoint can process 150 tweets per 15 minutes
        # 15 min = 900 seconds
        # 900//150 = 6
        # allows program to be fully automated
        # the pause also follows a failed fetch, since that request was sent too
        time.sleep(6)
    return tweets

In [None]:
def pull_tweet_ids(user_list):
    """
    Collect tweet IDs for every entry in `user_list`.

    Args:
        user_list: sequence of rows indexed as (user_id, name, handle, year).
            Tweets are searched from `year` to `year + 1`.

    Returns:
        A pair (tweet_ids_list, no_tweets_list).
        tweet_ids_list holds [handle, user_id, name, tweet_ids, year] for
        entries with at least one tweet; no_tweets_list holds
        [user_id, name, handle, year] for entries without any.
        The same pair is returned when the run stops early because
        re-authentication failed; the stop position is printed in that case.
    """
    tweet_ids_list = []
    no_tweets_list = []

    # each entry gets one retry after a recoverable error, so a rate limit or
    # a successful re-login no longer drops that entry silently
    max_attempts = 2

    for idx, user in enumerate(user_list):
        # all of our parameters for the function
        user_id = user[0]
        name = user[1]
        handle = user[2]
        since = user[3]
        until = user[3] + 1

        print(name)

        for _attempt in range(max_attempts):
            try:
                # pull the tweet IDs that are processed in a later step
                tweet_ids = get_all_tweets(handle, since=since, until=until)

            # this error happens when we try to hit the API too many times
            except TooManyRequests:
                print("Too many requests")
                # the helper prints its own report
                get_rate_limit_search_tweet()
                time.sleep(900)
                continue

            except ReadTimeout:
                print("The read operation timed out.\n"
                      "If authentication fails, you may be blocked or need to authenticate through a browser.")
                if authentication(username, email, password):
                    continue
                if not check_user_status(1547081484695216130):
                    print("Authentication failed. Function pull_tweet_ids terminating.")
                else:
                    print("Unknown authentication issue. Function pull_tweet_ids terminating.")
                print(f'Stopped at index {idx} ({name}).')
                return tweet_ids_list, no_tweets_list

            # elon musk caught my scent :(
            # reauthenticate in a browser
            except Unauthorized:
                if authentication(username, email, password):
                    continue
                print("You need to reauthenticate through a browser.")
                print(f'Stopped at index {idx} ({name}).')
                return tweet_ids_list, no_tweets_list

            print(f'{len(tweet_ids)} tweets collected for {name} for {since} election')

            # we only add to the counter if that person actually had tweets to process
            if len(tweet_ids) > 0:
                tweet_ids_list.append([handle, user_id, name, tweet_ids, since])

            # I want to keep track of which politicians didn't tweet during their election year
            else:
                no_tweets_list.append([user_id, name, handle, since])
            break

        else:
            # every attempt ended in a recoverable error
            print(f'Skipping {name}: still failing after {max_attempts} attempts.')

    print(f'This program was able to find tweets for {len(tweet_ids_list)} out of {len(user_list)} politicians.')
    print(f'{len(no_tweets_list)} politicians had no tweets during one or more of their campaign years.')
    return tweet_ids_list, no_tweets_list

In [None]:
def process_pulled_tweet_ids(tweet_ids_list, num_tweets):
    """
    Fetch and format up to `num_tweets` tweets for every collected entry.

    Args:
        tweet_ids_list: entries indexed as
            (handle, user_id, name, tweet_ids, year).
        num_tweets: maximum number of tweets to process per entry. When an
            entry has more IDs, a random sample of this size is used.

    Returns:
        List with one list of rows (as returned by process_tweets) per
        successfully processed entry. If re-authentication fails, the rows
        gathered so far are returned.
    """
    tweet_list = []

    # each entry gets one retry after a recoverable error, so a rate limit no
    # longer drops that entry silently
    max_attempts = 2

    for user in tweet_ids_list:
        # all the parameters we need
        handle = user[0]
        user_id = user[1]
        name = user[2]
        tweet_ids = user[3]
        year = user[4]

        # we only want to do up to num_tweets tweets per politician
        # the endpoint allows 150 tweets per 15 minutes
        # not great but it's free
        if len(tweet_ids) > num_tweets:
            # random sample in an attempt to stay unbiased
            # drawn once, so a retry processes the same sample
            tweet_ids = random.sample(tweet_ids, num_tweets)

        for _attempt in range(max_attempts):
            try:
                # process the tweets and add them to our holding list from above
                tweets = process_tweets(handle, user_id, name, tweet_ids, year)

            except TooManyRequests:
                print("Too many requests")
                # the helper prints its own report
                get_rate_limit_tweet_by_id()
                time.sleep(900)
                continue

            except ReadTimeout:
                print("The read operation timed out.\n"
                      "If authentication fails, you may be blocked or need to authenticate through a browser.")
                if authentication(username, email, password):
                    continue
                if not check_user_status(1547081484695216130):
                    print("Authentication failed. Function process_pulled_tweet_ids terminating.")
                else:
                    print("Unknown authentication issue. Function process_pulled_tweet_ids terminating.")
                return tweet_list

            tweet_list.append(tweets)
            # report what was actually processed, which can be fewer than num_tweets
            print(f"{len(tweets)} tweets processed for {name}")
            break

        else:
            # every attempt ended in a recoverable error
            print(f"Skipping {name}: still failing after {max_attempts} attempts.")

    return tweet_list

In [None]:
# open the local SQLite database that holds the politician metadata
import sqlite3
conn = sqlite3.connect('tweets.db')
c = conn.cursor()

# one row per politician and election year, limited to rows flagged as
# active on Twitter; the flag is compared against the string 'True'
c.execute(""" SELECT twitter_user_id, politician_name, twitter_handle, election_year
            FROM coordinates
            WHERE twitter_active_during_election = 'True'
            """)
# rows come back as tuples in the column order of the SELECT above
active_user_list = c.fetchall()
# peek at the first row; raises IndexError when the query returned nothing
print(active_user_list[0])

(813286, 'Barack Obama', 'BarackObama', 2008)


In [None]:
# work on a shallow copy so the list fetched from the database stays untouched
user_list = active_user_list.copy()
# number of rows that will be passed to the scraper
print(len(user_list))

193


In [None]:
# long-running: get_all_tweets sleeps 18 seconds after every search request
tweet_ids, no_tweets = pull_tweet_ids(user_list)

Barack Obama
39 tweets collected for Barack Obama for 2008 election
Joe Biden
0 tweets collected for Joe Biden for 2008 election
Dennis Kucinich
38 tweets collected for Dennis Kucinich for 2008 election
Mike Huckabee
39 tweets collected for Mike Huckabee for 2008 election
Fred Thompson
0 tweets collected for Fred Thompson for 2008 election
Chris Dodd
39 tweets collected for Chris Dodd for 2008 election
Jill Stein
23 tweets collected for Jill Stein for 2012 election
Barack Obama
39 tweets collected for Barack Obama for 2012 election
Mitt Romney
29 tweets collected for Mitt Romney for 2012 election
Virgil Goode
11 tweets collected for Virgil Goode for 2012 election
Gary Johnson
0 tweets collected for Gary Johnson for 2012 election
Newt Gingrich
39 tweets collected for Newt Gingrich for 2012 election
Ron Paul


In [None]:
# how many entries came back with collected tweet IDs
print(len(tweet_ids))

In [None]:
# fetch the tweets behind the collected IDs, at most 25 per entry
tweet_list = process_pulled_tweet_ids(tweet_ids, 25)

In [None]:
# NOTE(review): this repeats the import and connection setup of the earlier
# query cell; conn and c are rebound here without the earlier connection
# being closed explicitly
import sqlite3
conn = sqlite3.connect('tweets.db')
c = conn.cursor()

<sqlite3.Cursor at 0x22f25006e40>

In [None]:
# upload data in SQLite table
# storing this data is important since this is an unofficial API
# every time I access it, I am risking not being able to access it again

# tweet_list holds one list of rows per politician; each row supplies the ten
# values for the placeholders below
for lst in tweet_list:
    for tweet in lst:
        c.execute("INSERT INTO politician_tweets VALUES (?,?,?,?,?,?,?,?,?,?)", tweet)
        # committed after every single row, so rows inserted before a failure
        # are already saved
        conn.commit()
