In [1]:
import requests
import os
import json
import datetime
import threading
import time
import glob
import os

import pandas as pd

from functools import reduce
from collections import deque
from pprint import pprint

In [2]:
class TwitterLookupForResearch:
    """Rate-limited client for a few Twitter API v2 user endpoints.

    Each public lookup first calls ``wait_rate_limit`` against a fixed-size
    deque holding the timestamps of the most recent requests of that kind.
    A request is only sent once the oldest timestamp in the deque has left
    the 15-minute window, so at most ``maxlen`` requests of one kind are made
    per window.
    """

    # User fields requested from the API for every returned user object.
    USER_FIELDS = "public_metrics,verified,withheld,created_at,protected,url,location"
    # Length of the rate-limit window (minutes) applied to every endpoint here.
    RATE_WINDOW_MINUTES = 15
    # Number of ids sent per request by get_users_by_ids.
    USERS_PER_REQUEST = 100

    def __init__(self, in_last_use):
        """Load the bearer token and seed the rate-limit windows.

        Args:
            in_last_use: ``datetime.timedelta`` subtracted from "now" to seed
                every slot of every window, i.e. how long ago the quota is
                assumed to have been fully used.
        """
        self.read_tokens()
        seed_time = datetime.datetime.now() - in_last_use
        # Follower Lookup RateLimit : 15 lookups per 15 minutes
        self.followers_lookups = deque([seed_time] * 15, maxlen=15)
        # User Lookup RateLimit : 300 lookups per 15 minutes
        self.user_lookups = deque([seed_time] * 300, maxlen=300)
        # Users lookup rate limit: 300 lookups per 15 minutes
        self.users_lookups = deque([seed_time] * 300, maxlen=300)

    # -------------------- internal AUTHENTICATION ------------------------#
    def read_tokens(self):
        """Read ``BEARERTOKEN`` from ``../SETTINGS/secrets.json`` into ``self.bearer_token``."""
        with open('../SETTINGS/secrets.json') as f:
            secrets = json.load(f)
        self.bearer_token = secrets['BEARERTOKEN']

    def bearer_oauth(self, r):
        """
        Method required by bearer token authentication.

        Sets the ``Authorization`` and ``User-Agent`` headers on the outgoing
        request ``r`` and returns it.
        """
        r.headers["Authorization"] = f"Bearer {self.bearer_token}"
        r.headers["User-Agent"] = "v2FollowersLookupPython"
        return r

    # -------------------- internal CONNECT ENDPOINT ------------------------#
    def connect_to_endpoint(self, url, params, timeout=60):
        """Send a GET request and return the decoded JSON body.

        Args:
            url: Endpoint URL.
            params: Query parameters for the request.
            timeout: Seconds to wait for the server before giving up, so an
                unattended run cannot hang forever on a stalled connection.

        Raises:
            Exception: if the response status code is not 200.
        """
        response = requests.request(
            "GET", url, auth=self.bearer_oauth, params=params, timeout=timeout
        )
        print(response.status_code)
        if response.status_code != 200:
            raise Exception(
                "Request returned an error: {} {}".format(
                    response.status_code, response.text
                )
            )
        return response.json()

    # -------------------- internal WAIT FOR ENDPOINT TIME ------------------------#
    def wait_rate_limit(self, in_per_minutes, in_timestamp_list):
        """Block until the oldest recorded request is outside the window.

        Args:
            in_per_minutes: Window length in minutes (one extra second is
                added as a safety margin).
            in_timestamp_list: Deque of request timestamps; its first element
                is compared against "now" and the new timestamp is appended.
        """
        per_seconds = in_per_minutes * 60 + 1
        current_time = datetime.datetime.now()
        # total_seconds() (not .seconds) so that a negative difference, e.g.
        # after the wall clock steps backwards, does not wrap around to ~86400
        # and produce a negative sleep.
        elapsed_seconds = int((current_time - in_timestamp_list[0]).total_seconds())
        if elapsed_seconds < per_seconds:
            # Never wait longer than one full window, even with clock skew.
            wait_seconds = min(per_seconds, per_seconds - elapsed_seconds)
            print(f"Waiting for {wait_seconds}")
            time.sleep(wait_seconds)
            current_time = datetime.datetime.now()
        in_timestamp_list.append(current_time)

    # -------------------- internal CREATE URLS ------------------------#
    def create_url_follower_lookup(self, in_user_id):
        """Return the followers endpoint URL for ``in_user_id``."""
        return "https://api.twitter.com/2/users/{}/followers".format(in_user_id)

    def create_url_get_user_by_name(self, in_username):
        """Return the user-by-username endpoint URL for ``in_username``."""
        return f"https://api.twitter.com/2/users/by/username/{in_username}"

    def create_url_get_users_by_ids(self):
        """Return the multi-user lookup endpoint URL."""
        return "https://api.twitter.com/2/users"

    # -------------------- API ------------------------#
    def get_user_id_by_name(self, in_username):
        """Look up one user by username.

        Returns:
            The full decoded JSON response of the endpoint (not just the id).
        """
        url = self.create_url_get_user_by_name(in_username)
        params = {"user.fields": self.USER_FIELDS}
        self.wait_rate_limit(self.RATE_WINDOW_MINUTES, self.user_lookups)
        return self.connect_to_endpoint(url, params)

    def get_users_by_ids(self, in_user_id_list):
        """Look up many users by id, 100 ids per request.

        Args:
            in_user_id_list: Sequence of user ids (anything ``str()``-able).

        Returns:
            A flat list with the ``data`` entries of every response; empty
            when ``in_user_id_list`` is empty or no response carried ``data``.
        """
        url = self.create_url_get_users_by_ids()
        results = []
        for start in range(0, len(in_user_id_list), self.USERS_PER_REQUEST):
            chunk = in_user_id_list[start:start + self.USERS_PER_REQUEST]
            comma_sprtd_ids = ",".join(str(e) for e in chunk)
            params = {"user.fields": self.USER_FIELDS,
                      "ids": comma_sprtd_ids}
            self.wait_rate_limit(self.RATE_WINDOW_MINUTES, self.users_lookups)
            json_response = self.connect_to_endpoint(url, params)
            # A response may carry no "data" key; treat that as no users.
            results.extend(json_response.get('data', []))
        return results

    def get_all_followers(self, in_user_id):
        """Fetch every follower of ``in_user_id``, following pagination.

        Returns:
            A flat list with the ``data`` entries of every page; empty when
            no page carried ``data``.
        """
        print(f"Followers of :{in_user_id}")
        url = self.create_url_follower_lookup(in_user_id)
        current_params = {"user.fields": self.USER_FIELDS,
                          "max_results": 1000}
        results = []
        while True:
            self.wait_rate_limit(self.RATE_WINDOW_MINUTES, self.followers_lookups)
            json_response = self.connect_to_endpoint(url, current_params)
            # A page without a "data" key contributes no followers.
            results.extend(json_response.get('data', []))
            next_token = json_response.get("meta", {}).get("next_token")
            if next_token is None:
                break
            current_params["pagination_token"] = next_token
        return results

