In [41]:
import requests
import pandas as pd
import datetime
import os
import time
import json
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

True

In [42]:
# ---- CONFIGURATION ----
# Tickers to query; the wider list is kept commented out for reference.
# symbols = ["AAPL", "MSFT", "NVDA"]
symbols = ["AAPL"]
max_tweets_per_day = 25             # Cap on tweets kept per day, per symbol
output_dir = "../data/twitter"      # Directory for the saved tweet data
# ------------------------

# Base URL of the TwitterAPI.io service
TWITTERAPI_BASE_URL = "https://api.twitterapi.io"

# Search window - October 1, 2020 to October 1, 2025
start_date = datetime.datetime(2020, 10, 1)
end_date = datetime.datetime(2025, 10, 1)
# start_date = datetime.datetime(2024, 9, 1)
# end_date = datetime.datetime(2024, 10, 1)

# Figures derived from the settings above, used only for the summary below
total_days = (end_date - start_date).days
expected_tweets = len(symbols) * max_tweets_per_day * total_days
expected_cost = expected_tweets * 0.15 / 1000   # estimate assumes $0.15 per 1,000 tweets

print(f"Searching tweets from {start_date:%Y-%m-%d} to {end_date:%Y-%m-%d}")
print(f"Total days to search: {total_days}")
print(f"Tweets per day per symbol: {max_tweets_per_day}")
print(f"Total tweets expected: {expected_tweets:,}")
print(f"Expected cost: ~${expected_cost:.2f}")

Searching tweets from 2020-10-01 to 2025-10-01
Total days to search: 1826
Tweets per day per symbol: 25
Total tweets expected: 45,650
Expected cost: ~$6.85


In [43]:
# TwitterAPI.io API key setup - CORRECTED VERSION
def setup_twitterapi_client():
    """Build the request headers used to authenticate against TwitterAPI.io.

    The API key is read from the TWITTERAPI_API_KEY environment variable.

    Returns:
        dict | None: A header mapping with 'X-API-Key' and 'Content-Type',
        or None (after printing setup instructions) when the variable is
        missing or empty.
    """
    api_key = os.getenv('TWITTERAPI_API_KEY')

    if api_key:
        # The key is sent in the X-API-Key header (capital X)
        return {
            'X-API-Key': api_key,
            'Content-Type': 'application/json',
        }

    # No key available: explain how to provide one, then signal failure
    setup_help = (
        "❌ TWITTERAPI_API_KEY not found in environment variables",
        "📝 Please add your API key to .env file:",
        "   TWITTERAPI_API_KEY=your_api_key_here",
        "🔗 Get your free API key at: https://twitterapi.io",
    )
    for message in setup_help:
        print(message)
    return None

# Build the auth headers once and report whether the client is usable
api_headers = setup_twitterapi_client()
if not api_headers:
    print("❌ Failed to initialize TwitterAPI.io client")
else:
    header_names = list(api_headers.keys())  # names only; the key value is never printed
    print("✅ TwitterAPI.io client initialized successfully")
    print("💰 You have $0.1 in free credits to start with")
    print(f"🔑 Using headers: {header_names}")

✅ TwitterAPI.io client initialized successfully
💰 You have $0.1 in free credits to start with
🔑 Using headers: ['X-API-Key', 'Content-Type']


In [44]:
def _tweet_to_record(tweet, symbol, lang, search_date):
    """Flatten one tweet object from the API response into the flat record kept per tweet."""
    author = tweet['author']
    return {
        'symbol': symbol,
        'tweet_id': tweet['id'],
        'text': tweet['text'],
        'created_at': tweet['createdAt'],
        'user_id': author['id'],
        'username': author['userName'],
        'user_name': author['name'],
        'retweet_count': tweet['retweetCount'],
        'like_count': tweet['likeCount'],
        'reply_count': tweet['replyCount'],
        'quote_count': tweet['quoteCount'],
        'view_count': tweet.get('viewCount', 0),
        'is_reply': tweet['isReply'],
        'conversation_id': tweet.get('conversationId'),
        'url': tweet['url'],
        'lang': lang,  # Stored so the language filter can be verified later
        'search_date': search_date,
    }


