### Twitter API Extraction Tool

This tool extracts tweets through the Twitter API v2. Running it requires an Academic Research API key (bearer token).

In [62]:
import requests
import os
import json
import time
import pandas as pd
import getpass
import sqlite3
from IPython.display import clear_output

# Twitter API bearer token, read from the TWITTER_BEARER_TOKEN environment
# variable so the secret is never saved inside the notebook itself.
# Falls back to an empty string (same as the previous placeholder) when the
# variable is not set; requests will then be rejected by the API.
bearer_token = os.environ.get('TWITTER_BEARER_TOKEN', '')

In [133]:
def bearer_oauth(r):
    """
    Auth callback passed to ``requests`` through ``auth=``.

    Stamps the outgoing request object ``r`` with the bearer-token
    Authorization header and the User-Agent header, then returns that
    same request object.
    """

    r.headers.update({
        "Authorization": f"Bearer {bearer_token}",
        "User-Agent": "v2FullArchiveSearchPython",
    })
    return r


def connect_to_endpoint(url, params, timeout=30):
    """
    Sends an authenticated GET request and returns the decoded JSON body.

    Parameters:
        url: endpoint url string.
        params: dict of query parameters for the API request.
        timeout: seconds to wait for the server before giving up; forwarded
            to ``requests.get``. Without it a stalled connection would block
            the notebook forever. Defaults to 30.

    Returns:
        The response payload parsed from json.

    Raises:
        Exception: with ``(status_code, response_text)`` as args whenever
            the API answers with anything other than HTTP 200.
    """

    response = requests.get(url, auth=bearer_oauth, params=params,
                            timeout=timeout)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()


def count_tweets(q_params):
    """
    Calls the Twitter V2 counts endpoint and returns the total number of
    tweets matching the query, summed over every page of results.

    Parameters:
        q_params: dict of search parameters. It is copied, never modified.
            'tweet.fields' and 'max_results' are stripped from the copy
            before the call (presumably because the counts endpoint does
            not take them -- confirm against the API reference); they may
            be absent.

    Returns:
        Sum of meta.total_tweet_count across all pages.

    Raises:
        KeyError: when a response has no 'meta' or 'total_tweet_count'.
    """

    count_url = "https://api.twitter.com/2/tweets/counts/all"
    cnt_params = {'granularity': 'day', **q_params}
    # pop() with a default so callers that never set these keys don't crash.
    for search_only_key in ('tweet.fields', 'max_results'):
        cnt_params.pop(search_only_key, None)

    total_twts = 0
    while True:
        meta = connect_to_endpoint(count_url, cnt_params)['meta']
        total_twts += meta['total_tweet_count']

        # The last page carries no next_token; that ends the pagination.
        next_token = meta.get('next_token')
        if not next_token:
            break
        cnt_params['next_token'] = next_token
    return total_twts


def get_all_tweets(username, start_date, end_date):
    """
    Calls the Twitter V2 full-archive search for the non-retweet tweets
    posted by ``username`` between ``start_date`` and ``end_date`` and
    follows the pagination tokens until every page has been read.

    Parameters:
        username: twitter handle placed in the ``from:`` query operator.
        start_date: value sent as 'start_time'.
        end_date: value sent as 'end_time'.

    Returns:
        The list of tweet dicts fetched; empty when nothing matched.

    Raises:
        KeyError: when a response has no 'meta' entry.
    """

    search_url = "https://api.twitter.com/2/tweets/search/all"

    query_params = {
        'query': f"(from:{username}) -is:retweet",
        'start_time': start_date,
        'end_time': end_date,
        'tweet.fields': ('author_id,context_annotations,'
                         'conversation_id,created_at,'
                         'entities,id,in_reply_to_user_id,lang,'
                         'public_metrics,possibly_sensitive,'
                         'referenced_tweets,reply_settings,'
                         'source,text'),
        'max_results': 100
    }

    total_twts = count_tweets(query_params)
    print(f"GETTING {total_twts} tweets")

    tweets = []

    slp_counter = 0
    while True:
        # After 300 requests pause for 15 x 61 seconds before continuing
        # (presumably sized to the endpoint's rate-limit window -- confirm).
        if slp_counter >= 300:
            for i in range(15):
                print(f"Sleeping for {15-i} min")
                time.sleep(61.0)
            slp_counter = 0

        response = connect_to_endpoint(search_url, query_params)
        slp_counter += 1

        # A page with zero results has no 'data' key at all; treat it as
        # an empty page instead of iterating over None.
        tweets.extend(response.get('data') or [])

        # The last page carries no next_token; that ends the pagination.
        next_token = response['meta'].get('next_token')
        if not next_token:
            break
        query_params['next_token'] = next_token
        print(f"Processed {len(tweets)} out of {total_twts} tweets")

    print(f"PROCESSED {len(tweets)} tweets")

    return tweets


def get_username(usr_id):
    """
    Looks up a twitter user by user id through the V2 users endpoint and
    returns the 'username' found under 'data' in the response.
    """

    lookup_url = f"https://api.twitter.com/2/users/{usr_id}"
    payload = connect_to_endpoint(lookup_url, {})
    profile = payload['data']
    return profile['username']


