In [None]:
# Mount Google Drive at /content/gdrive (Colab only); the destination folder
# used when saving the collected tweets lives under this mount point.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


## **Library**
___

In [None]:
import json, os
from enum import Enum, auto
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re

## time library
import time
from time import perf_counter

## requests library
import requests
from requests_oauthlib import OAuth1
from requests.exceptions import HTTPError, ConnectionError
from requests import Timeout

## **Access Recent Tweets with Twitter API `v2`**
___


In [None]:
## Collection of twitter endpoints
TWITTER_ENDPT = {
    'recent_search' : "https://api.twitter.com/2/tweets/search/recent",
}

## User-agent header sent with every request
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"

## Bearer token for Twitter API V2.0.
## Taken from the TWITTER_BEARER_TOKEN environment variable when it is set, so
## the secret does not have to be saved inside the notebook; otherwise the
## placeholder below is used and must be replaced by hand.
BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN", "<Your bearer token here>")

## Length of the rate-limit window, in seconds
TIME_WINDOW = 900

## Set query parameters here

### query parameters for Twitter API V2.0
QUERY_PARAMS = {
    # 'query' : '(Sony OR #Sony) (wf-1000xm4 OR ear buds) lang:en -is:retweet',
    'query' : '(Sony OR #Sony) (wf-1000xm4 OR ear buds)',
    'max_results' : 30,
    'tweet.fields' : 'author_id,public_metrics,created_at,attachments,context_annotations',
    'expansions' : 'author_id',
}

## Maximum number of requests allowed inside one time window
MAX_REQ = 450

## Minimum delay between two requests, in seconds (with a 0.5 s safety margin).
## True division is used on purpose: floor division would under-estimate the
## interval whenever MAX_REQ does not divide TIME_WINDOW evenly
## (e.g. 900 // 1000 == 0) and let the loop exceed the rate limit.
RATE_LIMIT = TIME_WINDOW / MAX_REQ + 0.5

## Total data points required
N_DATAPTS = 100

## Maximum number of consecutive empty responses tolerated
MAX_EMPTY = 2

## Maximum number of consecutive failed requests tolerated
MAX_ERROR = 2


In [None]:
## authentication hook handed to `requests` through its `auth=` argument
def bearer_oauth(r):
  """Add the bearer-token and user-agent headers to request `r` and return it.

  The header values come from the module-level BEARER_TOKEN and USER_AGENT.
  """
  hdrs = r.headers
  hdrs["Authorization"] = f"Bearer {BEARER_TOKEN}"
  hdrs["User-Agent"] = USER_AGENT
  return r