def _fetch_tweets_for_day(symbol, day, max_tweets, headers, request_timeout):
    """Page through one day's search results; returns whatever was gathered, even if a page fails."""
    day_label = day.strftime('%Y-%m-%d')
    next_day_label = (day + datetime.timedelta(days=1)).strftime('%Y-%m-%d')

    # Restrict the search to this single day and to English tweets
    query = f"${symbol} lang:en since:{day_label} until:{next_day_label}"
    url = f"{TWITTERAPI_BASE_URL}/twitter/tweet/advanced_search"

    tweets_for_day = []
    cursor = ''  # empty cursor requests the first page

    try:
        while len(tweets_for_day) < max_tweets and cursor is not None:
            params = {
                'query': query,
                'queryType': 'Latest',
                'cursor': cursor
            }

            # A timeout keeps one stalled connection from hanging the whole run
            response = requests.get(url, headers=headers, params=params, timeout=request_timeout)

            if response.status_code != 200:
                print(f"❌ API Error for ${symbol} on {day_label}: {response.status_code}")
                break

            data = response.json()
            page = data.get('tweets') or []

            for tweet in page:
                if len(tweets_for_day) >= max_tweets:
                    break

                # Backup language check in addition to the lang:en query filter
                tweet_lang = tweet.get('lang', 'unknown')
                if tweet_lang != 'en':
                    continue

                tweets_for_day.append(_tweet_to_record(tweet, symbol, tweet_lang, day_label))

            # Continue only when this page had tweets and the API reports another page;
            # .get() treats missing pagination fields as "no more pages"
            if page and data.get('has_next_page') and data.get('next_cursor'):
                cursor = data['next_cursor']
            else:
                cursor = None

            time.sleep(0.5)  # Rate limiting between requests

    except Exception as e:
        # Deliberate best-effort: report the failure and move on to the next day,
        # keeping the tweets already gathered for this one
        print(f"❌ Error searching tweets for ${symbol} on {day_label}: {e}")

    return tweets_for_day


