In [None]:
# Import required libraries
import json
import logging
import os
import re
import time
from datetime import datetime, timedelta

import pandas as pd
import tweepy
from dotenv import load_dotenv

# Set up logging
# INFO level so the progress messages logged by the collection functions
# below (logger.info calls) are emitted, not just warnings and errors.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
# Must run before the client is created: the TWITTER_* credentials are read
# with os.getenv() at that point (presumably supplied through a .env file).
load_dotenv()

# Configure rate limit handler
def handle_rate_limit(cursor):
    """
    Yield items from ``cursor``, pausing whenever Twitter reports a rate limit.

    Args:
        cursor: Any iterator; it is advanced with ``next()``.

    Yields:
        Each item produced by ``cursor`` until it is exhausted or fails.

    Notes:
        * Exhaustion (``StopIteration``) is handled first, so a normal end of
          iteration never has to resolve any tweepy exception class.
        * tweepy 4.x reports HTTP 429 as ``tweepy.TooManyRequests``; the older
          ``tweepy.RateLimitError`` name is honoured only when the installed
          version still provides it.
        * Any other error is logged and ends the iteration (best effort), as
          before.
    """
    while True:
        try:
            item = next(cursor)
        except StopIteration:
            return
        except Exception as e:
            # Resolve the rate-limit exception types lazily and defensively:
            # referencing a missing attribute directly in an ``except`` clause
            # would itself raise AttributeError.
            rate_limit_errors = tuple(
                exc_type
                for exc_type in (
                    getattr(tweepy, 'TooManyRequests', None),
                    getattr(tweepy, 'RateLimitError', None),
                )
                if exc_type is not None
            )
            if isinstance(e, rate_limit_errors):
                logger.info("Rate limit reached. Waiting for 15 minutes...")
                time.sleep(15 * 60)  # Wait for 15 minutes
                continue
            logger.error(f"Error: {str(e)}")
            return
        # Yield outside the try block so that only errors raised while
        # advancing the cursor are intercepted above.
        yield item


In [None]:
# Twitter API Authentication
try:
    # The collectors below call the client without user-context auth, so the
    # bearer token is the credential actually used. Report a missing token
    # here rather than letting every later request fail inside a collector.
    if not os.getenv('TWITTER_BEARER_TOKEN'):
        raise ValueError(
            "TWITTER_BEARER_TOKEN is not set; add it to the environment or the .env file"
        )

    client = tweepy.Client(
        bearer_token=os.getenv('TWITTER_BEARER_TOKEN'),
        consumer_key=os.getenv('TWITTER_API_KEY'),
        consumer_secret=os.getenv('TWITTER_API_SECRET'),
        access_token=os.getenv('TWITTER_ACCESS_TOKEN'),
        access_token_secret=os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
        # Let tweepy sleep and retry by itself when a rate limit is hit.
        wait_on_rate_limit=True
    )
    # Creating the client does not contact the API, so this only confirms
    # that the client object exists, not that the credentials are valid.
    logger.info("Twitter API client created (credentials are verified on the first request)")
except Exception as e:
    logger.error(f"Error authenticating with Twitter API: {str(e)}")
    raise


In [None]:
# Recent-search queries. Every topic clause is paired with the same location
# filter and the same retweet exclusion, so only the topic part is listed.
search_queries = [
    f'{topic_clause} (Kenya OR Nairobi) -is:retweet'
    for topic_clause in (
        '(AI OR "artificial intelligence")',
        '"digital transformation"',
        '"machine learning"',
        '(tech OR technology) (upskilling OR reskilling)',
        'AI (startup OR innovation)',
    )
]

# Account handles whose timelines are collected; each one is passed as the
# `username` for the user lookup in collect_organization_tweets.
key_organizations = [
    'Safaricom', 'KCBGroup', 'EquityBank', 'iHub',
    'MoringaSchool', 'Microsoft_EA', 'Google_Kenya', 'IBMEastAfrica',
]

