In [1]:
!pip install requests

Note: you may need to restart the kernel to use updated packages.


In [1]:
# 

In [None]:
import requests
import json
import os
import time
from datetime import datetime
import logging

# -----------------------------------------------------------------------------
# 0) Disclosure 
# -----------------------------------------------------------------------------

# The `requests` library is open-source and free to use.
# However, accessing Twitter's API may incur costs based on your API tier (Basic, Elevated, etc.).

# -----------------------------------------------------------------------------
# 1) Configuration
# -----------------------------------------------------------------------------

# The bearer token is read from the TWITTER_BEARER_TOKEN environment variable
# instead of being embedded in the source. A credential stored in a notebook or
# script is visible to everyone who can read the file; any token that was ever
# committed this way should be revoked and regenerated.
BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN", "").strip()

if not BEARER_TOKEN:
    raise ValueError("Bearer Token not found. Please set TWITTER_BEARER_TOKEN in your environment variables.")

# List of politicians' usernames to fetch tweets from
POLITICIANS = ["Arlukowicz"]  # Add more usernames as needed

# Maximum number of tweets to keep per user
MAX_TWEETS = 5  # Adjust as needed (e.g., 3200)

# Start & end times (ISO8601 format) bounding the tweets that are requested
START_TIME = "2024-01-01T00:00:00Z"
END_TIME   = "2024-12-31T23:59:59Z"

# Whether to exclude retweets and replies
EXCLUDE_RETWEETS = True
EXCLUDE_REPLIES  = True

# -----------------------------------------------------------------------------
# 2) Logging Setup
# -----------------------------------------------------------------------------

# Records at INFO level and above go to two destinations: the file
# "tweet_downloader.log" and the default stream of a StreamHandler.
_log_file_handler = logging.FileHandler("tweet_downloader.log")
_log_stream_handler = logging.StreamHandler()

