In [None]:
import time
import datetime 
import re
from loguru import logger
import pickle
from collections import namedtuple
import pandas as pd
import numpy as np
import tweepy
from tweepy import Unauthorized, NotFound, Forbidden

In [None]:
# The four Twitter API credentials are imported from the local AppCred module
# instead of being written into the notebook itself.
from AppCred import CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET
# Build the OAuth handler from the consumer key pair, then attach the access token pair.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Shared API client used by the rest of the notebook; wait_on_rate_limit=True asks
# tweepy to wait out rate limits rather than raise when one is hit.
api = tweepy.API(auth, wait_on_rate_limit=True)

In [None]:
# Lightweight record that keeps a user object together with the tweets
# collected for that user, accessible as `.user` and `.tweets`.
ExtendedUser = namedtuple(
    typename='extended_user',
    field_names=('user', 'tweets'),
)

def eligible_user(user, min_followers=100, active_since=datetime.date(2022, 4, 5)):
    """Decide whether a user meets the sampling criteria.

    A user is eligible when they have at least ``min_followers`` followers and
    their latest tweet was posted on or after ``active_since``.

    Parameters
    ----------
    user : object
        Must expose ``followers_count``. The latest tweet is read from
        ``user.status`` and its ``created_at`` must offer a ``.date()`` method.
    min_followers : int, optional
        Minimum follower count (inclusive). Defaults to 100.
    active_since : datetime.date, optional
        Earliest acceptable date of the latest tweet (inclusive).
        Defaults to 2022-04-05; pass a different date instead of editing the code.

    Returns
    -------
    bool
        ``True`` if both criteria hold, ``False`` otherwise. Users without a
        ``status`` attribute (or with ``status`` set to ``None``) are not eligible.
    """
    if user.followers_count < min_followers:
        return False
    # A user object without a latest tweet cannot satisfy the activity criterion.
    latest_tweet = getattr(user, 'status', None)
    if latest_tweet is None:
        return False
    return latest_tweet.created_at.date() >= active_since

def remove_punctuation(text):
    """Return ``text`` with all commas, periods, exclamation marks,
    question marks and semicolons deleted; every other character is kept."""
    punctuation_run = re.compile('[,.!?;]+')
    return punctuation_run.sub('', text)
    
def eligible_tweet(tweet, EU_words, nuclear_words, nuclear_war_regex): 
    """Tell whether a tweet is on topic.

    The tweet's ``full_text`` is lowercased and passed through
    ``remove_punctuation``; every match of ``nuclear_war_regex`` is then cut
    out, and what remains is split on whitespace into a set of words.
    The tweet is on topic when that set shares at least one word with
    ``EU_words`` and at least one word with ``nuclear_words``.

    Returns ``True`` or ``False``.
    """
    lowered_text = tweet.full_text.lower()
    without_war_phrases = re.sub(nuclear_war_regex, '', remove_punctuation(lowered_text))
    words_in_tweet = set(without_war_phrases.split())
    mentions_eu = len(words_in_tweet & EU_words) >= 1
    mentions_nuclear = len(words_in_tweet & nuclear_words) >= 1
    return mentions_eu and mentions_nuclear

def collect_tweets(user_name, EU_words, nuclear_words, nuclear_war_regex):
    """Fetch a user's timeline and keep only the on-topic tweets.

    Up to 3200 items (retweets included, extended tweet mode) are requested
    from ``api.user_timeline`` for ``user_name`` and then filtered with
    ``eligible_tweet``. If tweepy raises ``Unauthorized`` an empty list is
    returned instead.
    """
    try:
        timeline = tweepy.Cursor(
            api.user_timeline,
            screen_name=user_name,
            include_rts=True,
            tweet_mode="extended",
        )
        # Download the whole timeline first, then apply the topic filter.
        downloaded = list(timeline.items(3200))
        return [
            status
            for status in downloaded
            if eligible_tweet(status, EU_words, nuclear_words, nuclear_war_regex)
        ]
    except Unauthorized:
        return []

def snowball(tweets):
    """Snowball-sample new screen names from a user's tweets.

    Two sources are combined:

    * every ``screen_name`` listed under ``tweet.entities['user_mentions']``
      (a tweet whose entities lack that key contributes no mentions);
    * the account named in a leading ``RT @name`` prefix of ``tweet.full_text``.

    Parameters
    ----------
    tweets : iterable
        Objects exposing ``entities`` (a dict) and ``full_text`` (a str).

    Returns
    -------
    set of str
        The distinct screen names found; empty when ``tweets`` is empty.
    """
    # Raw string: '\w' in a normal string literal is an invalid escape sequence.
    # The capture group yields the name without the 'RT @' prefix.
    retweet_prefix = re.compile(r'^RT @(\w+)')
    discovered = set()
    for tweet in tweets:
        for mention in tweet.entities.get('user_mentions', []):
            discovered.add(mention['screen_name'])
        retweet = retweet_prefix.search(tweet.full_text)
        if retweet:
            discovered.add(retweet.group(1))
    return discovered

