# 01_developing_tweepy_strategy
To stream relevant tweets about the World Cup, we need to figure out how to:
- enable streaming with the Twitter API v2 using tweepy
- request the correct fields we need from tweepy
- write the tweets out efficiently. MongoDB is one option worth investigating, but for now we will focus on writing to JSON instead.

NL, 23/11/22  
NL, 25/11/22 -- fleshing out; coming up with a proper configuration for our streaming client

## IMPORTS

In [62]:
import os
from dotenv import load_dotenv
import json
import re
import datetime
import tweepy

## FUNCTIONS

In [63]:
def check_path_exists(filepath:str):
    '''
    validates a full file path (dir + filename) before we write to it.

    the file itself does not need to exist yet, but the directory it
    would live in must. a bare filename (no directory component) is
    resolved against the current working directory.

    args:
        - filepath: str, full file path

    returns:
        - filepath, unchanged, when it is usable

    raises:
        - TypeError: if filepath is None or not a str
        - ValueError: if filepath points at an existing directory
        - NotADirectoryError: if the parent directory does not exist
    '''
    if filepath is None:
        raise TypeError('specified path is None.')

    if not isinstance(filepath, str):
        raise TypeError('filepath object must be str. Please re-specify.')

    if os.path.isdir(filepath):
        raise ValueError('Need to provide a full path to a file, not a dir.')

    if os.path.isfile(filepath):
        return filepath

    # os.path.dirname respects the platform separator. an empty result means
    # a bare filename, i.e. the current working directory (splitting on '/'
    # and re-joining used to fall back to checking the filesystem root).
    parent_dir = os.path.dirname(filepath) or os.curdir
    filename = os.path.basename(filepath)

    if not os.path.isdir(parent_dir):
        raise NotADirectoryError(
            f'Directory path {parent_dir} does not exist. '
            'Please re-specify `out_path`.'
        )

    print(
        f'{parent_dir}, the dir for the specified filepath, is a directory, '
        f'but file {filename} does not exist. Thats fine for us.'
    )
    return filepath

In [210]:
def extract_count_domains_entities(context_field:list):
    '''
    tallies how often each domain name and each entity name appears
    in a tweet's `context_annotations` list.

    args:
        - context_field: list of annotation dicts, each holding
          ['domain']['name'] and ['entity']['name']

    returns:
        - domains, dict: domain name -> number of annotations naming it
        - entities, dict: entity name -> number of annotations naming it
    '''
    domain_counts = {}
    entity_counts = {}

    for annotation in context_field:
        domain_name = annotation['domain']['name']
        domain_counts[domain_name] = domain_counts.get(domain_name, 0) + 1

        entity_name = annotation['entity']['name']
        entity_counts[entity_name] = entity_counts.get(entity_name, 0) + 1

    return domain_counts, entity_counts

In [297]:
def total_domain_entity_counts(domains_tweet:dict,
                               domains_session:dict,
                               entities_tweet:dict,
                               entities_session:dict):
    '''
    folds one tweet's domains/entities into the running session totals.

    each domain/entity name present in the tweet adds exactly 1 to its
    session total, however many times it was annotated in that tweet, so
    the session totals read as "number of tweets mentioning X".

    the session dicts are updated in place and also returned.

    returns:
        - domains_session, dict
        - entities_session, dict
    '''
    pairs = ((domains_tweet, domains_session),
             (entities_tweet, entities_session))

    for per_tweet, running in pairs:
        for name in per_tweet:
            running[name] = running.get(name, 0) + 1

    return domains_session, entities_session

In [241]:
# compiled once at import time. raw string so `\(` / `\)` are regex escapes,
# not (deprecated) invalid string escapes.
# NOTE(review): inside a character class `$-_` is a *range* (0x24-0x5F). That
# range is what lets `/`, `:`, `?` and `=` match, but it also admits `'`, `<`,
# `>`, `;`, `[`, `]`. It is kept as-is to preserve existing matches.
_URL_PATTERN = re.compile(
    r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
)


def extract_urls(tweet_text:str) -> list:
    '''
    extracts http/https urls from tweet text.

    args:
        - tweet_text: str, the tweet body

    returns:
        - list of url strings in order of appearance ([] if none)
    '''
    return _URL_PATTERN.findall(tweet_text)

## INIT

In [64]:
# read key=value pairs from a local .env file into the environment, so the
# os.getenv('TWITTER_...') lookups in this notebook can find the credentials
load_dotenv()

True

## PATHS & CONSTANTS

v1

In [65]:
# consumer_key = os.getenv('TWITTER_API_KEY')
# consumer_secret = os.getenv('TWITTER_API_KEY_SECRET')
# access_token = os.getenv('TWITTER_ACCESS_TOKEN')
# access_token_secret = os.getenv('TWITTER_ACCESS_TOKEN_SECRET')

v2

In [66]:
# app bearer token for the v2 API. os.getenv returns None when the variable
# is not set, so check it is present before building the streamer.
bearer_token = os.getenv('TWITTER_BEARER_TOKEN')

fields to return

In [226]:
# expansions: `author_id` pulls the author's user object into includes.users;
# `referenced_tweets.id` pulls referenced (e.g. retweeted) tweets into
# includes.tweets. The *_fields lists select which attributes come back for
# each object type.
# NOTE(review): media_fields appear to have no effect unless
# 'attachments.media_keys' is also requested as an expansion (per the
# Twitter API v2 fields/expansions docs) -- confirm before relying on media.
expansions = ['author_id', 'referenced_tweets.id']
tweet_fields = ['created_at', 'public_metrics', 'source', 'context_annotations']
media_fields = ['media_key', 'type', 'url', 'duration_ms']
user_fields = ['id', 'name', 'username', 'created_at', 'description', 'location', 'public_metrics']

In [298]:
class TweetStreamer(tweepy.StreamingClient):
    '''
    tweepy v2 StreamingClient that flattens each streamed tweet and appends
    it as one JSON line to `out_path`. it also tallies context-annotation
    domains and entities for the whole session.

    once `kill_time` has elapsed (checked whenever a new message arrives),
    the session tallies are written to `out_path_domains` and
    `out_path_entities` and the stream is disconnected. the tallies are also
    flushed when the stream disconnects for any other reason.
    '''

    def __init__(self, 
                 out_path:str,
                 out_path_domains:str,
                 out_path_entities:str,
                 kill_time:int=59,
                 time_unit:str='minutes',
                 **kwargs):
        '''
        args:
            - out_path: str, file flattened tweets are appended to (JSON lines)
            - out_path_domains: str, file for session domain counts (JSON)
            - out_path_entities: str, file for session entity counts (JSON)
            - kill_time: int, how long to stream for
            - time_unit: str, unit of `kill_time`: `minutes` or `seconds`
            - **kwargs: forwarded to tweepy.StreamingClient (e.g. bearer_token)

        raises:
            - ValueError: if time_unit is not `minutes` or `seconds`
            - whatever check_path_exists raises for an unusable path
        '''
        # output locations, validated up front so a bad path fails before streaming
        self.outfile = check_path_exists(out_path)
        self.out_path_domains = check_path_exists(out_path_domains)
        self.out_path_entities = check_path_exists(out_path_entities)

        # NOTE(review): the clock starts at construction, not when filter()
        # connects -- construct the streamer right before streaming.
        self.start_time = datetime.datetime.now()

        if time_unit not in ['seconds', 'minutes']:
            raise ValueError('time_unit must be either `minutes` or `seconds`.')

        seconds = 60 * kill_time if time_unit == 'minutes' else kill_time
        self.kill_time = datetime.timedelta(seconds=seconds)

        # session-wide tallies: name -> number of tweets mentioning it
        self.domains = {}
        self.entities = {}

        # tweepy's initialiser needs the forwarded kwargs (bearer_token etc.)
        super().__init__(**kwargs)

    def on_data(self, data):
        '''
        handles one raw message from the stream.

        while within the time limit: flattens the tweet and appends it as one
        JSON line to self.outfile. once the limit has passed: writes the
        session counts and disconnects.
        '''
        if (datetime.datetime.now() - self.start_time) > self.kill_time:
            self._write_session_counts()
            # this kills the streaming process
            self.disconnect()
            return False

        obj = json.loads(data)

        if 'data' not in obj:
            # not a tweet (e.g. an `errors` payload). Hand errors to the error
            # hook instead of letting a KeyError tear down the stream.
            if 'errors' in obj:
                self.on_errors(obj['errors'])
            return

        tweet = self._flatten_tweet(obj)

        with open(self.outfile, 'a', encoding='utf-8') as o:
            o.write(json.dumps(tweet) + '\n')

    def _flatten_tweet(self, obj):
        '''returns obj['data'] enriched with urls, domain/entity counts and its
        author, updating the session tallies along the way.'''
        tweet = obj['data']
        # not needed downstream; pop so payloads without it don't raise
        tweet.pop('edit_history_tweet_ids', None)

        urls = extract_urls(tweet['text'])
        if urls:
            tweet['urls'] = urls

        if 'context_annotations' in tweet:
            domains, entities = extract_count_domains_entities(tweet.pop('context_annotations'))
            self.domains, self.entities = total_domain_entity_counts(domains_tweet=domains,
                                                                     domains_session=self.domains,
                                                                     entities_tweet=entities,
                                                                     entities_session=self.entities)
            tweet['domains'] = domains
            tweet['entities'] = entities

        tweet['user'] = self._find_author(obj, tweet.get('author_id'))
        return tweet

    @staticmethod
    def _find_author(obj, author_id):
        '''picks the tweet's author out of includes.users.

        with the `referenced_tweets.id` expansion, includes.users can also hold
        the authors of referenced tweets, so match on author_id instead of
        assuming the first entry. fall back to the first user, or None when
        no users were included.
        '''
        users = obj.get('includes', {}).get('users', [])
        for user in users:
            if user.get('id') == author_id:
                return user
        return users[0] if users else None

    def _write_session_counts(self):
        '''sorts the session tallies (most frequent first) and writes each
        one to its JSON file, overwriting any previous contents.'''
        self.domains = dict(sorted(self.domains.items(), key=lambda x: x[1], reverse=True))
        self.entities = dict(sorted(self.entities.items(), key=lambda x: x[1], reverse=True))

        with open(self.out_path_domains, 'w', encoding='utf-8') as o:
            o.write(json.dumps(self.domains))

        with open(self.out_path_entities, 'w', encoding='utf-8') as o:
            o.write(json.dumps(self.entities))

    def on_disconnect(self):
        '''flushes the session tallies when the stream ends, so counts are not
        lost if the connection drops before kill_time. presumably invoked by
        tweepy once the connection loop exits -- confirm for your version.'''
        self._write_session_counts()
        return super().on_disconnect()

    def on_errors(self, errors):
        '''defers to tweepy's default handling of stream error payloads.'''
        return super().on_errors(errors)

search term

In [301]:
# stream for 30 seconds: flattened tweets are appended to test.json, and the
# session's domain/entity counts go to test_domains.json / test_entities.json.
# bearer_token is forwarded through **kwargs to tweepy.StreamingClient.
streamer = TweetStreamer(bearer_token=bearer_token, 
                         out_path='test.json', 
                         out_path_domains='test_domains.json', 
                         out_path_entities='test_entities.json',
                         kill_time=30,
                         time_unit='seconds')

In [302]:
# connect to the filtered stream, requesting the expansions/fields defined
# earlier. each incoming message goes to TweetStreamer.on_data, which calls
# disconnect() once kill_time has passed.
streamer.filter(
    expansions=expansions, 
    tweet_fields=tweet_fields,
    media_fields=media_fields,
    user_fields=user_fields
    )

Stream connection closed by Twitter


In [270]:
# register a filter rule for the stream. rules appear to persist on Twitter's
# side between sessions: re-running this returns a DuplicateRule error
# (see the output) instead of adding a second copy.
streamer.add_rules(tweepy.StreamRule('gakpo'))

Response(data=None, includes={}, errors=[{'value': 'gakpo', 'id': '1596186239585091585', 'title': 'DuplicateRule', 'type': 'https://api.twitter.com/2/problems/duplicate-rules'}], meta={'sent': '2022-11-26T01:39:57.762Z', 'summary': {'created': 0, 'not_created': 1, 'valid': 0, 'invalid': 1}})

Reading back the tweets we just collected

In [232]:
# load the JSON-lines output back in: one json.loads per line of the file
tweets = []

with open('test.json', 'r') as infile:
    tweets.extend(map(json.loads, infile))

In [237]:
import re

In [238]:
# reuse extract_urls instead of duplicating its regex here. the inline copy was
# a non-raw string, so `\(` triggered an invalid-escape warning.
for tweet in tweets:
    print(extract_urls(tweet['text']))

[]
[]
[]
[]
[]
['https://t.co/ZPAvtQGg4c']
[]
[]
['https://t.co/rliImuBHdH']
[]
['https://t.co/RJuzctV88l']
['https://t.co/mVxWYkx58k']
['https://t.co/o7wYgjz4vF']
[]
[]
[]
['https://t.co/GgxC8XMKCF']
[]
[]
[]
[]
[]
['https://t.co/onYErP7wvR']
['https://t.co/s9L0ArwoYk']
[]
[]
[]
['https://t.co/EWIfj3hIW5']
[]


In [239]:
# sample text containing two t.co links, used to sanity-check the url regex
test = 'this is a piece of text with 2 urls: https://t.co/GgxC8XMKCF and https://t.co/EWIfj3hIW5' 

In [240]:
# expect both links back; goes through the shared helper rather than a copied regex
extract_urls(test)

['https://t.co/GgxC8XMKCF', 'https://t.co/EWIfj3hIW5']

In [37]:
# NOTE(review): expects the raw stream payload shape {'data': ..., 'includes': ...}.
# The current TweetStreamer writes only the flattened `data` object per line
# (no 'data' key), so against a freshly written test.json this would raise KeyError.
tweets[0]['data']

{'author_id': '602101529',
 'created_at': '2022-11-25T17:19:25.000Z',
 'edit_history_tweet_ids': ['1596191840675717122'],
 'id': '1596191840675717122',
 'public_metrics': {'retweet_count': 1,
  'reply_count': 0,
  'like_count': 0,
  'quote_count': 0},
 'referenced_tweets': [{'type': 'retweeted', 'id': '1596191759746224129'}],
 'source': 'Twitter for Android',
 'text': 'RT @pbtips_: Gakpo dey defence and Attack'}

In [58]:
# NOTE(review): repeats the raw-payload inspection. The current TweetStreamer
# writes flattened tweets without a 'data' key, so this likely raises KeyError
# on new output.
tweets[0]['data']

{'author_id': '602101529',
 'created_at': '2022-11-25T17:19:25.000Z',
 'edit_history_tweet_ids': ['1596191840675717122'],
 'id': '1596191840675717122',
 'public_metrics': {'retweet_count': 1,
  'reply_count': 0,
  'like_count': 0,
  'quote_count': 0},
 'referenced_tweets': [{'type': 'retweeted', 'id': '1596191759746224129'}],
 'source': 'Twitter for Android',
 'text': 'RT @pbtips_: Gakpo dey defence and Attack'}

In [36]:
# NOTE(review): raw-payload shape again. The current TweetStreamer stores the
# author under tweet['user'] and drops 'includes', so this likely raises KeyError
# on new output.
tweets[0]['includes']

{'users': [{'created_at': '2012-06-07T18:42:55.000Z',
   'description': '★Free Sports Betting Tips  ★ Blogs on @eldanaija ★ Redesign your blog ★ DM for ADs & Promotion ★ Aspiring Video animator ★\n📩 touchedbyelda@gmail.com',
   'id': '602101529',
   'location': 'Nigeria',
   'name': 'Obinna | #ObiDatti2023💙',
   'public_metrics': {'followers_count': 10431,
    'following_count': 9141,
    'tweet_count': 391228,
    'listed_count': 12},
   'username': 'touchedByElda'},
  {'created_at': '2018-09-01T21:19:48.000Z',
   'description': 'FOOTBALL/BASKETBALL TIPSTER 📩Professionalbetstip@gmail.com Please gamble responsibly! 18+ https://t.co/Kdf976xDDj',
   'id': '1036000695533690880',
   'location': 'Africa',
   'name': 'PROFESSIONAL BETS',
   'public_metrics': {'followers_count': 164432,
    'following_count': 1324,
    'tweet_count': 78425,
    'listed_count': 380},
   'username': 'pbtips_'}],
 'tweets': [{'author_id': '602101529',
   'created_at': '2022-11-25T17:19:25.000Z',
   'edit_history

In [133]:
# NOTE(review): the TweetStreamer defined in this notebook never sets a `tweets`
# attribute. This presumably relied on an earlier version that buffered raw
# payloads, and likely raises AttributeError now.
streamer.tweets[0]

b'{"data":{"author_id":"848043269103558660","created_at":"2022-11-25T22:45:10.000Z","edit_history_tweet_ids":["1596273819580518400"],"id":"1596273819580518400","public_metrics":{"retweet_count":292,"reply_count":0,"like_count":0,"quote_count":0},"referenced_tweets":[{"type":"retweeted","id":"1596184708177694720"}],"source":"Twitter for iPhone","text":"RT @FIFAWorldCup: Gakpo\'s strike gives #NED the advantage at the break. \\n\\n#FIFAWorldCup | #Qatar2022"},"includes":{"users":[{"created_at":"2017-04-01T05:24:06.000Z","description":"We Are X\xef\xbc\x81I love X forever\xef\xbc\x81\xef\xbc\x81 X=\xe7\x84\xa1\xe9\x99\x90\xe3\x81\xae\xe5\x8f\xaf\xe8\x83\xbd\xe6\x80\xa7\xe3\x82\x92\xe6\x84\x8f\xe8\xad\x98\xe3\x81\x97\xe3\x81\xa6\xe8\xb5\xb0\xe3\x82\x8b\xe7\xa4\xbe\xe4\xbc\x9a\xe4\xba\xba\xe3\x80\x82\xe5\x85\x83\xe7\xa7\x8b\xe8\x91\x89\xe4\xba\xba\xe3\x80\x82","id":"848043269103558660","location":"X","name":"YOSHIKI2\xe4\xb8\x96","public_metrics":{"followers_count":169,"following_count":451

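Below is some code that streams tweets and automatically writes them to MongoDB. This is something we might want to pursue. Note that it uses the older v1.1 streaming interface (tweepy `Stream` with OAuth 1.0a user credentials) rather than the v2 `StreamingClient` used above.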

In [None]:
# ===============================================
# twitter-to-mongo.py v1.0 Created by Nacho Gaona
# ===============================================
# Streams tweets via tweepy's v1.1 `Stream` (OAuth 1.0a user credentials) and
# stores a trimmed-down record of each one in MongoDB.
import os
import pymongo
import json
from tweepy.streaming import Stream
from tweepy import OAuthHandler
import datetime
import tweepy


# The MongoDB connection info. This assumes your database name is TwitterStream, and your collection name is tweets.
connection =  pymongo.MongoClient('localhost', 27017)
db = connection.TwitterStream  # you should have a DB created with name TwitterStream


collection = db.tweets      # you should have a collection created with name tweets


# Optional - Only grab tweets of specific language (passed to filter() below)
language = ['en']

# Credentials are read from the environment, using the same TWITTER_* variable
# names as the v1 credentials elsewhere in this notebook, so no secrets live in source.
consumer_key = os.getenv('TWITTER_API_KEY')
consumer_secret = os.getenv('TWITTER_API_KEY_SECRET')
access_token = os.getenv('TWITTER_ACCESS_TOKEN')
access_token_secret = os.getenv('TWITTER_ACCESS_TOKEN_SECRET')



# The below code will get Tweets from the stream and store only the important fields to your database
class MyStreamListener(Stream):
    '''Inserts a trimmed-down record of each streamed tweet into `collection`.'''

    def on_data(self, data):
        '''Parses one raw stream message and stores its important fields.'''

        # Load the Tweet into the variable "t"
        t = json.loads(data)

        # The stream can also deliver non-tweet messages (presumably delete or
        # limit notices) that lack these fields; skip them instead of raising
        # KeyError and killing the stream.
        if 'id_str' not in t or 'user' not in t:
            return True

        # Pull important data from the tweet to store in the database.
        tweet_id = t['id_str']  # The Tweet ID from Twitter in string format
        username = t['user']['screen_name']  # The username of the Tweet author
        followers = t['user']['followers_count']  # The number of followers the Tweet author has
        text = t['text']  # The entire body of the Tweet
        hashtags = t['entities']['hashtags']  # Any hashtags used in the Tweet
        dt = t['created_at']  # The timestamp of when the Tweet was created
        lang = t['lang']  # The language of the Tweet (named so it doesn't shadow the module-level `language` list)

        # Convert the timestamp string given by Twitter to a date object called "created". This is more easily manipulated in MongoDB.
        created = datetime.datetime.strptime(dt, '%a %b %d %H:%M:%S +0000 %Y')

        # Load all of the extracted Tweet data into the variable "tweet" that will be stored into the database
        tweet = {'id':tweet_id, 'username':username, 'followers':followers, 'text':text, 'hashtags':hashtags, 'language':lang, 'created':created}

        # Save the refined Tweet data to MongoDB
        collection.insert_one(tweet)

        # Optional - Print the username and text of each Tweet to your console in realtime as they are pulled from the stream
        print(username + ': ' + text)
        return True

    # tweepy v4's Stream reports HTTP errors through on_request_error(status_code);
    # it never calls on_error, so print here and keep tweepy's default handling.
    def on_request_error(self, status_code):
        print(status_code)
        return super().on_request_error(status_code)

    # Kept for compatibility with code that calls the old hook name directly.
    def on_error(self, status):
        print(status)

# Some Tweepy code that can be left alone. It pulls from variables at the top of the script
if __name__ == '__main__':

    myStreamListener_new = MyStreamListener(consumer_key, consumer_secret, access_token, access_token_secret)
    # `languages` makes the optional language filter above actually take effect
    myStreamListener_new.filter(track=['Pique'], languages=language)



    ###########################
    #in case you want check updates from twitter timeline
    #auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    #auth.set_access_token(access_token, access_token_secret)

    #api = tweepy.API(auth)
    #public_tweets = api.home_timeline()
    #for tweete in public_tweets:  #this is another example to check the timeline from twits
    #    print (tweete.text)