In [None]:
def get_tweets(query, max_results, limit=None):
    """
    Return a lazy stream of recent tweets matching ``query``.

    Wraps ``client.search_recent_tweets`` in a ``tweepy.Paginator`` and
    flattens its pages into a single iterator of tweet objects, requesting
    the expansions and tweet/user/place fields listed below.

    Parameters:
        query (str): Search query passed to ``search_recent_tweets``.
        max_results (int): Page size -- the number of tweets requested per
            API call. It does NOT cap the total number of tweets yielded.
        limit (int | None): Optional cap on the total number of tweets
            yielded across all pages. ``None`` (the default) leaves the
            stream uncapped, exactly as before this parameter existed.

    Returns:
        An iterator of tweet objects, or ``None`` if building the paginator
        failed (the error is printed, not raised).

    Note:
        Nothing is raised to the caller from here. Pages are only requested
        while the returned iterator is consumed, so API errors (rate limits,
        auth failures, ...) surface in the caller's loop, not in this
        function's ``except``.
        Relies on module-level ``tweepy`` and ``client`` objects defined
        elsewhere in the notebook.
    """
    expansions = [
        'author_id', 'in_reply_to_user_id', 'geo.place_id',
        'entities.mentions.username', 'referenced_tweets.id',
        'referenced_tweets.id.author_id',
    ]
    tweet_fields = [
        'id', 'text', 'author_id', 'attachments', 'context_annotations',
        'created_at', 'entities', 'lang', 'geo', 'public_metrics',
    ]
    user_fields = [
        'id', 'name', 'username', 'created_at', 'description', 'entities',
        'location', 'public_metrics', 'verified',
    ]
    place_fields = ['full_name', 'id', 'country', 'country_code', 'geo', 'name', 'place_type']
    try:
        paginator = tweepy.Paginator(
            client.search_recent_tweets,
            query=query,
            expansions=expansions,
            tweet_fields=tweet_fields,
            place_fields=place_fields,
            user_fields=user_fields,
            max_results=max_results,
        )
        # Only forward `limit` when given, so the default stays tweepy's own
        # (uncapped) behaviour.
        if limit is None:
            return paginator.flatten()
        return paginator.flatten(limit=limit)
    except Exception as e:
        print("Error getting tweets", e)
        return None

In [None]:
import itertools

# `pd` is otherwise first imported in a later cell; import it here so this
# cell also works when the notebook is run top to bottom.
import pandas as pd

# Entity catalogue consumed by automate_domain_filter to build
# `context:<domain>.<entity>` query terms.
domain_df = pd.read_csv('twitter-context-annotations/files/evergreen-context-entities-20220601.csv')
def _pack_terms(terms, chunk_size, separator=' OR '):
    """Greedily join ``terms`` with ``separator`` into strings of at most ``chunk_size`` characters.

    Terms are never split; a single term longer than ``chunk_size`` is
    emitted as a chunk on its own.
    """
    chunks = []
    current = ''
    for term in terms:
        candidate = f'{current}{separator}{term}' if current else term
        if current and len(candidate) > chunk_size:
            # Adding this term would overflow: close the current chunk.
            chunks.append(current)
            current = term
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


def automate_domain_filter(df, start_id, end_id, chunk_size, domain_chunk_count):
    """
    Build chunks of ``context:`` query terms for a range of domain IDs.

    For each domain ID ``i`` from ``start_id`` to ``end_id`` (inclusive),
    every row of ``df`` whose ``domains`` value is exactly ``i`` becomes a
    term ``context:<i>.<entity_id>``. The terms are packed, in row order,
    into `` OR ``-joined chunks of at most ``chunk_size`` characters,
    always splitting on term boundaries.

    Parameters:
        df (DataFrame): Must contain ``domains`` and ``entity_id`` columns.
            ``domains`` is compared as text, so a multi-domain value such as
            ``"46,47"`` matches neither 46 nor 47 (same as before).
        start_id (int): The first domain ID (inclusive).
        end_id (int): The last domain ID (inclusive).
        chunk_size (int): The maximum size of each chunk in characters. A single
            term longer than this is emitted as its own chunk.
        domain_chunk_count (dict): Maximum number of chunks to keep for each
            domain ID. Domains missing from the dict are not capped.

    Returns:
        List of chunks (list[str]): Chunks of all domains, in domain order.
        A domain with no matching rows contributes no chunks.
    """
    # Compare as text so both string and numeric `domains` columns work,
    # and NaN becomes 'nan' (never matches) instead of breaking the mask.
    domain_text = df['domains'].astype(str)
    all_chunks = []
    for domain_id in range(start_id, end_id + 1):
        mask = (domain_text == str(domain_id)).to_numpy()
        terms = [f'context:{domain_id}.{entity_id}' for entity_id in df.loc[mask, 'entity_id']]
        chunks = _pack_terms(terms, chunk_size)
        cap = domain_chunk_count.get(domain_id)
        if cap is not None:
            chunks = chunks[:max(cap, 0)]
        all_chunks.extend(chunks)
    return all_chunks