In [None]:
# Keyword sets are compared against lowercased tweet words, so every entry must be lowercase.
EU_words = {'eu', 'europe', 'european', 'mep', '#eu', '#europe', '#mep', '#eutaxonomy', '#notourtaxonomy'}
# Note the comma between 'uranium' and 'atomic': without it Python concatenates the two
# adjacent literals into the single keyword 'uraniumatomic'. Cashtags are lowercased for
# the same reason as above.
nuclear_words = {'nuclear', 'reactor', 'reactors', 'radioactive', 'uranium', 'atomic', '$ura', '$urnm', '#nuclear', '#uranium', '#atomic'}
# Phrases about nuclear weapons/war; matches are stripped from a tweet before keyword matching.
nuclear_war_regex = 'nuclear war|nuclear arsenal|nuclear threats?|nuclear weapons?|nuclear warheads?|nuclear missiles?|nuclear bombs?|atomic bombs?|atomic threats?'

In [None]:
# Seed accounts for the crawl: one screen name per line. Surrounding whitespace is
# stripped and blank lines are skipped, so an empty screen name never reaches the API.
with open('../Data/actor_list.txt', 'r') as file:
    user_names = {line.strip() for line in file if line.strip()}

In [None]:
# --- Snowball crawl -----------------------------------------------------------------
# Round 1 processes the seed `user_names`; each later round processes the accounts
# mentioned or retweeted in the on-topic tweets collected during the previous round.
extended_users = []   # ExtendedUser(user, tweets) for every eligible account collected
not_collected = []    # (screen name, exception name, iteration label) for accounts given up on
snowball_range = 7    # number of snowball rounds that follow the seed round

MAX_ATTEMPTS = 3             # tries per account before giving up
SECONDS_BETWEEN_USERS = 20   # pause before each account
SECONDS_BEFORE_RETRY = 30    # pause between failed attempts on the same account
CHECKPOINT_PATH = "../Data/extended_users.p"

# Screen names whose outcome is final (collected, not eligible, NotFound or Forbidden).
# They are left out of later rounds so the same account is not queried again.
# Accounts that only failed with other errors are not recorded and may be retried
# if a later round discovers them again.
processed_user_names = set()

for i in range(snowball_range+1):
    logger.debug(f"Iteration {i+1}: Collecting user info and tweets from {len(user_names)} users")
    new_user_names = set()
    for user_name in user_names:
        time.sleep(SECONDS_BETWEEN_USERS)
        for attempt in range(1, MAX_ATTEMPTS+1):
            try:
                user = api.get_user(screen_name=user_name)
                if eligible_user(user):
                    tweets = collect_tweets(user_name, EU_words, nuclear_words, nuclear_war_regex)
                    extended_users.append(ExtendedUser(user, tweets))
                    # Checkpoint after every collected account. Plain (user, tweets)
                    # tuples are stored, and the file is closed as soon as it is written.
                    with open(CHECKPOINT_PATH, "wb") as checkpoint_file:
                        pickle.dump([(eu.user, eu.tweets) for eu in extended_users], checkpoint_file)
                    # Accounts found in the final round are not expanded any further.
                    if tweets and i < snowball_range:
                        new_user_names |= snowball(tweets)
                    logger.debug(f"Collected user info and {len(tweets)} tweets from {user_name}")
                else:
                    logger.debug(f"{user_name} not eligible")
                processed_user_names.add(user_name)
                break
            except (NotFound, Forbidden) as ex:
                # Retrying cannot help for these two errors: record the account and move on.
                ex_name = type(ex).__name__
                logger.debug(f"{ex_name}! Did not manage to collect user info and tweets from {user_name}")
                not_collected.append( ( user_name, ex_name, 'Iteration '+str(i+1) ) )
                processed_user_names.add(user_name)
                break
            except Exception as ex:
                # Any other error is retried until MAX_ATTEMPTS is reached.
                ex_name = type(ex).__name__
                logger.debug(f"{ex_name}! Did not manage to collect user info and tweets from {user_name} in attempt {attempt}")
                if attempt == MAX_ATTEMPTS:
                    not_collected.append( ( user_name, ex_name, 'Iteration '+str(i+1) ) )
                else:
                    # Only wait when another attempt actually follows.
                    time.sleep(SECONDS_BEFORE_RETRY)
    # Next round: newly discovered names minus everything already settled.
    user_names = new_user_names - processed_user_names - {eu.user.screen_name for eu in extended_users}
logger.debug(
    f"Collected {sum(len(eu.tweets) for eu in extended_users)} total tweets from {len(extended_users)} users"
    )

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=83a05b10-68f2-47cd-90b2-acf816447dd2' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>