logging.basicConfig(
    handlers=[_log_file_handler, _log_stream_handler],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# -----------------------------------------------------------------------------
# 3) Authentication Setup
# -----------------------------------------------------------------------------

def create_headers(bearer_token):
    """
    Build the HTTP headers sent with Twitter API requests.

    Returns a dict with an ``Authorization`` header (the given token prefixed
    with ``Bearer``) and a fixed ``User-Agent`` value.
    """
    authorization = "Bearer {}".format(bearer_token)
    return {"Authorization": authorization, "User-Agent": "v2UserTweetsPython"}

# -----------------------------------------------------------------------------
# 4) Categorize Tweets
# -----------------------------------------------------------------------------

def categorize_tweet(tweet):
    """
    Categorize the tweet as 'Original', 'Reply', 'Retweet', or 'Quote'.

    The first entry of ``tweet["referenced_tweets"]`` whose ``type`` is one of
    ``retweeted``, ``replied_to`` or ``quoted`` decides the category. When no
    entry decides it (the key is missing, ``None``, empty, or holds only
    unrecognized types), a non-null ``in_reply_to_user_id`` marks the tweet as
    a 'Reply'; otherwise it is 'Original'.

    Args:
        tweet: A tweet dictionary.

    Returns:
        One of the strings 'Original', 'Reply', 'Retweet' or 'Quote'.
    """
    category_by_reference_type = {
        "retweeted": "Retweet",
        "replied_to": "Reply",
        "quoted": "Quote",
    }

    # `or []` covers both a missing key and an explicit None value.
    for ref in tweet.get("referenced_tweets") or []:
        category = category_by_reference_type.get(ref.get("type"))
        if category is not None:
            return category

    # Checked after the loop (not as an `elif`) so the fallback still applies
    # when referenced_tweets exists but contains nothing recognizable.
    if tweet.get("in_reply_to_user_id") is not None:
        return "Reply"
    return "Original"

# -----------------------------------------------------------------------------
# 5) Fetch Functions
# -----------------------------------------------------------------------------

def get_user_id(username, headers, timeout=10):
    """
    Retrieves the user ID for a given username.

    Args:
        username: Username to look up; it is inserted into the request URL.
        headers: HTTP headers to send with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The ``id`` value from the response's ``data`` object, or ``None`` when
        the request fails, the status is not 200, the body is not valid JSON,
        or the response carries no user data. Every failure is logged.
    """
    url = f"https://api.twitter.com/2/users/by/username/{username}"
    params = {
        "user.fields": "id,name,username"
    }

    try:
        response = requests.get(url, headers=headers, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts are reported the same way as HTTP
        # errors: log and return None so the caller can skip this user.
        logging.error(f"Error fetching user ID for '{username}': {exc}")
        return None

    if response.status_code != 200:
        logging.error(f"Error fetching user ID for '{username}': {response.status_code} {response.text}")
        return None

    try:
        data = response.json()
    except ValueError:
        logging.error(f"Error fetching user ID for '{username}': response body is not valid JSON.")
        return None

    user = data.get("data") if isinstance(data, dict) else None
    if isinstance(user, dict) and "id" in user:
        return user["id"]

    logging.warning(f"No data found for user '{username}'.")
    return None

def _build_timeline_params(start_time, end_time, exclude_retweets, exclude_replies):
    """Assemble the query parameters that stay the same for every page of a timeline request."""
    tweet_fields = [
        "attachments", "author_id", "context_annotations", "conversation_id", "created_at",
        "edit_controls", "entities", "geo", "id", "in_reply_to_user_id", "lang",
        "possibly_sensitive", "public_metrics", "referenced_tweets", "reply_settings",
        "source", "text", "withheld"
    ]
    user_fields = [
        "created_at", "description", "entities", "id", "location", "name", "pinned_tweet_id",
        "profile_image_url", "protected", "public_metrics", "url", "username", "verified", "withheld"
    ]
    media_fields = [
        "duration_ms", "height", "media_key", "preview_image_url", "type", "url",
        "width", "public_metrics", "alt_text", "variants"
    ]
    place_fields = [
        "contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type"
    ]
    poll_fields = [
        "duration_minutes", "end_datetime", "id", "options", "voting_status"
    ]
    expansions = [
        "attachments.poll_ids",
        "attachments.media_keys",
        "author_id",
        "in_reply_to_user_id",
        "referenced_tweets.id",
        "referenced_tweets.id.author_id",
        "entities.mentions.username",
        "geo.place_id",
        "edit_history_tweet_ids"
    ]

    params = {
        "start_time": start_time,
        "end_time": end_time,
        "tweet.fields": ",".join(tweet_fields),
        "user.fields": ",".join(user_fields),
        "media.fields": ",".join(media_fields),
        "place.fields": ",".join(place_fields),
        "poll.fields": ",".join(poll_fields),
        "expansions": ",".join(expansions),
    }

    exclude = []
    if exclude_retweets:
        exclude.append("retweets")
    if exclude_replies:
        exclude.append("replies")
    if exclude:
        params["exclude"] = ",".join(exclude)
    return params


def _rate_limit_wait_seconds(response):
    """Return how long to sleep after a 429, derived from the x-rate-limit-reset header."""
    try:
        reset_time = int(response.headers.get("x-rate-limit-reset", time.time() + 60))
    except (TypeError, ValueError):
        # Header present but not a number: fall back to a one-minute wait.
        reset_time = int(time.time()) + 60
    # +5 seconds of buffer; never return a non-positive wait, which would
    # turn the retry into an immediate re-request.
    return max(reset_time - int(time.time()) + 5, 1)


def fetch_tweets(user_id, headers, max_tweets, start_time, end_time, exclude_retweets=True, exclude_replies=True,
                 timeout=30, max_retries=5):
    """
    Fetches tweets for a given user ID with specified parameters.

    Pages through ``/2/users/{user_id}/tweets`` until ``max_tweets`` tweets
    have been collected, the API reports no further page, or an unrecoverable
    error occurs. Each collected tweet dict gets an extra ``category`` key set
    by ``categorize_tweet``. Only the ``data`` array of each response is
    collected.

    Args:
        user_id: ID of the user whose timeline is requested.
        headers: HTTP headers to send with every request.
        max_tweets: Upper bound on the number of tweets returned. A value of
            zero or less returns an empty list without any request.
        start_time: Value sent as the ``start_time`` query parameter.
        end_time: Value sent as the ``end_time`` query parameter.
        exclude_retweets: When true, ``retweets`` is added to ``exclude``.
        exclude_replies: When true, ``replies`` is added to ``exclude``.
        timeout: Seconds to wait for each HTTP response.
        max_retries: Number of consecutive retries (rate limit, 5xx, network
            failure) tolerated before giving up. The counter resets after
            every successful page.

    Returns:
        A list of at most ``max_tweets`` tweet dicts. On errors the tweets
        collected so far are returned.
    """
    if max_tweets <= 0:
        return []

    url = f"https://api.twitter.com/2/users/{user_id}/tweets"
    params = _build_timeline_params(start_time, end_time, exclude_retweets, exclude_replies)

    tweets = []
    retries = 0  # consecutive failed attempts; successful pages do not count

    # Every successful page adds at least one tweet (an empty page ends the
    # loop), so progress toward max_tweets bounds the number of pages and only
    # retries need an explicit budget.
    while len(tweets) < max_tweets:
        # The endpoint accepts between 5 and 100 results per request; ask for
        # as many as are still needed within that range.
        params["max_results"] = min(100, max(5, max_tweets - len(tweets)))

        retry_delay = None
        retry_reason = ""
        response = None
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
        except requests.RequestException as exc:
            retry_delay = min(5 * 2 ** retries, 60)
            retry_reason = f"Request failed: {exc}."

        if response is not None:
            if response.status_code == 429:
                retry_delay = _rate_limit_wait_seconds(response)
                retry_reason = "Rate limit exceeded."
            elif response.status_code in {500, 502, 503, 504}:
                # Exponential backoff: 5s, 10s, 20s, 40s, capped at 60s.
                retry_delay = min(5 * 2 ** retries, 60)
                retry_reason = f"Server error {response.status_code}."

        if retry_delay is not None:
            retries += 1
            if retries > max_retries:
                logging.error(f"{retry_reason} Giving up after {max_retries} consecutive retries.")
                break
            logging.warning(f"{retry_reason} Sleeping for {retry_delay} seconds.")
            time.sleep(retry_delay)
            continue
        retries = 0

        if response.status_code == 401:
            logging.error("Unauthorized. Check your Bearer Token.")
            break
        if response.status_code == 403:
            logging.error("Forbidden. You might not have access to this resource.")
            break
        if response.status_code != 200:
            logging.error(f"Error fetching tweets: {response.status_code} {response.text}")
            break

        try:
            data = response.json()
        except ValueError:
            logging.error("Error fetching tweets: response body is not valid JSON.")
            break

        fetched_tweets = data.get("data") if isinstance(data, dict) else None
        if not fetched_tweets:
            logging.info("No more tweets found.")
            break

        for tweet in fetched_tweets:
            tweet["category"] = categorize_tweet(tweet)

        tweets.extend(fetched_tweets)
        logging.info(f"Fetched {len(tweets)} tweets so far for user ID {user_id}.")

        # Check for pagination
        next_token = (data.get("meta") or {}).get("next_token")
        if not next_token:
            break  # No more pages
        params["pagination_token"] = next_token

        # Pause briefly between requests, but only if another one will follow.
        if len(tweets) < max_tweets:
            time.sleep(1)

    # A page may overshoot the target because of the 5-result minimum.
    return tweets[:max_tweets]

# -----------------------------------------------------------------------------
# 6) Save Function
# -----------------------------------------------------------------------------

def save_tweets_to_json(tweets, filename):
    """
    Saves a list of tweet dictionaries to a JSON file, appending new tweets
    and avoiding duplicates based on tweet IDs.

    Tweets whose ``id`` is already in the file, or that repeat an ``id`` seen
    earlier in ``tweets``, are skipped. If the file exists but cannot be
    parsed as JSON, or does not hold a JSON list, its content is discarded
    (with a warning) the next time new tweets are written. Nothing is written
    when there are no new tweets.

    Args:
        tweets: Iterable of tweet dicts; each must have an ``id`` key.
        filename: Path of the JSON file to create or extend.

    Raises:
        KeyError: If a tweet in ``tweets`` has no ``id`` key.
    """
    existing_data = []

    # Load existing data if file exists
    if os.path.exists(filename):
        try:
            with open(filename, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            logging.warning(f"Warning: {filename} is not a valid JSON. Overwriting.")
        else:
            if isinstance(loaded, list):
                existing_data = loaded
            else:
                # Valid JSON of another shape cannot be appended to.
                logging.warning(f"Warning: {filename} does not contain a JSON list. Overwriting.")

    seen_ids = {
        entry["id"]
        for entry in existing_data
        if isinstance(entry, dict) and "id" in entry
    }

    # Filter out duplicates, both against the file and within this batch.
    new_tweets = []
    for tweet in tweets:
        tweet_id = tweet["id"]
        if tweet_id in seen_ids:
            continue
        seen_ids.add(tweet_id)
        new_tweets.append(tweet)

    if not new_tweets:
        logging.info(f"No new tweets to add for {filename}.")
        return

    # Append new tweets
    existing_data.extend(new_tweets)

    # Write to a temporary file and swap it in, so a failure while
    # serializing cannot leave the existing file truncated.
    tmp_filename = f"{filename}.tmp"
    try:
        with open(tmp_filename, "w", encoding="utf-8") as f:
            json.dump(existing_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_filename, filename)
    finally:
        # Only present here if the write or the swap failed.
        if os.path.exists(tmp_filename):
            os.remove(tmp_filename)

    logging.info(f"Added {len(new_tweets)} new tweets to {filename}.")

# -----------------------------------------------------------------------------
# 7) Main Execution
# -----------------------------------------------------------------------------

def main():
    """
    Download and store tweets for every username in POLITICIANS.

    For each username the user ID is looked up, tweets are fetched with the
    module-level settings, and the result is written to
    "<username>_tweets.json". Usernames whose ID lookup returns nothing are
    skipped; an empty fetch is only logged.
    """
    request_headers = create_headers(BEARER_TOKEN)

    for username in POLITICIANS:
        logging.info(f"\nFetching up to {MAX_TWEETS} tweets from @{username}...")

        user_id = get_user_id(username, request_headers)
        if not user_id:
            # Nothing to fetch without an ID; move on to the next username.
            continue

        fetched = fetch_tweets(
            user_id=user_id,
            headers=request_headers,
            max_tweets=MAX_TWEETS,
            start_time=START_TIME,
            end_time=END_TIME,
            exclude_retweets=EXCLUDE_RETWEETS,
            exclude_replies=EXCLUDE_REPLIES,
        )

        if not fetched:
            logging.info(f"No tweets fetched for @{username}.")
            continue

        save_tweets_to_json(fetched, f"{username}_tweets.json")


if __name__ == "__main__":
    main()


2025-01-09 20:41:33,659 - INFO - 
Fetching up to 5 tweets from @Arlukowicz...
