In [94]:
import json
import time
import csv
import requests
import os
from pprint import pprint
from configparser import ConfigParser
from random import randint

In [86]:
DEBUG_MODE = True

In [87]:
# Read api_key from config.ini
api_key: str | None = None
try:
    print("Reading api-key from config.ini ...")
    config = ConfigParser()
    config.read('../conf/config.ini') # adjust the path to config.ini
    api_key = config['TWITTERAPI']['API_KEY']
    print(f"API Key gefunden: {api_key[:5]} ...")

except Exception as e:
    print("Failed to read credentials. Exit with error:")
    print(e)

Reading api-key from config.ini ...
API Key gefunden: 1228a ...


In [88]:
if api_key is None:
    raise ValueError("Could not read api key.")

# Array to store tweets
all_tweets = []

# Parameters for api request
headers = {"X-API-Key": api_key}
url = "https://api.twitterapi.io/twitter/tweet/advanced_search"
cursor = ""

# counter
i = 1

In [90]:
# Endless loop based on has_next_page - break when false
while True:
    try:
        # Make https request
        response = requests.request(
            "GET",
            url,
            headers=headers,
            params={
                "query": "from:elonmusk since:2023-05-01 until:2025-05-01 -is:retweet",
                "cursor": cursor
                # querytype: "latest" by default
            }
        )

        if response.status_code == 200:
            # Debugging infos
            print(f"Status Code: {response.status_code}")

            # throw exception for http error
            response.raise_for_status()

            # Store response
            json_data = response.json()

            # Extract necessary vars
            tweets = json_data.get("tweets", [])
            has_next_page = json_data.get("has_next_page")
            next_cursor = json_data.get("next_cursor")

            # Store and debug tweets
            prev_tweet_count = len(all_tweets)
            all_tweets.extend(tweets)
            new_tweet_count = len(all_tweets) - prev_tweet_count
            print(f"New Tweets in iteration {i}: {new_tweet_count}")
            print(f"Total Tweet count: {len(all_tweets)}")

            # Break if debug mode is active
            if DEBUG_MODE:
                debug_mode_tweets = json_data["tweets"][new_tweet_count - 1]
                print(f'DEBUG_MODE active: stopping after first page. Got {new_tweet_count} tweets:')
                pprint(debug_mode_tweets)
                break

            if not has_next_page:
                print(f"Got {new_tweet_count} tweets. Finished.")
                break

            # Prepare the next iteration
            print(f"Next page detected @ '{next_cursor[:10]}'. Updating cursor ...")
            cursor = next_cursor
            i += 1

            # Wait random time to avoid rate limits
            delay = randint(1, 5)
            print(f"Starting new iteration in {delay} seconds ...")
            time.sleep(delay)

    except Exception as e:
        print("Error when requesting and processing tweets:")
        print(e)
        break



Status Code: 200
New Tweets in iteration 1: 20
Total Tweet count: 20
DEBUG_MODE active: stopping after first page. Got 20 tweets:
[{'type': 'tweet', 'id': '1917726279195058338', 'url': 'https://x.com/elonmusk/status/1917726279195058338', 'twitterUrl': 'https://twitter.com/elonmusk/status/1917726279195058338', 'text': 'https://t.co/U6tI9pdin6', 'source': 'Twitter for iPhone', 'retweetCount': 16617, 'replyCount': 5733, 'likeCount': 106639, 'quoteCount': 777, 'viewCount': 28316471, 'createdAt': 'Wed Apr 30 23:42:29 +0000 2025', 'lang': 'zxx', 'bookmarkCount': 5168, 'isReply': False, 'inReplyToId': None, 'conversationId': '1917726279195058338', 'inReplyToUserId': None, 'inReplyToUsername': None, 'author': {'type': 'user', 'userName': 'elonmusk', 'url': 'https://x.com/elonmusk', 'twitterUrl': 'https://twitter.com/elonmusk', 'id': '44196397', 'name': 'Elon Musk', 'isVerified': False, 'isBlueVerified': True, 'profilePicture': 'https://pbs.twimg.com/profile_images/1893803697185910784/Na5lOWi5_

In [111]:
try:
    print('Finished request. Converting results to file ...')

    # Check if the target file exists and create if not
    filepath = "res/tweets.csv"
    os.makedirs(os.path.dirname(filepath), exist_ok=True)

    # Adding rows
    existing_ids = set()
    if os.path.exists(filepath):
        with open(filepath, "r", encoding="utf-8", newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                existing_ids.add(row["id"])

    with open(filepath, "a", encoding="utf-8", newline='') as f:
        if not all_tweets:
            print("No tweets to write. Exiting ...")
            exit()

        def flatten_tweet(tweet):
            if not tweet:
                return {}

            flat = {}
            for k, v in tweet.items():
                try:
                    if isinstance(v, (dict, list)):
                        flat[k] = json.dumps(v, ensure_ascii=False)
                    else:
                        flat[k] = v
                except Exception as e:
                    print(f"Error when processing field {k}: {e}")
                    flat[k] = None
            return flat

        flattened_tweets = [flatten_tweet(tw) for tw in all_tweets]
        if not flattened_tweets:
            print("Did not find tweets to write. Exiting ...")
            exit()

        fieldnames = list(flattened_tweets[0].keys())

        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if os.stat(filepath).st_size == 0:
            writer.writeheader()

        new_count = 0
        for tweet in flattened_tweets:
            if tweet["id"] not in existing_ids:
                writer.writerow(tweet)
                new_count += 1

        print(f"Wrote {new_count} new tweets to CSV.")

except Exception as e:
    print('Failed to convert tweets to file. Exit with error:')
    print(e)

Finished request. Converting results to file ...
Wrote 20 new tweets to CSV.