def scrape_twitter(username, start_date, end_date):
    """
    Fetches the tweets of ``username`` between ``start_date`` and
    ``end_date`` and builds the Data Frame for the record collection.

    On top of the raw tweet fields the frame gets:
      * 'username' - handle resolved from each tweet's 'author_id'
      * 'url'      - "twitter.com/<username>/status/<tweet id>"
    and the nested columns (context_annotations, public_metrics, entities,
    edit_history_tweet_ids, referenced_tweets) are converted to strings,
    or created as empty strings when the API did not return them.

    Returns:
        The Data Frame; when no tweets were fetched, an empty frame that
        only has the 'username', 'url' and nested-column headers.
    """

    str_cols = ['context_annotations', 'public_metrics', 'entities',
                'edit_history_tweet_ids', 'referenced_tweets']

    tweets = get_all_tweets(username, start_date, end_date)
    df = pd.DataFrame(tweets)

    # No tweets means no 'author_id' column to work with; hand back an
    # empty frame instead of failing with a KeyError.
    if df.empty:
        print("DONE")
        return pd.DataFrame(columns=['username', 'url', *str_cols])

    # One lookup call per distinct author rather than per tweet.
    usr_ids = df['author_id'].unique()
    usr_lookups = {}
    for i, usr in enumerate(usr_ids):
        usr_lookups[usr] = get_username(usr)
        print(f"Compiled {i+1} out of {len(usr_ids)} user information")

    df['username'] = df['author_id'].map(usr_lookups)

    # Prefer a 'tweet_id' column when present, otherwise use 'id'. Checked
    # explicitly so unrelated errors are no longer silently swallowed.
    id_col = 'tweet_id' if 'tweet_id' in df.columns else 'id'
    df['url'] = ("twitter.com/" + df['username'].astype(str)
                 + "/status/" + df[id_col].astype(str))

    # Nested dict/list values are stored as their string representation.
    for col in str_cols:
        if col in df.columns:
            df[col] = df[col].astype('str')
        else:
            df[col] = ''
    print("DONE")
    return df


## Data Loader to SQL

In [136]:
def load_tweets(influencers, periods=None, connection=None):
    """
    Scrapes every given twitter account for each date window and appends
    the fetched tweets to the twitter_dump table.

    Parameters:
        influencers: list of twitter user handles without @ i.e. cz_binance.
        periods: optional list of (start_date, end_date) pairs. When
            omitted, the two windows this notebook was written for are
            used (described originally as the bull and bear market dates):
            2021-10-01 .. 2021-11-30 and 2022-10-01 .. 2022-11-25.
        connection: optional database connection handed to ``to_sql``.
            When omitted, the notebook-level ``conn`` is used.

    Windows that return no tweets are skipped, nothing is written for them.
    """

    if periods is None:
        periods = [
            ('2021-10-01T00:00:00Z', '2021-11-30T23:59:59Z'),
            ('2022-10-01T00:00:00Z', '2022-11-25T23:59:59Z'),
        ]
    if connection is None:
        connection = conn

    for inf in influencers:
        clear_output(wait=True)
        print(f'Getting Tweets for {inf}')
        for start_date, end_date in periods:
            tweets_df = scrape_twitter(inf, start_date, end_date)
            if tweets_df.empty:
                continue
            tweets_df.to_sql('twitter_dump', connection,
                             if_exists='append', index=False)
    

In [137]:
# Handles to scrape (without the leading @). load_tweets fetches each one for
# the date windows it is configured with and appends the rows to twitter_dump.
influencers = ['cz_binance']
load_tweets(influencers)

Getting Tweets for cz_binance
GETTING 151 tweets
Processed 100 out of 151 tweets
PROCESSED 151 tweets
Compiled 1 out of 1 user information
DONE
GETTING 705 tweets
Processed 100 out of 705 tweets
Processed 200 out of 705 tweets
Processed 300 out of 705 tweets
Processed 400 out of 705 tweets
Processed 493 out of 705 tweets
Processed 593 out of 705 tweets
Processed 693 out of 705 tweets
PROCESSED 698 tweets
Compiled 1 out of 1 user information
DONE


In [None]:
# Preview a few stored rows. The frames built inside load_tweets are local to
# that function, so the preview is read back from the twitter_dump table.
pd.read_sql_query("SELECT * FROM twitter_dump LIMIT 5", conn)

In [129]:
# Total number of rows currently stored in the twitter_dump table.
pd.read_sql_query("SELECT count(*) FROM twitter_dump", conn)

Unnamed: 0,count(*)
0,39993


In [119]:
# Cleanup: delete the rows whose username is an empty string, then commit.
# NOTE(review): this cell's execution count (119) is lower than that of the
# load cell above, so it was last run before the load -- confirm whether it
# should still run at this point in the notebook.
conn.execute("delete from twitter_dump where username=''")
conn.commit()