In [3]:
# Top-level user fields copied into the output frame, in column order.
MAIN_COLUMNS = ['created_at', 'id', 'location', 'name', 'protected', 'url', 'username', 'verified']

# Keys read from each user's 'public_metrics' dict, in column order.
PUBLIC_METRIC_COLUMNS = ['followers_count', 'following_count', 'tweet_count', 'listed_count']

class BatchLookup:
    """Helpers for turning lists of user dicts into DataFrames."""

    @staticmethod
    def save_users_to_df(in_user_list):
        """Build a DataFrame with one row per user dict.

        Declared as a static method so it can be called on the class or on an
        instance alike.

        Args:
            in_user_list: Iterable of user dicts. Keys from ``MAIN_COLUMNS``
                that are missing become ``''``; every key of
                ``PUBLIC_METRIC_COLUMNS`` is read from ``user['public_metrics']``.

        Returns:
            DataFrame with columns ``MAIN_COLUMNS + PUBLIC_METRIC_COLUMNS``.

        Raises:
            KeyError: if a user has no ``'public_metrics'`` entry or one of
                the metric keys is missing from it.
        """
        rows = []
        for user_basic_data in in_user_list:
            main_values = [user_basic_data.get(c, '') for c in MAIN_COLUMNS]
            metrics = user_basic_data['public_metrics']
            metric_values = [metrics[c] for c in PUBLIC_METRIC_COLUMNS]
            rows.append(main_values + metric_values)
        return pd.DataFrame(rows, columns=MAIN_COLUMNS + PUBLIC_METRIC_COLUMNS)