# Character budget per chunk of `context:` terms, and a cap on how many
# chunks to keep for each domain ID in the 45..48 range.
chunk_size = 350
domain_chunk_count = {
    45: 1,
    46: 6,
    47: 276,
    48: 69,
}
chunks_list = automate_domain_filter(
    domain_df,
    start_id=45,
    end_id=48,
    chunk_size=chunk_size,
    domain_chunk_count=domain_chunk_count,
)

In [None]:
def filter_rule(chunk, hash_include=True):
    """
    Build a recent-search query string for one chunk of ``context:`` terms.

    The query always contains, in this order and separated by single
    spaces: ``(lang:en)``, the retweet exclusions ``(-is:retweet) (-"RT")``,
    ``has:mentions`` and the parenthesised ``chunk``. When ``hash_include``
    is truthy, the ad-disclosure clause
    ``(#ad OR #sponsored OR #promoted OR "Learn More" OR "Shop Now")`` is
    prepended.

    Parameters:
        chunk (str): A portion of the query, e.g. ``context:`` terms joined by `` OR ``.
        hash_include (bool): Whether to include the hashtag/phrase clause. Default is True.

    Returns:
        query (str): The constructed query string.
    """
    clauses = [
        '(lang:en)',
        '(-is:retweet) (-"RT")',
        'has:mentions',
        f'({chunk})',
    ]
    if hash_include:
        # The ad-disclosure clause leads the query.
        clauses.insert(0, '(#ad OR #sponsored OR #promoted OR "Learn More" OR "Shop Now")')
    return ' '.join(clauses)

In [None]:
import time
import tweepy
import pandas as pd

import json  # serialises the nested tweet fields stored below

processed_tweets = 0
total_tweets = 0

tweet_data = []  # one dict per fetched tweet, across all chunks

for chunk in chunks_list:
    print(f'Chunk: {chunk}')

    query = filter_rule(chunk=chunk, hash_include=True)
    paginator = get_tweets(query=query, max_results=100)

    if paginator is None:
        print('Error: Paginator is None. Skipping chunk.')
        continue

    # The paginator fetches pages lazily, so API errors (e.g. rate limits)
    # are raised here, while iterating. Keep what was collected so far and
    # move on to the next chunk instead of aborting the whole run.
    try:
        for tweet in paginator:
            tweet_info = {
                'tweet_id': tweet.id,
                'author_id': tweet.author_id,
                'created_at': tweet.created_at,
                'text': tweet.text,
                'tweet_metrics': json.dumps(tweet.public_metrics),
                'entities': json.dumps(tweet.entities),
                'context': json.dumps(tweet.context_annotations),
                'place_id': json.dumps(tweet.geo) if tweet.geo else None,
            }
            tweet_data.append(tweet_info)
            processed_tweets += 1
    except tweepy.TweepyException as e:
        print(f'Error while fetching chunk: {e}')

    print(f'Finished processing chunk: {chunk}')
    print(f'Progress: {processed_tweets} tweets processed.')
    time.sleep(3)  # brief 3-second pause between chunks to ease rate-limit pressure


In [None]:
# Convert tweet_data list to a DataFrame
df = pd.DataFrame(tweet_data)

# Sort by 'tweet_id' in ascending order. The sort must be stable: rows that
# share a tweet_id then stay in the order they were collected, so keep='last'
# below reliably keeps the most recently fetched copy (with the freshest
# metrics). The default quicksort gives no such ordering guarantee.
df.sort_values('tweet_id', ascending=True, inplace=True, kind='mergesort')

# Drop duplicate rows based on 'tweet_id', keeping the last (latest-fetched) occurrence
dedup_df = df.drop_duplicates(subset='tweet_id', keep='last').reset_index(drop=True)

print(dedup_df.shape)