def search_tweets_for_symbol_daily(symbol, start_date, end_date, max_tweets_per_day=50, request_timeout=30):
    """
    Search for tweets containing a specific stock ticker, collecting max_tweets_per_day for each day
    Only collects English language tweets

    Days are processed one at a time from start_date up to, but not including,
    end_date. A failed request or malformed response ends the current day
    early; tweets already gathered for that day are kept and the search
    continues with the next day.

    Reads the notebook-level `api_headers` and `TWITTERAPI_BASE_URL`.

    Args:
        symbol (str): Stock ticker symbol (e.g., 'AAPL')
        start_date (datetime): Start date for search
        end_date (datetime): End date for search (exclusive)
        max_tweets_per_day (int): Maximum number of tweets to retrieve per day
        request_timeout (float): Seconds to wait on each HTTP request before giving up

    Returns:
        list: List of tweet data dictionaries (empty when the client is not initialized)
    """
    if not api_headers:
        print("❌ TwitterAPI.io client not initialized")
        return []

    all_tweets = []
    current_date = start_date

    print(f"🔍 Searching for ${symbol} from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
    print(f"📅 Processing {(end_date - start_date).days} days...")

    while current_date < end_date:
        all_tweets.extend(
            _fetch_tweets_for_day(symbol, current_date, max_tweets_per_day, api_headers, request_timeout)
        )

        # Progress update every 30 days
        if (current_date - start_date).days % 30 == 0:
            print(f"📊 Progress: {current_date.strftime('%Y-%m-%d')} - {len(all_tweets)} tweets collected so far")

        # Move to next day
        current_date += datetime.timedelta(days=1)

        # Rate limiting between days
        time.sleep(1)

    print(f"✅ Found {len(all_tweets)} English tweets for ${symbol} across {(end_date - start_date).days} days")
    return all_tweets

In [45]:
def collect_tweets_for_all_symbols_daily(symbols, start_date, end_date, max_tweets_per_day):
    """
    Run the per-day tweet search for every symbol and pool the results.

    Creates the notebook-level `output_dir` if needed, then searches each
    symbol in turn, pausing 2 seconds between consecutive symbols.

    Args:
        symbols (list): Stock ticker symbols to search for
        start_date (datetime): Start date passed to each search
        end_date (datetime): End date passed to each search
        max_tweets_per_day (int): Per-day cap passed to each search

    Returns:
        list: Tweet dictionaries for all symbols, in symbol order
    """
    # Make sure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    collected = []
    total = len(symbols)

    for position, symbol in enumerate(symbols, start=1):
        print(f"\n--- Processing {symbol} ({position}/{total}) ---")

        collected.extend(
            search_tweets_for_symbol_daily(symbol, start_date, end_date, max_tweets_per_day)
        )

        # Pause between symbols, but not after the last one
        if position < total:
            print("⏳ Waiting 2 seconds before next symbol...")
            time.sleep(2)

    return collected

In [46]:
def process_and_save_tweets(tweets_data, output_dir, date_range=None):
    """
    Process tweet data and save to CSV files

    Writes one CSV per symbol to ../data/<symbol>/ and one combined CSV to
    output_dir. Both directories are created when missing.

    Args:
        tweets_data (list): Tweet dictionaries; each needs at least the
            'symbol' and 'created_at' keys
        output_dir (str): Directory that receives the combined CSV
        date_range (tuple | None): (start, end) datetimes used in the file
            names. When None, the notebook-level `start_date` and `end_date`
            are used.

    Returns:
        pandas.DataFrame | None: The tweets sorted by symbol and creation
        time, or None when tweets_data is empty
    """
    if not tweets_data:
        print("No tweets to process")
        return

    if date_range is None:
        # Fall back to the notebook-level search window
        date_range = (start_date, end_date)
    range_start, range_end = date_range
    range_label = f"{range_start.strftime('%Y-%m-%d')}_{range_end.strftime('%Y-%m-%d')}"

    # Convert to DataFrame
    df = pd.DataFrame(tweets_data)

    # Convert datetime columns. Try the classic Twitter timestamp layout first
    # (e.g. "Thu Oct 01 21:58:03 +0000 2020" - assumed to be what the API
    # returns; TODO confirm). An explicit format avoids slow per-element
    # inference; anything else falls back to pandas' own inference.
    twitter_format = '%a %b %d %H:%M:%S %z %Y'
    try:
        df['created_at'] = pd.to_datetime(df['created_at'], format=twitter_format)
    except (ValueError, TypeError):
        df['created_at'] = pd.to_datetime(df['created_at'])

    # Sort by symbol and date
    df = df.sort_values(['symbol', 'created_at'])

    # Save individual files per symbol in ../data/[STOCK TICKER]/ format
    for symbol in df['symbol'].unique():
        symbol_df = df[df['symbol'] == symbol]

        # Per-symbol directory alongside the existing stock data
        symbol_dir = f"../data/{symbol}"
        os.makedirs(symbol_dir, exist_ok=True)

        filepath = os.path.join(symbol_dir, f"{symbol}_tweets_{range_label}.csv")
        symbol_df.to_csv(filepath, index=False)

        print(f"✅ Saved {len(symbol_df)} tweets for {symbol} to {filepath}")

    # Save combined file; create the directory here so this function does not
    # depend on another cell having created it first
    os.makedirs(output_dir, exist_ok=True)
    combined_filepath = os.path.join(output_dir, f"all_tweets_{range_label}.csv")
    df.to_csv(combined_filepath, index=False)
    print(f"✅ Saved combined file with {len(df)} tweets to {combined_filepath}")

    return df

In [47]:
# def test_twitterapi_connection():
#     """
#     Test TwitterAPI.io connection using the advanced_search endpoint
#     """
#     print("🧪 Testing TwitterAPI.io Advanced Search Connection...")
#     print("=" * 60)
    
#     # Test 1: Check if API key is loaded
#     if not api_headers:
#         print("❌ API headers not found. Please check your .env file")
#         return False
    
#     print("✅ API headers loaded successfully")
#     print(f"🔑 Using headers: {list(api_headers.keys())}")
    
#     # Test 2: Test API connection with minimal request
#     try:
#         # Use the correct advanced search endpoint
#         url = f"{TWITTERAPI_BASE_URL}/twitter/tweet/advanced_search"
        
#         # Test parameters based on the documentation
#         params = {
#             'query': '$AAPL',  # Stock ticker with $ symbol
#             'queryType': 'Latest',  # Latest tweets
#             'cursor': ''  # Start from beginning
#         }
        
#         print(f"🔍 Testing search for: '{params['query']}'")
#         print(f"📡 Making API request to: {url}")
#         print(f"📋 Parameters: {params}")
        
#         response = requests.get(url, headers=api_headers, params=params)
        
#         print(f"📊 Response status: {response.status_code}")
        
#         if response.status_code == 200:
#             data = response.json()
#             print("✅ API connection successful!")
            
#             if 'tweets' in data and len(data['tweets']) > 0:
#                 print(f"📈 Found {len(data['tweets'])} tweets")
                
#                 # Show first tweet as example
#                 first_tweet = data['tweets'][0]
#                 print("\n📝 Sample tweet:")
#                 print(f"   Text: {first_tweet['text'][:100]}...")
#                 print(f"   User: @{first_tweet['author']['userName']}")
#                 print(f"   Date: {first_tweet['createdAt']}")
#                 print(f"   Retweets: {first_tweet['retweetCount']}")
#                 print(f"   Likes: {first_tweet['likeCount']}")
#                 print(f"   URL: {first_tweet['url']}")
                
#                 # Show pagination info
#                 print(f"\n📄 Pagination:")
#                 print(f"   Has next page: {data['has_next_page']}")
#                 print(f"   Next cursor: {data['next_cursor']}")
                
#                 return True
#             else:
#                 print("⚠️ API connected but no tweets found")
#                 print("   This might be normal if there are no recent tweets for the query")
#                 return True
#         else:
#             print(f"❌ API Error: {response.status_code}")
#             print(f"   Response: {response.text}")
#             return False
            
#     except Exception as e:
#         print(f"❌ Connection test failed: {e}")
#         return False

# # Run the corrected test
# test_results = test_twitterapi_connection()

In [48]:
# Master execution cell - end-to-end run: collect, save, then summarise
print("🚀 Starting Complete Tweet Collection Pipeline")
print("=" * 60)

# Step 1: Gather tweets for every configured symbol, one day at a time
print("\n📡 Step 1: Collecting tweets for all symbols...")
all_tweets = collect_tweets_for_all_symbols_daily(symbols, start_date, end_date, max_tweets_per_day)
print(f"✅ Step 1 Complete: Collected {len(all_tweets)} total tweets")

if not all_tweets:
    print("❌ No tweets were collected - check your configuration and API key")
else:
    # Step 2: Write the per-symbol and combined CSV files
    print("\n💾 Step 2: Processing and saving tweets...")
    tweet_df = process_and_save_tweets(all_tweets, output_dir)
    tweet_count = len(tweet_df)
    print(f"✅ Step 2 Complete: Processed and saved {tweet_count} tweets")

    # Step 3: Summary statistics
    print("\n📊 Step 3: Summary Statistics")
    print("=" * 40)
    print(f"Total tweets collected: {tweet_count}")
    earliest = tweet_df['created_at'].min()
    latest = tweet_df['created_at'].max()
    print(f"Date range: {earliest} to {latest}")
    print("Tweets per symbol:")
    print(tweet_df['symbol'].value_counts())

    # Engagement here is retweets + likes + replies
    tweet_df['total_engagement'] = (
        tweet_df['retweet_count']
        + tweet_df['like_count']
        + tweet_df['reply_count']
    )
    top_index = tweet_df['total_engagement'].idxmax()
    top_text = tweet_df.loc[top_index, 'text']
    print(f"\nAverage engagement per tweet: {tweet_df['total_engagement'].mean():.2f}")
    print(f"Most engaging tweet: {top_text[:100]}...")

    print("\n🎉 Tweet collection pipeline completed successfully!")
    actual_cost = len(tweet_df) * 0.15 / 1000   # same per-tweet rate as the up-front estimate
    print(f"💰 Actual cost: ~${actual_cost:.2f}")

🚀 Starting Complete Tweet Collection Pipeline

📡 Step 1: Collecting tweets for all symbols...

--- Processing AAPL (1/1) ---
🔍 Searching for $AAPL from 2020-10-01 to 2025-10-01
📅 Processing 1826 days...
📊 Progress: 2020-10-01 - 25 tweets collected so far
📊 Progress: 2020-10-31 - 775 tweets collected so far
📊 Progress: 2020-11-30 - 1525 tweets collected so far
📊 Progress: 2020-12-30 - 2275 tweets collected so far
📊 Progress: 2021-01-29 - 3025 tweets collected so far
📊 Progress: 2021-02-28 - 3758 tweets collected so far
📊 Progress: 2021-03-30 - 4507 tweets collected so far
📊 Progress: 2021-04-29 - 5257 tweets collected so far
📊 Progress: 2021-05-29 - 5979 tweets collected so far
📊 Progress: 2021-06-28 - 6729 tweets collected so far
📊 Progress: 2021-07-28 - 7477 tweets collected so far
📊 Progress: 2021-08-27 - 8227 tweets collected so far
📊 Progress: 2021-09-26 - 8977 tweets collected so far
📊 Progress: 2021-10-26 - 9727 tweets collected so far
📊 Progress: 2021-11-25 - 10477 tweets collec

  df['created_at'] = pd.to_datetime(df['created_at'])


✅ Saved 38553 tweets for AAPL to ../data/AAPL/AAPL_tweets_2020-10-01_2025-10-01.csv
✅ Saved combined file with 38553 tweets to ../data/twitter/all_tweets_2020-10-01_2025-10-01.csv
✅ Step 2 Complete: Processed and saved 38553 tweets

📊 Step 3: Summary Statistics
Total tweets collected: 38553
Date range: 2020-10-01 21:58:03+00:00 to 2025-09-30 23:59:51+00:00
Tweets per symbol:
symbol
AAPL    38553
Name: count, dtype: int64

Average engagement per tweet: 10.81
Most engaging tweet: Market close: $NVDA: -16.91% | $AAPL: +3.21%

Why is DeepSeek great for Apple?

Here's a breakdown o...

🎉 Tweet collection pipeline completed successfully!
💰 Actual cost: ~$5.78
