In [1]:
import requests
import json
import time
import random
import os

import pandas as pd

In [2]:
# bearer_token = os.environ.get("BEARER_TOKEN")

In [3]:
bearer_token = "AAAAAAAAAAAAAAAAAAAAAEFb0wEAAAAA0aganoy5AFhrqxVm%2Ffx3Afb06d4%3DCGAT4xP56t0jVq0YOpnyavlkSGqKBsLX9l9glHldS0TsVXUKTG"

In [4]:
print(bearer_token)

AAAAAAAAAAAAAAAAAAAAAEFb0wEAAAAA0aganoy5AFhrqxVm%2Ffx3Afb06d4%3DCGAT4xP56t0jVq0YOpnyavlkSGqKBsLX9l9glHldS0TsVXUKTG


In [5]:
endpoint_url = "https://api.twitter.com/2/tweets/search/recent"

In [6]:
query_parameters = {
    "query": '("heat map" OR "heat pumps") lang:en -is:retweet',
    "tweet.fields": "id,text,author_id,created_at",
    "max_results":11
}
    

In [7]:
query_parameters

{'query': '("heat map" OR "heat pumps") lang:en -is:retweet',
 'tweet.fields': 'id,text,author_id,created_at',
 'max_results': 11}

In [8]:
def request_headers(bearer_token:str) -> dict:
    """
    Sets up the request headers.
    Returns a dictionary summarising the bearer token autentication details.
    """
    return {"Authorization":"Bearer {}".format(bearer_token)}

headers = request_headers(bearer_token)


In [9]:
def connect_to_endpoint(endpoint_url: str, headers: dict, parameters: dict) -> json:
    """
    Connects to the Twitter API endpoint and handles rate limiting (429) and server errors.
    Will raise an Exception for client-side errors other than 429.
    """
    response = requests.request("GET", url=endpoint_url, headers=headers, params=parameters)
    status_code = response.status_code

    if status_code == 200:
        return response.json()
    
    if status_code == 429:
        # Trop de requêtes – attendre longtemps
        wait_time = 900  # 15 minutes (en secondes)
        print(f"Rate limit exceeded. Sleeping for {wait_time // 60} minutes...")
        time.sleep(wait_time)
        return connect_to_endpoint(endpoint_url, headers, parameters)
    
    elif 400 <= status_code < 500:
        # Autres erreurs client (ex: mauvais token, URL incorrecte, etc.)
        raise Exception(
            f"Client error – program will stop!\nHTTP {status_code}: {response.text}"
        )
    
    else:
        # Erreurs serveur ou inconnues
        sleep_seconds = random.randint(2, 60)
        print(
            f"Server error. Sleeping for {sleep_seconds} seconds...\nHTTP {status_code}: {response.text}"
        )
        time.sleep(sleep_seconds)
        return connect_to_endpoint(endpoint_url, headers, parameters)


In [None]:
json_response = connect_to_endpoint(endpoint_url, headers, query_parameters)

Rate limit exceeded. Sleeping for 15 minutes...


In [None]:
type(json_response)

In [None]:
json_response.keys()

In [None]:
json_response['meta']

In [None]:
len(json_response['data'])

In [None]:
json_response['data'][0]

In [None]:
rules = [
    {"value": '("heat pump" OR "heat pumps") -is:retweet lang:en', "tag": "heat_pump"},
    {"value": '("gas boiler" OR "gas boilers") -is:retweet lang:en', "tag": "gas_boiler"},
]

In [None]:
query_parameters = {
    "tweet.fields": "id,text,author_id,created_at",
    "user.fields": "id,name,username,created_at,description,location,verified",
    "expansions": "author_id",
    "max_results": 100,
}

In [None]:
def process_twitter_data(
    json_response: json,
    query_tag: str,
    tweets_data: pd.DataFrame,
    users_data: pd.DataFrame,
) -> (pd.DataFrame, pd.DataFrame):
    """
    Adds new tweet/user information to the table of
    tweets/users and saves dataframes as pickle files,
    if data is avaiable.
    
    Returns the tweets and users updated dataframes.
    """
    if "data" in json_response.keys():
        new = pd.DataFrame(json_response["data"])
        tweets_data = pd.concat([tweets_data, new])
        tweets_data.to_pickle("tweets_" + query_tag + ".pkl")

        if "users" in json_response["includes"].keys():
            new = pd.DataFrame(json_response["includes"]["users"])
            users_data = pd.concat([users_data, new])
            users_data.drop_duplicates("id", inplace=True)
            users_data.to_pickle("users_" + query_tag + ".pkl")

    return tweets_data, users_data


In [None]:
tweets_data = pd.DataFrame()
users_data = pd.DataFrame()

for i in range(len(rules)):
    query_parameters["query"] = rules[i]["value"]
    query_tag = rules[i]["tag"]

    json_response = connect_to_endpoint(endpoint_url, headers, query_parameters)
    tweets_data, users_data = process_twitter_data(
        json_response, query_tag, tweets_data, users_data
    )

    time.sleep(5)

    while "next_token" in json_response["meta"]:
        query_parameters["next_token"] = json_response["meta"]["next_token"]

        json_response = connect_to_endpoint(endpoint_url, headers, query_parameters)
        tweets_data, users_data = process_twitter_data(
            json_response, query_tag, tweets_data, users_data
        )

        time.sleep(5)