In [None]:
def process_tweets_to_database(df, conn, c):
    """
    Write every row of ``df`` into the SQLite ``tweets`` table.

    A row whose ``tweet_id`` is not stored yet is inserted with all of its
    columns (``created_at`` formatted as ``%Y-%m-%d %H:%M:%S``). A row whose
    ``tweet_id`` already exists only has its ``tweet_metrics`` refreshed.
    Any row that raises is printed together with the error and skipped.
    All changes are committed once at the end; the connection stays open.

    Args:
        df (pandas.DataFrame): Tweet rows with tweet_id, author_id, created_at,
            text, tweet_metrics, entities, context and place_id columns.
        conn (sqlite3.Connection): Connection used for the final commit.
        c (sqlite3.Cursor): Cursor used for every statement.

    Returns:
        int: Number of rows successfully inserted or updated.
    """
    done = 0

    for _, row in df.iterrows():
        try:
            stamp = row['created_at'].strftime('%Y-%m-%d %H:%M:%S')
            c.execute('SELECT tweet_id FROM tweets WHERE tweet_id=?', (row['tweet_id'],))
            already_stored = c.fetchone() is not None

            if already_stored:
                # Known tweet: only its engagement metrics can have changed.
                c.execute('''UPDATE tweets 
                             SET tweet_metrics = ? 
                             WHERE tweet_id = ?''',
                          (row['tweet_metrics'], row['tweet_id']))
                print("Tweet Already Exists, Updating Tweet Metrics")
            else:
                values = (
                    row['tweet_id'], row['author_id'], stamp, row['text'],
                    row['tweet_metrics'], row['entities'], row['context'], row['place_id'],
                )
                c.execute('''INSERT INTO tweets 
                             (tweet_id, author_id, created_at, text, tweet_metrics, entities, context, place_id) 
                             VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                          values)
                print("New Tweet Appended")

            done += 1
            print(f'Progress: {done} tweets processed.')

        except Exception as err:
            print(f"Error inserting row: {row}")
            print(f"Error message: {err}")

    conn.commit()
    return done

# Persist the de-duplicated tweets: new ones are inserted, known ones get refreshed metrics.
process_tweets_to_database(dedup_df, conn, c)


In [None]:
import json  # serialises each user's `entities` for the users table

# Get unique author IDs from the tweets table
c.execute("SELECT DISTINCT author_id FROM tweets")
author_ids = [str(row[0]) for row in c.fetchall()]

user_data = []  # NOTE(review): not used in this cell
batch_size = 100  # number of author ids sent per lookup request
n = 0  # NOTE(review): not used in this cell

# Look the authors up in batches and insert or update each one in `users`.
# NOTE(review): `t` is presumably a Twitter API v1.1 client (the fields read
# below -- id_str, screen_name, friends_count, statuses_count -- are v1.1
# user-object fields); confirm against the cell that creates it.
for i in range(0, len(author_ids), batch_size):
    user_ids_batch = author_ids[i:i + batch_size]
    try:
        users = t.users.lookup(user_id=",".join(user_ids_batch))

        # Insert or update the user data in the database
        for user in users:
            author_created = user['created_at']
            c.execute("SELECT author_id FROM users WHERE author_id=?", (user['id_str'],))
            existing_author_id = c.fetchone()

            if existing_author_id is None:
                # Author doesn't exist in the database, insert a new row
                c.execute('''INSERT INTO users (author_id, username, verified, bio, author_created, author_location,
                            followers_count, following_count, tweet_count, entities)
                            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                          (user['id_str'], user['screen_name'], user['verified'], user['description'],
                           author_created, user['location'], user['followers_count'],
                           user['friends_count'], user['statuses_count'], json.dumps(user['entities'])))
                print(f"Stored author: {user['name']} (@{user['screen_name']}), id={user['id_str']}")
            else:
                # Author already exists in the database, update the existing row
                c.execute('''UPDATE users SET username=?, verified=?, bio=?, author_created=?, author_location=?,
                            followers_count=?, following_count=?, tweet_count=?, entities=?
                            WHERE author_id=?''',
                          (user['screen_name'], user['verified'], user['description'], author_created,
                           user['location'], user['followers_count'], user['friends_count'],
                           user['statuses_count'], json.dumps(user['entities']), user['id_str']))
                print(f"Updated author: {user['name']} (@{user['screen_name']}), id={user['id_str']}")

    except Exception as e:
        print(f"Error retrieving user data: {str(e)}")

    # Commit this batch before pausing, so it is already persisted if the
    # kernel dies during the wait.
    conn.commit()
    # Throttle between lookup requests; there is nothing to wait for after the last batch.
    if i + batch_size < len(author_ids):
        time.sleep(10)

In [None]:
# For each table, print its schema (column name - declared type) and then the
# number of distinct ids it holds; the two reports are separated by a blank print.
for position, (table, id_column) in enumerate((("tweets", "tweet_id"), ("users", "author_id"))):
    if position:
        print("\n")

    c.execute(f"PRAGMA table_info({table})")
    columns = c.fetchall()
    for column in columns:
        print(column[1], "-", column[2])

    c.execute(f"SELECT COUNT(DISTINCT {id_column}) FROM {table}")
    row_count = c.fetchone()[0]
    print(f"Number of rows in '{table}' table: {row_count}\n")