def collect_tweets_with_metadata(query, max_results=100):
    """
    Collect recent tweets matching ``query`` together with author metadata.

    Pages through ``client.search_recent_tweets`` until ``max_results`` tweets
    have been gathered or no further page is available.

    Args:
        query: Search query string sent to the recent-search endpoint.
        max_results: Upper bound on the number of tweets returned.

    Returns:
        list[dict]: One dict per tweet with the keys ``created_at``, ``text``,
        ``username``, ``user_followers``, ``user_verified``, ``retweet_count``,
        ``like_count``, ``reply_count``, ``quote_count`` and ``query``. The
        three user fields are ``None`` when the author is not part of the
        response's user expansion. If a request fails, the error is logged and
        the tweets gathered so far are returned.
    """
    tweets = []
    tweet_fields = ['created_at', 'public_metrics', 'context_annotations', 'entities']
    user_fields = ['username', 'public_metrics', 'verified', 'description']

    try:
        # Search tweets with pagination
        pagination_token = None
        while len(tweets) < max_results:
            remaining = max_results - len(tweets)
            response = client.search_recent_tweets(
                query=query,
                # The endpoint only accepts page sizes from 10 to 100. When
                # fewer than 10 tweets are still needed, request 10 and trim
                # the surplus below instead of sending an invalid request.
                max_results=min(100, max(10, remaining)),
                tweet_fields=tweet_fields,
                user_fields=user_fields,
                expansions=['author_id'],
                next_token=pagination_token
            )

            if not response.data:
                break

            # Index the expanded author objects by id; the 'users' entry may
            # be missing from the includes, so fall back to an empty list.
            includes = response.includes or {}
            users = {user.id: user for user in includes.get('users', [])}

            for tweet in response.data[:remaining]:
                user = users.get(tweet.author_id)
                metrics = tweet.public_metrics

                tweets.append({
                    'created_at': tweet.created_at,
                    'text': tweet.text,
                    'username': user.username if user else None,
                    'user_followers': user.public_metrics['followers_count'] if user else None,
                    'user_verified': user.verified if user else None,
                    'retweet_count': metrics['retweet_count'],
                    'like_count': metrics['like_count'],
                    'reply_count': metrics['reply_count'],
                    'quote_count': metrics['quote_count'],
                    'query': query
                })

            pagination_token = response.meta.get('next_token')
            if not pagination_token:
                break

            logger.info(f"Collected {len(tweets)} tweets for query: {query}")

    except Exception as e:
        # Best effort: keep whatever was collected before the failure.
        logger.error(f"Error collecting tweets for query {query}: {str(e)}")

    return tweets

def collect_organization_tweets(org_handle, max_results=50):
    """
    Collect AI/digital-related tweets from one organization's timeline.

    Looks up the account by handle, fetches its most recent original tweets
    (retweets and replies excluded) and keeps those whose text mentions one of
    the tracked terms.

    Args:
        org_handle: Account username, without the leading ``@``.
        max_results: Number of timeline tweets to inspect. The request size is
            clamped to the 5-100 range accepted by the timeline endpoint.

    Returns:
        list[dict]: One dict per matching tweet with the keys ``created_at``,
        ``text``, ``username``, ``retweet_count``, ``like_count``,
        ``reply_count``, ``quote_count`` and ``source``. An empty list is
        returned when the account is not found, has no tweets, or a request
        fails (the error is logged).
    """
    # 'ai' has to match as a whole word: as a plain substring it also matches
    # "Nairobi", "email", "said", ... and would let almost every tweet through.
    ai_word = re.compile(r'\bai\b')
    phrase_terms = ('artificial intelligence', 'machine learning', 'digital')

    try:
        # Get user ID first
        user = client.get_user(username=org_handle)
        if not user.data:
            logger.warning(f"Could not find user {org_handle}")
            return []

        # Get tweets from the organization
        response = client.get_users_tweets(
            user.data.id,
            max_results=max(5, min(100, max_results)),
            tweet_fields=['created_at', 'public_metrics'],
            exclude=['retweets', 'replies']
        )

        if not response.data:
            return []

        tweets = []
        # Trim in case the clamped request returned more than was asked for.
        for tweet in response.data[:max(0, max_results)]:
            text = tweet.text.lower()
            if not (ai_word.search(text) or any(term in text for term in phrase_terms)):
                continue

            metrics = tweet.public_metrics
            tweets.append({
                'created_at': tweet.created_at,
                'text': tweet.text,
                'username': org_handle,
                'retweet_count': metrics['retweet_count'],
                'like_count': metrics['like_count'],
                'reply_count': metrics['reply_count'],
                'quote_count': metrics['quote_count'],
                'source': 'organization_timeline'
            })

        logger.info(f"Collected {len(tweets)} relevant tweets from {org_handle}")
        return tweets

    except Exception as e:
        logger.error(f"Error collecting tweets from {org_handle}: {str(e)}")
        return []