## make single request to Twitter endpoint
def connect_to_endpoint(url, params, timeout=180, print_error=True):
  """Send one GET request to `url`, authenticated through `bearer_oauth`.

  Args:
    url: endpoint to query.
    params: query parameters forwarded to `requests.get`.
    timeout: timeout in seconds forwarded to `requests.get`.
    print_error: when True, the error message is also printed.

  Returns:
    `(payload, True)` with the decoded JSON body on success, or
    `(message, False)` with a human-readable error message on any failure.
    No exception is propagated to the caller.
  """
  try:
    response = requests.get(url, auth=bearer_oauth, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json(), True
  except HTTPError as http_err:
    message = f"HTTP error occurred: {http_err}"
  # Timeout must be tested before ConnectionError: requests' ConnectTimeout
  # derives from both, and would otherwise be reported as a connection error.
  except Timeout as time_err:
    message = f"Timeout error occurred after {timeout} seconds: {time_err}"
  except ConnectionError as conn_err:
    message = f"Connection error occurred: {conn_err}"
  except Exception as err:
    # e.g. a body that is not valid JSON
    message = f"Other error occurred: {err}"

  if print_error:
    print(message)
  return message, False

In [None]:
count = 0 # track total result count
max_error = 0 # consecutive failed requests since the last successful request
max_empty = 0 # consecutive empty responses since the last non-empty response
newest_id = 0 # track the newest id at current response
oldest_id = 0 # track the oldest id at current response
i_count = 0 # track the count at current response
js_objs = []  # store all the response json objects
stats = []  # track connection statuses
i = 0 # track current iteration
now = 0 # track current time before request
old = 0 # track time at last request

# Page through results on a private copy of the query parameters so the
# pagination cursor ('until_id') never leaks into the shared QUERY_PARAMS,
# even if the cell is interrupted half-way.
params = dict(QUERY_PARAMS)
params.pop('until_id', None)

print("Start digging tweets...")
# Strict '<': stop as soon as the target is reached instead of spending one
# more request when count == N_DATAPTS.
while count < N_DATAPTS:
  i += 1

  # throttle: keep at least RATE_LIMIT seconds between two requests
  now = perf_counter()
  if (now - old) <= RATE_LIMIT:
    time.sleep(RATE_LIMIT - (now - old))
  resp, stat = connect_to_endpoint(TWITTER_ENDPT['recent_search'], params)
  old = perf_counter()

  # failed request: log it and give up after MAX_ERROR consecutive failures
  if not stat:
    stats.append(f"{i}: {resp}")
    max_error += 1
    if max_error >= MAX_ERROR:
      break
    continue

  # successful request without tweets: give up after MAX_EMPTY in a row
  if "data" not in resp:
    stats.append(f"{i}: Empty response returned.")
    max_empty += 1
    if max_empty >= MAX_EMPTY:
      break
    continue

  js_objs.append(resp)
  stats.append(f"{i}: Connection successful.")

  # update newest, oldest id, pagination cursor and count;
  # the next request asks for tweets older than the oldest one seen so far
  newest_id = resp['meta']['newest_id']
  oldest_id = resp['meta']['oldest_id']
  params['until_id'] = oldest_id
  count += resp['meta']['result_count']

  # a good response resets both consecutive-failure trackers
  max_error = 0
  max_empty = 0

# make sure no stale until_id (e.g. from an earlier interrupted run) survives
QUERY_PARAMS.pop('until_id', None)

## flatten the tweets of every response page into one list
print("Serializing data points...")
data_objs = [tweet for page in js_objs for tweet in page['data']]
print("Done.")

Start digging tweets...
Serializing data points...
Done.


In [None]:
# Show the per-request status messages recorded in `stats` by the collection loop.
stats

['1: Connection successful.',
 '2: Connection successful.',
 '3: Connection successful.',
 '4: Connection successful.']

In [None]:
## Destination file
ROOT_DIR = "/content/gdrive/MyDrive/Twitter"
DST_FILE = os.path.join(ROOT_DIR, 'test.json')

## create the destination folder on first use; without this, open() raises
## FileNotFoundError when the folder does not exist yet
os.makedirs(ROOT_DIR, exist_ok=True)

## save all data objects to dst file (explicit encoding so the result does not
## depend on the platform default)
with open(DST_FILE, 'w', encoding='utf-8') as dst:
  json.dump(data_objs, dst)

In [None]:
## pull the 'text' field out of every collected tweet object
texts = list(map(lambda tweet: tweet['text'], data_objs))

In [None]:
## clean each text
def clean_tweet(tw):
  """Strip Twitter-specific noise from a tweet text.

  Removes, in this order: a leading retweet header ("RT @user: "), every
  @mention, the '#' sign of hashtags (the tag word is kept), a standalone
  "RT" token followed by whitespace, and http/https links.

  Args:
    tw: raw tweet text.

  Returns:
    The cleaned text. Surrounding whitespace is left untouched.
  """
  # Handles are matched with an explicit ASCII class including '_': leaving
  # '_' out left residue such as "_2402" behind, while \w would also swallow
  # non-ASCII text glued to a mention.
  tw = re.sub(r'^RT\s+@[A-Za-z0-9_]+:\s*', '', tw) # remove leading "RT @user: " header
  tw = re.sub(r'@[A-Za-z0-9_]+', '', tw) # remove @mentions
  tw = re.sub(r'#', '', tw) # remove # in hashtags
  # \b keeps words that merely end in "RT" (e.g. "ART show") intact
  tw = re.sub(r'\bRT\s+', '', tw) # remove occasional RT in retweets
  tw = re.sub(r'https?:\/\/\S+', '', tw)  # remove the hyper link
  return tw

## run the cleaner over every tweet text, keeping the original order
cleaned_tw = list(map(clean_tweet, texts))

In [None]:
# Display the cleaned tweet texts.
cleaned_tw

[' こんにちは。ソニーサポートです。WF-1000XM4をご購入いただき、ありがとうございます。褒めてくださって嬉しいです✨\n基本操作や便利な機能を下記Webページでご紹介しておりますので、よろしければご活用ください🎵\n■使いこなしガイド→',
 ' こんにちは。ソニーサポートです。WF-1000XM4をご購入いただき、ありがとうございます。音楽をたくさん楽しんでください。\n基本操作や便利な機能を下記Webページでご紹介しておりますので、よろしければご活用ください🎵\n■使いこなしガイド→',
 ' sonyのwf-1000xm4最高だよ😚',
 '_2402 こんにちは。ソニーサポートです。WF-1000XM4をご購入いただき、ありがとうございます。ノイキャン性能を褒めていただきうれしいです。\n基本操作や便利な機能を下記Webページでご紹介しておりますので、よろしければご活用ください🎵\n■使いこなしガイド→',
 ': ワイヤレス イヤホン SONY ソニー WF-1000XM4 BM ブラック Bluetooth ノイズキャンセリング ノイキャン iPhone13 ipad mini Android switch 通話 マイク付き 防水 IPX4 ハイレゾ…',
 '_Aorus: 重要な音質に関しては正直WF-1000XM4は言われてるほど良いとは言えないと思う、ここに関してはGalaxy Buds Proが1つ2つ上を行っている。自分がAKGユーザーというのもあるが(とはいえスピーカーやヘッドホンはSONYも使用…',
 '"Les nouveaux Sony WF-1000XM4 sont d’ores et déjà victimes des promotions"  AnglohaTech Anglohasys',
 '_Aorus: 重要な音質に関しては正直WF-1000XM4は言われてるほど良いとは言えないと思う、ここに関してはGalaxy Buds Proが1つ2つ上を行っている。自分がAKGユーザーというのもあるが(とはいえスピーカーやヘッドホンはSONYも使用…',
 '_Ch_n: 100RTいったら、SONYのwf-1000xm4買う\n\nちなみに値段は定価3万超え。 ',
 '_Ch_n: 100RTいったら、SONYのw

## **Exploring Twitter API `v1.1` to get `full archive search` or `30-day search`**
___

> 30-day tweet search: 
  * 250 requests / month
  * 25K tweets / month

> Full archive search:
  * 50 requests / month
  * 5K tweets / month

> View [this page](https://developer.twitter.com/en/docs/twitter-api/premium/search-api/guides/operators) for the list of query operators. Most of the useful operators, such as `is:retweet`, are only available to premium paid subscribers.

In [None]:
## OAuth1 credentials (user key, user key secret, access token, access token secret).
## The keys of this dict are passed verbatim to OAuth1(...) as keyword arguments.
AUTH_PARAMS = {
    'client_key' : '<Your user key here>',
    'client_secret' : '<Your user key secret here>',
    'resource_owner_key' : '<Your access token here>',
    'resource_owner_secret' : '<Your access token secret here>',
}


## Create the OAuth1 authentication object handed to requests via `auth=`
auth = OAuth1(**AUTH_PARAMS)

## Development environment names; they are interpolated into the endpoint URLs below
FULL_ENV = 'fulltest'
THIRTY_ENV = '30test'

## Service endpoints (full-archive search and 30-day search)
SERVICES = {
    'full_search' : f'https://api.twitter.com/1.1/tweets/search/fullarchive/{FULL_ENV}.json',
    '30_search' : f'https://api.twitter.com/1.1/tweets/search/30day/{THIRTY_ENV}.json'
}

## Number of tweets requested per call (sent as 'maxResults')
DATA_PER_REQ = 100

## Search parameters for the full-archive search.
# Start and end dates are necessary
# NOTE(review): the date strings look like YYYYMMDDhhmm -- confirm against the API docs
FULL_QUERY_PARAMS = {
    'query' : "(Logitech OR #Logitech) (G915)",
    'maxResults' : str(DATA_PER_REQ),
    'fromDate' : '202101010000',
    'toDate' : '202110010000',
}

## Search parameters for the 30-day tweet search.
# Start and end dates cannot be more than 30 days apart
THIRTY_QUERY_PARAMS = {
    'query' : "(Logitech OR #Logitech) (G915) review lang:en",
    'maxResults' : str(DATA_PER_REQ),
}

## Total number of tweets to collect before the loop stops
DATA_TOTAL = 200

## Maximum number of consecutive failed requests to be tolerated.
## NOTE: this rebinds the MAX_ERROR set to 2 in the v2 configuration cell; re-running
## the v2 loop after this cell will therefore use the value below.
MAX_ERROR = 1

## Rate limit (requests per second); the collection loop pauses once every RPS requests
RPS = 10

In [None]:
## make single request to Twitter endpoint
## NOTE: this definition replaces the earlier connect_to_endpoint(url, params, ...)
## once the cell has run; the two signatures are not interchangeable.
def connect_to_endpoint(url, auth, params, timeout=180, print_error=True):
  """Send one GET request to `url` using the given authentication object.

  Args:
    url: endpoint to query.
    auth: authentication object or callable forwarded to `requests.get`.
    params: query parameters forwarded to `requests.get`.
    timeout: timeout in seconds forwarded to `requests.get`.
    print_error: when True, the error message is also printed.

  Returns:
    `(payload, True)` with the decoded JSON body on success, or
    `(message, False)` with a human-readable error message on any failure.
    No exception is propagated to the caller.
  """
  try:
    response = requests.get(url, auth=auth, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json(), True
  except HTTPError as http_err:
    message = f"HTTP error occurred: {http_err}"
  # Timeout must be tested before ConnectionError: requests' ConnectTimeout
  # derives from both, and would otherwise be reported as a connection error.
  except Timeout as time_err:
    message = f"Timeout error occurred after {timeout} seconds: {time_err}"
  except ConnectionError as conn_err:
    message = f"Connection error occurred: {conn_err}"
  except Exception as err:
    # e.g. a body that is not valid JSON
    message = f"Other error occurred: {err}"

  if print_error:
    print(message)
  return message, False

In [None]:
resp_objs = []  # store the 'results' list of every successful request
status = [] # store status of every request
count = 0 # track number of data points collected
i = 0 # keep track of iteration
next_page = True  # keep track if next page exists
n_error = 0 # keep track of number of failed request from last successful request
start_time = 0  # keep track of the start time of every rate limit cycle
end_time = 0  # keep track of the end time of every rate limit cycle

# Page through results on a private copy of the query parameters so the
# pagination token ('next') never leaks into the shared THIRTY_QUERY_PARAMS,
# even if the cell is interrupted half-way.
params = dict(THIRTY_QUERY_PARAMS)
params.pop('next', None)

print("Start digging tweets...")
while count < DATA_TOTAL and next_page:
  # end of a rate limit cycle (every RPS requests): pause if the cycle
  # completed in under one second
  if i%RPS == 0:
    end_time = perf_counter()
    if (end_time - start_time) < 1:
      time.sleep(1.5 - (end_time - start_time))

  i += 1
  # record start time of every rate limit cycle
  if i%RPS == 1:
    start_time = perf_counter()

  # make API request
  resp, stat = connect_to_endpoint(SERVICES['30_search'],
                                   auth=auth,
                                   params=params)

  # failed request: log it and stop once more than MAX_ERROR failures
  # happened in a row
  if not stat:
    n_error += 1
    status.append(f"{i}: {resp}")
    if n_error > MAX_ERROR:
      break
    continue

  resp_objs.append(resp['results'])

  # The pagination token lives in the response payload itself
  # (the original code indexed the `resp_objs` list with 'next', which raises
  # TypeError as soon as a second page exists).
  if 'next' in resp:
    params['next'] = resp['next']
  else:
    next_page = False

  # update count, reset the consecutive-failure tracker and log the success
  count += len(resp['results'])
  n_error = 0
  status.append(f"{i}: Request successful.")

# make sure no stale 'next' token (e.g. from an earlier interrupted run) survives
THIRTY_QUERY_PARAMS.pop('next', None)

# flatten the tweets of every response page into one list
print("Serializing objects")
all_data = [tweet for page in resp_objs for tweet in page]
print("Done.")

Start digging tweets...
Serializing objects
Done.


In [None]:
# pull the 'text' field out of every collected tweet object
all_texts = list(map(lambda tweet: tweet['text'], all_data))

In [None]:
# Display the text of every collected tweet.
all_texts

['On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM',
 'On Blog of Dad:  Logitech G915 LIGHTSPEED With GL Tactile Switches Review https://t.co/Tfen7dfCtM']