In [4]:
# The constructor seeds every rate-limit slot with "now - in_last_use", so with
# 5 seconds the first request of each kind waits out almost a full 15-minute window.
tlfr = TwitterLookupForResearch(in_last_use=datetime.timedelta(seconds=5))

In [None]:
# User list data from Twitter/Brandwatch
# Raw string: the Windows path contains backslashes that must not be read as
# escape sequences. Sorting makes the choice of file deterministic.
tweet_files = sorted(glob.glob(r'C:\STUFF\RESEARCH\TENet\DATA\Tweets\*'))
if not tweet_files:
    raise FileNotFoundError(r"No tweet export found under C:\STUFF\RESEARCH\TENet\DATA\Tweets")
# The first 6 lines of the file are skipped before the header row is read.
df = pd.read_csv(tweet_files[0], skiprows=6)
df.head()

In [None]:
# Distinct values of the 'Twitter Author ID' column, in ascending order.
users_list = sorted(df['Twitter Author ID'].unique())
users_list

In [None]:
# Empty accumulator frame: main user fields followed by the public-metric fields.
all_users_df = pd.DataFrame(data=[], columns=[*MAIN_COLUMNS, *PUBLIC_METRIC_COLUMNS])

In [None]:
user_chunk_size = 10
# Step by the chunk size so the chunks are disjoint; stepping by 1 would
# produce overlapping windows that repeat almost every user in ten chunks.
user_chunks = [
    users_list[start:start + user_chunk_size]
    for start in range(0, len(users_list), user_chunk_size)
]
user_chunks

In [None]:
# Directory that receives the per-user follower id files and per-chunk profile files.
FOLLOWERS_DIR = "C:/STUFF/RESEARCH/TENet/DATA/Followers/"

i = 0  # number of users whose followers were fetched during this run
for chunk_idx, this_user_chunk in enumerate(user_chunks):
    print(f"Chunk {chunk_idx} Started.")
    chunk_user_df_list = []
    for user_id in this_user_chunk:
        print(f"Looking up : {user_id}. {i} of {len(users_list)} users done.")
        followers_path = FOLLOWERS_DIR + f"followers_{user_id}.csv"
        # An existing file means this user was handled by an earlier run.
        if os.path.exists(followers_path):
            print("Already exists.")
            continue
        followers = tlfr.get_all_followers(user_id)
        followers_df = BatchLookup.save_users_to_df(followers)
        chunk_user_df_list.append(followers_df)
        # Only the follower ids go into the per-user file.
        followers_df['id'].to_csv(followers_path, index=False)
        i += 1
    if len(chunk_user_df_list) > 0:
        chunk_users_df = pd.concat(chunk_user_df_list).drop_duplicates()
        chunk_users_df.to_csv(FOLLOWERS_DIR + f"chunk_{chunk_idx}_u{i-user_chunk_size}_u{i}.csv", index=False)
        # Rebind the accumulator: DataFrame.append never modified the frame in
        # place (and no longer exists in pandas 2.x), so the profiles were lost.
        if all_users_df.empty:
            all_users_df = chunk_users_df
        else:
            all_users_df = pd.concat([all_users_df, chunk_users_df], ignore_index=True)
print("--ALL DONE--")

In [None]:
# Write the de-duplicated accumulated profiles to a single CSV (no index column).
(
    all_users_df
    .drop_duplicates()
    .to_csv("C:/STUFF/RESEARCH/TENet/DATA/Followers/FollowerProfiles2.csv", index=False)
)

In [None]:
# Single-user lookup; the method returns the endpoint's full JSON response.
r = tlfr.get_user_id_by_name(in_username='cathura666')
r

In [None]:
# `r` is the raw response dict; the user object sits under its 'data' key, and
# save_users_to_df expects a list of user dicts, so wrap that object in a list.
BatchLookup.save_users_to_df([r['data']])