## TwitterAPIv2 Historical Call

- Etienne P Jacquot 10/12/2021

### Load your Modules

In [1]:
import json
import requests
import os
from time import sleep
import datetime
from datetime import datetime

### Set your access tokens

> This example notebook assumes the APIv2 Academic track, though you can also run it against `2/tweets/search/recent` on the Standard track.

In [2]:
# Prefer the TWITTER_BEARER_TOKEN environment variable so the real secret never
# has to be saved inside (or committed with) the notebook. The inline value is
# only a placeholder fallback.
BEARER_TOKEN = os.environ.get(
    'TWITTER_BEARER_TOKEN',
    'AAAAAAAAAAAAAAAAAAAAAJ0zTAEA..........spw8C93yzWcoR',  # <-- or replace with your Bearer Token here!
)

### Set your query parameters
- The `point_radius:[]` search operator filters on confirmed geolocations. More info here: https://developer.twitter.com/en/docs/twitter-api/enterprise/search-api/overview

In [3]:
##########################################
# Search query, time window and returned fields
##########################################

# The query is split across adjacent string literals purely for readability;
# Python concatenates them into one string. It has three space-separated parts:
# a keyword group, a negated group, and a group of point_radius operators
# OR-ed together.
query = (
    '("covid vaccine" OR vaxx OR mrna OR "the vaccine" OR "covid19 vaccine") '
    '-($mrna OR az OR astrazeneca) '
    '(point_radius:[-73.98 40.74 25mi] OR '
    'point_radius:[-75.16 39.95 25mi] OR '
    'point_radius:[-77.03 38.90 25mi] OR '
    'point_radius:[-81.67 41.48 25mi] OR '
    'point_radius:[-83.10 42.35 25mi] OR '
    'point_radius:[-87.69 41.87 25mi] OR '
    'point_radius:[-84.39 33.74 25mi] OR '
    'point_radius:[-80.21 25.77 25mi] OR '
    'point_radius:[-90.08 29.94 25mi] OR '
    'point_radius:[-95.36 29.75 25mi] OR '
    'point_radius:[-118.25 34.03 25mi] OR '
    'point_radius:[-122.26 37.80 25mi])'
)

# Time window for the historical search
start_time = "2021-08-12T00:00:00Z"
end_time = "2021-09-12T00:00:00Z"

# Extra fields to request; each constant is a comma-separated string.
EXPANSIONS = ",".join([
    "author_id",
    "referenced_tweets.id",
    "referenced_tweets.id.author_id",
    "in_reply_to_user_id",
    "attachments.media_keys",
    "geo.place_id",
])
MEDIA_FIELDS = ",".join([
    "duration_ms",
    "height",
    "media_key",
    "preview_image_url",
    "type",
    "url",
    "width",
    "public_metrics",
])
TWEET_FIELDS = ",".join(["created_at", "author_id", "public_metrics", "source"])
USER_FIELDS = ",".join([
    "description",
    "name",
    "username",
    "created_at",
    "location",
    "url",
    "verified",
    "public_metrics",
])

### Set your APIv2 Endpoint

- This example uses `2/tweets/search/all` for the Academic track.

In [4]:
##########################################
# Set your Twitter APIv2 endpoint URL
# -----------------------------------
# Optional params: start_time,end_time,since_id,until_id,next_token,granularity
##########################################

search_url = "https://api.twitter.com/2/tweets/search/all"

# Request parameters for the first page of results.
# "query" is passed as the plain string: wrapping it as `{query}` builds a
# one-element *set*, which makes the dict non-JSON-serialisable and obscures
# the intent, even though the encoded URL comes out the same.
query_params = {
    "query": query,
    "start_time": start_time,
    "end_time": end_time,
    "expansions": EXPANSIONS,  # <-- Comment out if your expansions are empty...
    "media.fields": MEDIA_FIELDS,
    "tweet.fields": TWEET_FIELDS,
    "user.fields": USER_FIELDS,
}

### 🚀 Run your APIv2 call for Historical Search

- This handles rate limiting and pagination!


#### Prepare the output directory & filename prefix

In [5]:
# Create the output directory from Python instead of the `mkdir` shell
# automagic: exist_ok=True keeps this cell safe to re-run (Restart & Run All)
# and it works the same on every platform.
os.makedirs('./data', exist_ok=True)

In [6]:
# Path prefix shared by every results file written by main():
# '<filename_prefix>_.json' for the first page, then
# '<filename_prefix>_<next_token>.json' for each following page.
filename_prefix = "./data/tweets_10122021"

### Rate Limiting on Academic Track



- Full-archive search: **300 requests per 15-minute window**
    - Full-archive also has a **1 request / 1 second limit**.
    - More info here: https://developer.twitter.com/en/docs/twitter-api/rate-limits#v2-limits

