# Twitter/X Data Ingestion Pipeline

This notebook collects finance-related tweets using the Twitter API (via Tweepy).

**Features:**
- Search tweets by keywords
- Filter by language, spam heuristics, and minimum engagement
- Clean and normalize tweet text
- Export to CSV format

**Note:** This is an exploration notebook. For production use, see `backend/app/pipelines/ingest_twitter.py`

## 1. Setup and Imports

In [43]:
import os
import re
import csv
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional

# Load environment variables
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    print("⚠️  python-dotenv not installed. Install with: pip install python-dotenv")

# Twitter API library
try:
    import tweepy
    print(f"✓ Tweepy version: {tweepy.__version__}")
except ImportError:
    print("❌ Tweepy not installed. Install with: pip install tweepy")

print("✓ Imports complete")

✓ Tweepy version: 4.16.0
✓ Imports complete


## 2. Configuration

In [44]:
# API Credentials (from .env file)
TWITTER_BEARER_TOKEN = os.getenv('TWITTER_BEARER_TOKEN')

# Search configuration
# Note: Free tier = 1,500 tweets/month (50 tweets/day budget)
KEYWORDS = [
    'stock market',
    'stocks',
    'earnings',
    'fed rate',
    'inflation',
    'NVDA',
    'TSLA',
    'AAPL',
    'wall street',
    'bull market',
    'bear market'
]

MAX_TWEETS = 30  # Conservative: ~50 tweets/day budget for free tier
LANGUAGE = 'en'

# Quality filters
MIN_ENGAGEMENT = 5  # Minimum total engagement (likes + retweets + replies)

# Output configuration
OUTPUT_DIR = Path('../data/processed/twitter')
RUN_ID = datetime.utcnow().strftime('%Y-%m-%d')

print(f"Output directory: {OUTPUT_DIR / RUN_ID}")
print(f"Keywords: {', '.join(KEYWORDS)}")
print(f"Max tweets: {MAX_TWEETS}")
print(f"Min engagement threshold: {MIN_ENGAGEMENT}")

Output directory: ..\data\processed\twitter\2025-10-28
Keywords: stock market, stocks, earnings, fed rate, inflation, NVDA, TSLA, AAPL, wall street, bull market, bear market
Max tweets: 30
Min engagement threshold: 5
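
The budget comment in the configuration cell can be checked with a little arithmetic. This is a rough sketch that assumes the 1,500 tweets/month cap quoted in that comment and a 30-day month; `MONTHLY_CAP` and `DAYS_PER_MONTH` are illustrative names, and the real cap depends on the API plan in use.

```python
# Assumed values: the monthly cap is taken from the comment in the config
# cell, and a 30-day month is a simplification.
MONTHLY_CAP = 1_500
DAYS_PER_MONTH = 30
MAX_TWEETS = 30  # per-run request size used in this notebook

daily_budget = MONTHLY_CAP // DAYS_PER_MONTH   # tweets available per day
runs_per_month = MONTHLY_CAP // MAX_TWEETS     # full runs before the cap is hit

print(f"Daily budget: {daily_budget} tweets")
print(f"Runs of {MAX_TWEETS} tweets per month: {runs_per_month}")
```

With these numbers, one run of 30 tweets per day stays under the daily budget, but a second run on the same day would exceed it.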


## 3. Helper Functions

In [45]:
# URL pattern for cleaning
# Match everything from the scheme up to the next whitespace; this also
# covers characters such as '#' and '~' that can appear in URLs
_URL_RE = re.compile(r'https?://\S+')

def clean_text(txt: Optional[str]) -> str:
    """
    Remove URLs and @mentions, strip the '#' from hashtags (the word itself
    is kept), and normalize whitespace.
    
    Args:
        txt: Input text string
        
    Returns:
        Cleaned text
    """
    if not txt:
        return ''
    
    # Remove URLs
    txt = _URL_RE.sub('', txt)
    
    # Remove @mentions
    txt = re.sub(r'@\w+', '', txt)
    
    # Keep hashtags but remove # symbol (they're useful for sentiment)
    txt = re.sub(r'#(\w+)', r'\1', txt)
    
    # Normalize whitespace
    txt = re.sub(r'\s+', ' ', txt)
    
    return txt.strip()

# Test the function
test_tweet = "Check out $TSLA! 🚀 @elonmusk https://example.com #stocks #trading"
print(f"Original: {test_tweet}")
print(f"Cleaned:  {clean_text(test_tweet)}")

Original: Check out $TSLA! 🚀 @elonmusk https://example.com #stocks #trading
Cleaned:  Check out $TSLA! 🚀 stocks trading
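
Whether hashtags should survive cleaning depends on the downstream task. The sketch below is a hypothetical variant of `clean_text` with a `keep_hashtags` switch (a parameter that does not exist in this notebook) and a simplified URL pattern; it only illustrates the two behaviors side by side.

```python
import re
from typing import Optional

# Simplified URL pattern, assumed for this sketch
_URL_RE = re.compile(r'https?://\S+')

def clean_text_variant(txt: Optional[str], keep_hashtags: bool = True) -> str:
    """Hypothetical variant of clean_text with a switch for hashtag handling."""
    if not txt:
        return ''
    txt = _URL_RE.sub('', txt)            # remove URLs
    txt = re.sub(r'@\w+', '', txt)        # remove @mentions
    if keep_hashtags:
        txt = re.sub(r'#(\w+)', r'\1', txt)   # keep the word, drop the '#'
    else:
        txt = re.sub(r'#\w+', '', txt)        # drop the whole hashtag
    return re.sub(r'\s+', ' ', txt).strip()   # normalize whitespace

sample = "Check out $TSLA! @elonmusk https://example.com #stocks #trading"
print(clean_text_variant(sample))
print(clean_text_variant(sample, keep_hashtags=False))
```

Keeping the hashtag words preserves topical signal for sentiment models, while dropping them gives shorter text closer to the author's own sentence.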


In [46]:
def normalize_tweet(tweet) -> Dict[str, Any]:
    """
    Extract and normalize fields from a Twitter API v2 tweet object.
    
    Args:
        tweet: Tweepy Tweet object
        
    Returns:
        Dictionary with normalized tweet data
    """
    # Fields that were not requested can be None on the Tweet object, so
    # guard against None instead of relying on hasattr()
    author_id = getattr(tweet, 'author_id', None)
    metrics = getattr(tweet, 'public_metrics', None) or {}
    created_at = getattr(tweet, 'created_at', None)
    
    return {
        'id': tweet.id,
        'text': clean_text(tweet.text),
        'raw_text': tweet.text,  # Keep original for reference
        'author_id': author_id,
        'created_at': created_at.isoformat() if created_at else None,
        'retweet_count': metrics.get('retweet_count', 0),
        'reply_count': metrics.get('reply_count', 0),
        'like_count': metrics.get('like_count', 0),
        'quote_count': metrics.get('quote_count', 0),
        'lang': getattr(tweet, 'lang', None),
    }

print("✓ Helper functions defined")

✓ Helper functions defined
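
The normalizer can be exercised without calling the API by passing a stand-in object. The sketch below uses `types.SimpleNamespace` in place of a Tweepy `Tweet` and a reduced, hypothetical `normalize_sketch` function; it assumes that optional fields may arrive as `None` when they were not requested.

```python
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any, Dict

def normalize_sketch(tweet) -> Dict[str, Any]:
    """Reduced stand-in for normalize_tweet that tolerates None fields."""
    metrics = getattr(tweet, 'public_metrics', None) or {}
    created_at = getattr(tweet, 'created_at', None)
    return {
        'id': tweet.id,
        'raw_text': tweet.text,
        'created_at': created_at.isoformat() if created_at else None,
        'like_count': metrics.get('like_count', 0),
        'retweet_count': metrics.get('retweet_count', 0),
    }

# A tweet with all fields present, and one where optional fields are None
full = SimpleNamespace(
    id=1,
    text="NVDA earnings beat",
    created_at=datetime(2025, 10, 28, 10, 16, 47, tzinfo=timezone.utc),
    public_metrics={'like_count': 9, 'retweet_count': 9},
)
bare = SimpleNamespace(id=2, text="AAPL", created_at=None, public_metrics=None)

print(normalize_sketch(full))
print(normalize_sketch(bare))
```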


In [47]:
def build_query(keywords: List[str], lang: str = 'en') -> str:
    """
    Build a Twitter search query from keywords.
    
    Args:
        keywords: List of keywords to search for
        lang: Language code (default: 'en')
        
    Returns:
        Query string
    """
    # Quote multi-word phrases
    terms = [f'"{k}"' if ' ' in k else k for k in keywords]
    
    # Join with OR and add language filter
    query = '(' + ' OR '.join(terms) + f') lang:{lang}'
    
    # Exclude retweets for cleaner data
    query += ' -is:retweet'
    
    return query

# Test the query builder
test_query = build_query(KEYWORDS[:3], LANGUAGE)
print(f"Query: {test_query}")

Query: ("stock market" OR stocks OR earnings) lang:en -is:retweet
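
Search queries have a maximum length, so a long keyword list may need to be split across several requests. The sketch below is one way to do that; `build_queries` and `MAX_QUERY_LEN` are hypothetical names, and the 512-character value is an assumption that should be checked against the limit for the access tier in use.

```python
from typing import List

MAX_QUERY_LEN = 512  # assumed limit; verify for your access tier

def build_queries(keywords: List[str], lang: str = 'en',
                  max_len: int = MAX_QUERY_LEN) -> List[str]:
    """Split keywords across several queries so each stays within max_len.

    A single term that is longer than max_len on its own still produces
    an over-long query; this sketch does not handle that case.
    """
    suffix = f' lang:{lang} -is:retweet'
    queries: List[str] = []
    batch: List[str] = []
    for kw in keywords:
        term = f'"{kw}"' if ' ' in kw else kw   # quote multi-word phrases
        candidate = '(' + ' OR '.join(batch + [term]) + ')' + suffix
        if batch and len(candidate) > max_len:
            # Adding this term would overflow: flush the current batch
            queries.append('(' + ' OR '.join(batch) + ')' + suffix)
            batch = [term]
        else:
            batch.append(term)
    if batch:
        queries.append('(' + ' OR '.join(batch) + ')' + suffix)
    return queries

for q in build_queries(['stock market', 'stocks', 'earnings'], max_len=40):
    print(len(q), q)
```

Each extra query costs an extra request, so splitting trades rate-limit budget for keyword coverage.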


In [None]:
def filter_spam_and_bots(tweets: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Remove spam, bots, and low-quality tweets.
    
    Args:
        tweets: List of tweet dictionaries
        
    Returns:
        Filtered list of quality tweets
    """
    filtered = []
    
    # Enhanced spam indicators
    spam_patterns = [
        # Promotional spam
        r'check\s+(out|latest)',
        r'follow\s+(me|us|for|him)',
        r'click\s+(here|link)',
        r'dm\s+me',
        
        # Stock-promotion spam (patterns based on examples seen in collected tweets)
        r'this\s+(blogger|investor|trader).+recommends?\s+stocks?',
        r'recommends?\s+stocks?\s+that\s+rise',
        r'his\s+judgment\s+is.+(accurate|incredible)',
        r'buy\s+the\s+stocks?\s+(he|she|they)\s+recommends?',
        r'make\s+money\s+every\s+day',
        r'you\s+can\s+also\s+follow',
        
        # Other spam
        r'recommend\s+a\s+blogger',
        r'just\s+earned',
        r'simulation\s+market',
        r'airdrop',
        r'free\s+money',
        r'guaranteed\s+profit',
        r'\d+%\s+movement\s+in',
    ]
    
    for tweet in tweets:
        text_lower = tweet['text'].lower()
        
        # Skip if spam pattern detected
        is_spam = any(re.search(pattern, text_lower) for pattern in spam_patterns)
        if is_spam:
            continue
        
        # Skip if too short (likely not meaningful)
        if len(tweet['text']) < 20:
            continue
        
        # Detect suspicious uniform engagement (bot networks)
        # Real tweets rarely have EXACTLY the same likes, retweets, and replies
        if (tweet['like_count'] == tweet['retweet_count'] == tweet['reply_count'] 
            and tweet['like_count'] > 0):
            continue  # Likely bot network with fake engagement
        
        # Skip if too many emojis (often spam)
        emoji_pattern = r'[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF]'
        emoji_count = len(re.findall(emoji_pattern, tweet['raw_text']))
        if emoji_count > 5:  # More than 5 emojis = likely spam
            continue
        
        # Skip if excessive hashtags (common in spam)
        hashtag_count = tweet['raw_text'].count('#')
        if hashtag_count > 8:  # More than 8 hashtags = spam
            continue
        
        # Skip if contains too many cashtags (spam pattern)
        cashtag_count = tweet['raw_text'].count('$')
        if cashtag_count > 5:  # More than 5 stock symbols = spam
            continue
        
        # Skip if starts with multiple random emojis (spam signature)
        if re.match(r'^[\U0001F000-\U0001FFFF\s]{10,}', tweet['raw_text']):
            continue
        
        filtered.append(tweet)
    
    return filtered


def filter_by_engagement(tweets: List[Dict[str, Any]], min_engagement: int = 5) -> List[Dict[str, Any]]:
    """
    Keep only tweets with minimum engagement threshold.
    
    Args:
        tweets: List of tweet dictionaries
        min_engagement: Minimum total engagement (likes + retweets + replies)
        
    Returns:
        Filtered list of tweets
    """
    return [
        t for t in tweets 
        if (t['like_count'] + t['retweet_count'] + t['reply_count']) >= min_engagement
    ]


print("✓ Quality filter functions defined")

✓ Quality filter functions defined
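
The pattern check in `filter_spam_and_bots` scans every pattern for every tweet. A possible alternative, sketched below, joins the patterns into a single alternation that is compiled once. `SPAM_PATTERNS` here is only a subset copied for illustration, and `looks_like_spam` is a hypothetical helper, not part of the notebook.

```python
import re
from typing import List

# A subset of the patterns from filter_spam_and_bots, for illustration only
SPAM_PATTERNS: List[str] = [
    r'follow\s+(me|us|for|him)',
    r'recommends?\s+stocks?\s+that\s+rise',
    r'make\s+money\s+every\s+day',
    r'guaranteed\s+profit',
]

# One alternation, compiled once; IGNORECASE replaces the .lower() call
_SPAM_RE = re.compile(
    '|'.join(f'(?:{p})' for p in SPAM_PATTERNS),
    re.IGNORECASE,
)

def looks_like_spam(text: str) -> bool:
    """Return True if any spam pattern occurs anywhere in the text."""
    return _SPAM_RE.search(text) is not None

print(looks_like_spam("This blogger recommends stocks that rise every day"))
print(looks_like_spam("Fed holds rates steady as inflation cools"))
```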


## 4. Initialize Twitter API Client

In [49]:
def initialize_twitter_client() -> tweepy.Client:
    """
    Initialize and authenticate Twitter API client.
    
    Returns:
        Authenticated Tweepy Client
    """
    if not TWITTER_BEARER_TOKEN:
        raise ValueError(
            'Missing Twitter credentials. Set TWITTER_BEARER_TOKEN '
            'environment variable or create a .env file.'
        )
    
    client = tweepy.Client(
        bearer_token=TWITTER_BEARER_TOKEN,
        wait_on_rate_limit=True  # Automatically handle rate limits
    )
    
    print("✓ Twitter client initialized")
    return client

# Initialize client
try:
    client = initialize_twitter_client()
except Exception as e:
    print(f"❌ Error: {e}")
    client = None

✓ Twitter client initialized
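
With `wait_on_rate_limit=True`, the client sleeps when the API reports that the rate limit is exhausted instead of raising an error. The sketch below illustrates the general idea of deriving a wait time from the `x-rate-limit-reset` response header (an epoch timestamp in seconds). It is not Tweepy's code; `seconds_until_reset` is a hypothetical helper and the header value is made up.

```python
import time
from typing import Mapping, Optional

def seconds_until_reset(headers: Mapping[str, str],
                        now: Optional[float] = None) -> int:
    """Seconds to wait until the rate-limit window resets (never negative)."""
    now = time.time() if now is None else now
    reset_epoch = int(headers.get('x-rate-limit-reset', 0))
    # One extra second as a cushion against clock differences
    return max(0, reset_epoch - int(now) + 1)

# Made-up header value and clock reading, for illustration
headers = {'x-rate-limit-reset': '1761646709'}
print(seconds_until_reset(headers, now=1761646608))
```

In a notebook this blocking wait is convenient; a scheduled job might prefer to fail fast and retry on the next run.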


## 5. Fetch Tweets

In [50]:
def fetch_tweets(
    client: tweepy.Client,
    keywords: List[str],
    max_results: int = 100,
    lang: str = 'en',
    min_engagement: int = 0
) -> List[Dict[str, Any]]:
    """
    Fetch tweets matching keywords with quality filtering.
    
    Args:
        client: Authenticated Tweepy client
        keywords: List of keywords to search for
        max_results: Maximum number of tweets to fetch (10-100 per request)
        lang: Language code
        min_engagement: Minimum engagement threshold
        
    Returns:
        List of normalized tweet dictionaries
    """
    query = build_query(keywords, lang)
    print(f"Query: {query}")
    print(f"Fetching up to {max_results} tweets...")
    
    tweets = []
    seen = set()
    
    try:
        # Twitter API v2 recent search
        response = client.search_recent_tweets(
            query=query,
            max_results=max(10, min(max_results, 100)),  # API accepts 10-100 per request
            tweet_fields=['created_at', 'author_id', 'lang', 'public_metrics'],
        )
        
        # Check if response has data (Pylance may show warning, but this is correct)
        if not response.data:  # type: ignore
            print("⚠️  No tweets found")
            return []
        
        for tweet in response.data:  # type: ignore
            if tweet.id not in seen:
                seen.add(tweet.id)
                tweets.append(normalize_tweet(tweet))
        
        print(f"✓ Fetched {len(tweets)} raw tweets")
        
        # Apply quality filters
        print("Applying quality filters...")
        filtered_spam = filter_spam_and_bots(tweets)
        print(f"  After spam filter: {len(filtered_spam)} tweets")
        
        filtered_engagement = filter_by_engagement(filtered_spam, min_engagement)
        print(f"  After engagement filter (min={min_engagement}): {len(filtered_engagement)} tweets")
        
        return filtered_engagement
        
    except tweepy.TweepyException as e:
        print(f"❌ Error fetching tweets: {e}")
        return []

# Fetch tweets with quality filtering
if client:
    tweets = fetch_tweets(client, KEYWORDS, MAX_TWEETS, LANGUAGE, MIN_ENGAGEMENT)
    print(f"\n✓ Total quality tweets collected: {len(tweets)}")
else:
    print("❌ Client not initialized. Cannot fetch tweets.")
    tweets = []

Query: ("stock market" OR stocks OR earnings OR "fed rate" OR inflation OR NVDA OR TSLA OR AAPL OR "wall street" OR "bull market" OR "bear market") lang:en -is:retweet
Fetching up to 30 tweets...


Rate limit exceeded. Sleeping for 102 seconds.


✓ Fetched 30 raw tweets
Applying quality filters...
  After spam filter: 25 tweets
  After engagement filter (min=5): 2 tweets

✓ Total quality tweets collected: 2
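
`search_recent_tweets` also accepts `start_time` and `end_time` arguments, which `fetch_tweets` does not pass. A minimal sketch of a date-range filter follows. `recent_window` and `search_window` are hypothetical helpers, and the 30-second margin and the 7-day lookback are assumptions about the recent-search endpoint that should be checked against the API documentation.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

def recent_window(hours: int, now: Optional[datetime] = None) -> Dict[str, datetime]:
    """Build start_time/end_time covering roughly the last `hours` hours."""
    now = now or datetime.now(timezone.utc)
    # Assumption: end_time must lie slightly in the past
    end_time = now - timedelta(seconds=30)
    # Assumption: recent search reaches back about 7 days at most
    earliest = now - timedelta(days=7) + timedelta(minutes=1)
    start_time = max(end_time - timedelta(hours=hours), earliest)
    return {'start_time': start_time, 'end_time': end_time}

def search_window(client, query: str, hours: int = 24, max_results: int = 10):
    """Run a recent search restricted to the window from recent_window()."""
    return client.search_recent_tweets(
        query=query,
        max_results=max_results,
        tweet_fields=['created_at', 'author_id', 'lang', 'public_metrics'],
        **recent_window(hours),
    )

fixed_now = datetime(2025, 10, 28, 12, 0, 0, tzinfo=timezone.utc)
print(recent_window(24, now=fixed_now))
```

Passing timezone-aware UTC datetimes avoids ambiguity about which local time the window refers to.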


## 6. Preview Data

In [51]:
# Display first few tweets
if tweets:
    print(f"\nFirst 3 tweets:\n")
    for i, tweet in enumerate(tweets[:3], 1):
        print(f"{i}. {tweet['text'][:100]}...")
        print(f"   Likes: {tweet['like_count']}, Retweets: {tweet['retweet_count']}")
        print(f"   Created: {tweet['created_at']}")
        print()


First 3 tweets:

1. 🥪😋🧽🌄 This blogger, . recommends stocks that rise every day. His judgment is incredibly accurate. You...
   Likes: 9, Retweets: 9
   Created: 2025-10-28T10:16:47+00:00

2. ⏯🥤🐹♐ This investor is so accurate, Buy the stocks he recommends and you will make money every day. Y...
   Likes: 9, Retweets: 9
   Created: 2025-10-28T10:16:17+00:00



## 7. Export to CSV

In [52]:
def export_to_csv(
    tweets: List[Dict[str, Any]],
    output_dir: Path,
    run_id: str
) -> str:
    """
    Export tweets to CSV file.
    
    Args:
        tweets: List of tweet dictionaries
        output_dir: Output directory path
        run_id: Run identifier
        
    Returns:
        Path to output CSV file
    """
    if not tweets:
        print("⚠️  No tweets to export")
        return ""
    
    # Create output directory
    run_dir = output_dir / run_id
    run_dir.mkdir(parents=True, exist_ok=True)
    
    # Output file path
    output_file = run_dir / f'twitter_finance_{run_id}.csv'
    
    # Write to CSV
    fieldnames = [
        'id', 'text', 'raw_text', 'author_id', 'created_at',
        'retweet_count', 'reply_count', 'like_count', 'quote_count', 'lang'
    ]
    
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(tweets)
    
    print(f"✓ Exported {len(tweets)} tweets to {output_file}")
    
    # Also save metadata
    meta_file = run_dir / f'twitter_finance_{run_id}_meta.txt'
    with open(meta_file, 'w', encoding='utf-8') as f:
        f.write(f"Run ID: {run_id}\n")
        f.write(f"Timestamp: {datetime.utcnow().isoformat()}\n")
        f.write(f"Keywords: {', '.join(KEYWORDS)}\n")
        f.write(f"Language: {LANGUAGE}\n")
        f.write(f"Total tweets: {len(tweets)}\n")
    
    print(f"✓ Saved metadata to {meta_file}")
    
    return str(output_file)

# Export data
if tweets:
    output_path = export_to_csv(tweets, OUTPUT_DIR, RUN_ID)
    print(f"\n✓ Pipeline complete!")
else:
    print("\n⚠️  No data to export")

✓ Exported 2 tweets to ..\data\processed\twitter\2025-10-28\twitter_finance_2025-10-28.csv
✓ Saved metadata to ..\data\processed\twitter\2025-10-28\twitter_finance_2025-10-28_meta.txt

✓ Pipeline complete!
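
When the exported CSV is read back, every column arrives as a string. The sketch below shows one way to reload the file and restore the integer counts. `load_tweets_csv` is a hypothetical helper, and the demo writes a synthetic row to a temporary file rather than touching the real export.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any, Dict, List

INT_FIELDS = ('retweet_count', 'reply_count', 'like_count', 'quote_count')

def load_tweets_csv(path: Path) -> List[Dict[str, Any]]:
    """Read an exported CSV and convert the engagement counts back to int."""
    with open(path, newline='', encoding='utf-8') as f:
        rows: List[Dict[str, Any]] = list(csv.DictReader(f))
    for row in rows:
        for field in INT_FIELDS:
            row[field] = int(row[field])
    return rows  # 'id' and 'author_id' are left as strings

# Round-trip demo with one synthetic row
sample = [{
    'id': 1, 'text': 'NVDA earnings beat', 'raw_text': 'NVDA earnings beat',
    'author_id': 42, 'created_at': '2025-10-28T10:16:47+00:00',
    'retweet_count': 2, 'reply_count': 1, 'like_count': 9,
    'quote_count': 0, 'lang': 'en',
}]
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / 'sample.csv'
    with open(path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=list(sample[0]))
        writer.writeheader()
        writer.writerows(sample)
    loaded = load_tweets_csv(path)

print(loaded[0]['like_count'] + loaded[0]['retweet_count'])
```

Leaving the IDs as strings is deliberate in this sketch: tweet IDs are large integers that some tools, spreadsheets in particular, round when they parse them as numbers.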


## 8. Data Summary

In [53]:
# Display statistics
if tweets:
    total_likes = sum(t['like_count'] for t in tweets)
    total_retweets = sum(t['retweet_count'] for t in tweets)
    avg_likes = total_likes / len(tweets)
    avg_retweets = total_retweets / len(tweets)
    
    print("\n📊 Summary Statistics:")
    print(f"Total tweets: {len(tweets)}")
    print(f"Total likes: {total_likes:,}")
    print(f"Total retweets: {total_retweets:,}")
    print(f"Average likes per tweet: {avg_likes:.1f}")
    print(f"Average retweets per tweet: {avg_retweets:.1f}")
    
    # Most engaged tweet
    most_liked = max(tweets, key=lambda t: t['like_count'])
    print(f"\nMost liked tweet ({most_liked['like_count']} likes):")
    print(f"  {most_liked['text'][:150]}...")


📊 Summary Statistics:
Total tweets: 2
Total likes: 18
Total retweets: 18
Average likes per tweet: 9.0
Average retweets per tweet: 9.0

Most liked tweet (9 likes):
  🥪😋🧽🌄 This blogger, . recommends stocks that rise every day. His judgment is incredibly accurate. You can also follow him. Premarket rise $GGAL Portfol...


## Next Steps

1. **Review the collected data** in the CSV file
2. **Adjust keywords** if needed for better coverage
3. **Convert to production script** once satisfied with results
4. **Add to pipeline** alongside Reddit ingestion

See `backend/app/pipelines/ingest_twitter.py` for the production version.