In [None]:
# Main data collection function with optimized API usage
def collect_all_data(max_tweets_per_query=50, max_tweets_per_org=30):
    """
    Collect tweets from organization timelines and search queries, then save them.

    Args:
        max_tweets_per_query: Passed as ``max_results`` to
            ``collect_tweets_with_metadata`` for every entry of
            ``search_queries``.
        max_tweets_per_org: Passed as ``max_results`` to
            ``collect_organization_tweets`` for every entry of
            ``key_organizations``.

    Returns:
        pandas.DataFrame: The collected tweets with duplicate texts removed.
        When nothing was collected an empty frame is returned and no file is
        written; otherwise the frame is also saved to a timestamped CSV under
        ``../data``.
    """
    all_tweets = []

    # First collect from organizations (more targeted data)
    logger.info("Collecting tweets from key organizations...")
    for org in key_organizations:
        org_tweets = collect_organization_tweets(org, max_results=max_tweets_per_org)
        all_tweets.extend(org_tweets)
        logger.info(f"Collected {len(org_tweets)} tweets from {org}")

    # Then collect from search queries
    logger.info("Collecting tweets from search queries...")
    for query in search_queries:
        query_tweets = collect_tweets_with_metadata(query, max_results=max_tweets_per_query)
        all_tweets.extend(query_tweets)
        logger.info(f"Collected {len(query_tweets)} tweets for query: {query}")

    df = pd.DataFrame(all_tweets)
    if df.empty:
        # Both collectors log their own errors and return [], so an API or
        # credential problem ends up here with a frame that has no 'text'
        # column; de-duplicating on it would raise KeyError.
        logger.warning("No tweets were collected; nothing was saved")
        return df

    # The same tweet can be returned by several sources; keep the first copy.
    df = df.drop_duplicates(subset=['text'])

    # Save to CSV with timestamp
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'../data/twitter_data_{timestamp}.csv'
    # to_csv does not create missing directories; make sure the collected data
    # is not lost because the output folder is absent.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    df.to_csv(filename, index=False)
    logger.info(f"Saved {len(df)} unique tweets to {filename}")

    return df

# Execute data collection
if __name__ == "__main__":
    try:
        df = collect_all_data()
        print(f"Successfully collected {len(df)} tweets")
    except Exception as e:
        # Still best effort (the error is not re-raised), but logger.exception
        # records the traceback alongside the same ERROR-level message so the
        # failing call can be located.
        logger.exception(f"Error in main data collection: {str(e)}")


In [None]:
# Define search keywords and tweet collection function
# NOTE(review): this rebinds `search_queries`, replacing the operator-based
# list assigned earlier in the notebook. Anything executed after this cell
# (including a re-run of collect_all_data) sees these plain keyword queries.
search_queries = [
    f'{topic} Kenya'
    for topic in (
        'AI',
        'digital transformation',
        'machine learning',
        'future-proof workforce',
        'AI reskilling',
    )
]