In [7]:
def bearer_oauth(r):
    """
    Auth callable handed to `requests` via `auth=`.

    Sets the Authorization header (built from the module-level BEARER_TOKEN)
    and the User-Agent header on the outgoing request `r`, then returns that
    same request object.
    """
    auth_headers = {
        "Authorization": f"Bearer {BEARER_TOKEN}",
        "User-Agent": "v2FullArchiveSearchPython",
    }
    for header_name, header_value in auth_headers.items():
        r.headers[header_name] = header_value
    return r

def sleep_on_rate_limit(response, safety_margin=90):
    """
    Block until the rate-limit window reported by `response` has reset.

    Parameters
    ----------
    response : object with a mapping-like `.headers` attribute
        The rate-limited HTTP response. The 'x-rate-limit-reset' header is
        read as an integer epoch timestamp.
    safety_margin : int or float, optional
        Extra seconds to sleep after the reset time has passed (default 90).

    Returns
    -------
    float
        Total number of seconds this call asked to sleep.

    Notes
    -----
    * A missing 'x-rate-limit-reset' header no longer raises; only the safety
      margin is waited in that case.
    * A reset time that is already in the past is clamped to a zero-second
      wait, because `sleep()` raises ValueError on negative durations.
    """
    print('oops! rate limit hit...')
    reset_header = response.headers.get('x-rate-limit-reset')
    print(reset_header)

    reset = None
    wait_seconds = 0.0
    if reset_header is not None:
        reset = datetime.fromtimestamp(int(reset_header))
        # Clamp at zero: the reset moment may already have passed by the time
        # we get here (clock skew, slow response handling).
        wait_seconds = max(0.0, (reset - datetime.now()).total_seconds())

    # Sleep for required amount of time
    print('RATE LIMIT - Waiting for x-rate-limit-reset time...')
    print(reset)
    sleep(wait_seconds)
    print('Okay! Waiting another {} seconds to be safe...'.format(safety_margin))
    sleep(safety_margin)
    return wait_seconds + safety_margin
    
def connect_to_endpoint(url, params, max_retries=3):
    """
    GET `url` with `params` using bearer-token auth and return the parsed JSON.

    Parameters
    ----------
    url : str
        Endpoint to call. (Previously this argument was ignored and the global
        `search_url` was always used.)
    params : dict
        Query-string parameters passed to `requests`.
    max_retries : int, optional
        How many times a 429 (rate limited) response is retried after waiting
        via `sleep_on_rate_limit` (default 3).

    Returns
    -------
    The decoded JSON body of the successful (HTTP 200) response.

    Raises
    ------
    Exception
        With `(status_code, response_text)` as args when the final response is
        anything other than HTTP 200.
    """
    response = requests.request("GET", url, auth=bearer_oauth, params=params)
    print(response)

    # TO HANDLE RATE LIMITING: wait for the window to reset and retry. A single
    # retry can itself be rate limited, so loop, but stay bounded.
    attempts = 0
    while response.status_code == 429 and attempts < max_retries:
        sleep_on_rate_limit(response)
        response = requests.request("GET", url, auth=bearer_oauth, params=params)
        attempts += 1

    # if the status code is not successful 200, these are the unhandled exceptions!
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()


def _save_page(page, suffix):
    """Write one page of results to '<filename_prefix>_<suffix>.json' as JSON."""
    with open('{}_{}.json'.format(filename_prefix, suffix), 'w', encoding='utf8') as file:
        json.dump(page, file)


def main():
    """
    Run the search and write every page of results to disk.

    The first page is saved as '<filename_prefix>_.json'. While the response's
    'meta' object carries a 'next_token', the next page is requested with that
    token and saved as '<filename_prefix>_<next_token>.json'.

    Follow-up requests reuse `query_params` and only add 'next_token', so an
    edit to `query_params` (e.g. commenting out "expansions") applies to every
    page instead of just the first one.
    """
    sleep(1.25)
    # get HTTP response
    json_response = connect_to_endpoint(search_url, query_params)

    # Create first outfile (pre-pagination); empty suffix -> '<prefix>_.json'
    _save_page(json_response, '')

    # FOR PAGINATION LOOP
    # .get() so a response without a 'meta' object ends the loop instead of
    # raising KeyError.
    while 'next_token' in json_response.get('meta', {}):
        sleep(1.25)  # <-- full search you need to wait at least one second between calls!
        next_token = json_response['meta']['next_token']
        page_params = {**query_params, "next_token": next_token}
        # 'granularity': 'day' would be added here for counts (not for search)
        json_response = connect_to_endpoint(search_url, page_params)

        # Create all next outfiles (post-pagination)
        _save_page(json_response, next_token)


if __name__ == "__main__":
    main()

<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200]>
<Response [200

_________________________

## Amazing! You now have APIv2 results with successful pagination & rate limiting