def collect_tweets(query, max_tweets=100, api=None):
    """
    Collect tweets for ``query`` through the v1.1 search endpoint.

    Args:
        query: Search string passed as ``q`` to ``api.search_tweets``.
        max_tweets: Maximum number of tweets to iterate over.
        api: Optional ``tweepy.API`` handle. No such handle is created
            anywhere else in this notebook, so when it is omitted one is built
            from the ``TWITTER_API_KEY`` / ``TWITTER_API_SECRET`` /
            ``TWITTER_ACCESS_TOKEN`` / ``TWITTER_ACCESS_TOKEN_SECRET``
            environment variables.

    Returns:
        list[dict]: One dict per tweet with the keys ``created_at``, ``text``,
        ``username``, ``user_followers``, ``user_verified``,
        ``retweet_count``, ``favorite_count`` and ``query``. On failure the
        error is printed and the tweets gathered so far are returned.
    """
    tweets = []
    try:
        if api is None:
            auth = tweepy.OAuth1UserHandler(
                os.getenv('TWITTER_API_KEY'),
                os.getenv('TWITTER_API_SECRET'),
                os.getenv('TWITTER_ACCESS_TOKEN'),
                os.getenv('TWITTER_ACCESS_TOKEN_SECRET'),
            )
            api = tweepy.API(auth, wait_on_rate_limit=True)

        cursor = tweepy.Cursor(
            api.search_tweets,
            q=query,
            lang='en',
            # 'extended' makes the untruncated text available as full_text.
            tweet_mode='extended',
        )
        for tweet in cursor.items(max_tweets):
            tweets.append({
                'created_at': tweet.created_at,
                'text': tweet.full_text,
                'username': tweet.user.screen_name,
                'user_followers': tweet.user.followers_count,
                'user_verified': tweet.user.verified,
                'retweet_count': tweet.retweet_count,
                'favorite_count': tweet.favorite_count,
                'query': query
            })
    except Exception as e:
        print(f'Error collecting tweets for query {query}: {str(e)}')

    return tweets


In [None]:
# Create LinkedIn data collection template
# Columns of the LinkedIn collection sheet.
linkedin_columns = [
    'post_date',
    'author_name',
    'author_title',
    'author_company',
    'seniority_level',  # C-level, Director, Manager, Individual Contributor
    'company_size',     # Small (<50), Medium (50-500), Large (>500)
    'post_text',
    'likes',
    'comments',
    'shares'
]

# to_csv does not create missing directories, so make sure the target exists.
os.makedirs('../data', exist_ok=True)

# An empty frame with only column names produces a header-only CSV.
# NOTE(review): re-running this cell rewrites the file with headers only;
# keep filled-in data under a different name.
linkedin_df = pd.DataFrame(columns=linkedin_columns)
linkedin_df.to_csv('../data/linkedin_template.csv', index=False)
print("LinkedIn template created at '../data/linkedin_template.csv'")


In [None]:
# Process and store Twitter data
def process_twitter_data():
    """
    Run ``collect_tweets`` for every entry of ``search_queries`` and save the result.

    Returns:
        pandas.DataFrame: All collected tweets (no de-duplication is applied).
        The frame is also written to ``../data/twitter_data.csv``, replacing
        any existing file at that path.
    """
    all_tweets = []
    for query in search_queries:
        all_tweets.extend(collect_tweets(query))

    # Convert to DataFrame
    df = pd.DataFrame(all_tweets)

    # Save to CSV; to_csv does not create missing directories.
    output_path = '../data/twitter_data.csv'
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_csv(output_path, index=False)
    # Report the path that was actually written.
    print(f'Saved {len(df)} tweets to {output_path}')

    return df
