In [1]:
# load libraries
import pandas as pd
from requests_oauthlib import OAuth1Session
import json
import datetime, time, sys
from abc import ABCMeta, abstractmethod

In [2]:
# twitter api
CK = 'YSTRJOf78h4O3Nh3eLe750eXh' # API
CS = 'zXj1XtWrO1DC8xeE9c1LBpq6F7wo5Wp2YGeBU0dHjq6cfdTbjm' # API Secret
AT = '1318425889600802817-xwztBCt7L07WTaffcCXC2N1P5XhVPQ' # Access Token
AS = '1Lgxl4e6fJN1YGJuCcYg1aUvhDSJZ2zyKmJHWnRTSV4tx' # Access Token Secret

## Scrape function/class setting

In [3]:
# Twitter Getter Main Class
class TweetsGetter(object):
    __metaclass__ = ABCMeta
    
    def __init__(self):
        self.session = OAuth1Session(CK, CS, AT, AS) # connect to twitter API
    
    @abstractmethod
    def specifyUrlAndParams(self, keyword):
        '''
        Return URL and Parameters
        '''
    
    @abstractmethod
    def pickupTweet(self, res_text, includeRetweet):
        '''
        pull tweets from res_text, convert to array set and return
        '''
    
    @abstractmethod
    def getLimitContext(self, res_text):
        '''
        get # of limits info when start
        '''
    
    def collect(self, total = -1, onlyText = False, includeRetweet = False):
        '''
        start to get tweets
        '''
        
        # check n of limites
        self.checkLimit()
        
        # URL, parameter
        url, params = self.specifyUrlAndParams()
        # include_rts is paramter for statuses/user_timeline, can't use forsearch/tweets
        params['include_rts'] = str(includeRetweet).lower()
        
        # getting tweets
        cnt = 0
        unavailableCnt = 0
        while True:
            res = self.session.get(url, params = params)
            if res.status_code == 503:
                # 503 : Service Unavailable
                if unavailableCnt > 10:
                    raise Exception('Twitter API error %d' % res.status_code)
                
                unavailableCnt += 1
                print ('Service Unavailable 503')
                self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
                continue
            
            unavailableCnt = 0
            
            if res.status_code != 200:
                raise Exception('Twitter API erorr %d' % res.status_code)
                
            tweets = self.pickupTweet(json.loads(res.text))
            if len(tweets) == 0:
                # watned len(tweets) !=['count'], but seems count is maximum and can't use for classification
                # so ' == 0'
                # ref : https://dev.twitter.com/discussions/7513
                break
            
            for tweet in tweets:
                if(('retweeted_status' in tweet) and (includeRetweet is False)):
                    pass
                else:
                    if onlyText is True:
                        yield tweet['text']
                    else:
                        yield tweet
                        
                    cnt += 1
                    if cnt % 100 == 0:
                        print('%d ' % cnt)
                    
                    if total > 0 and cnt >= total:
                        return
            
            params['max_id'] = tweet['id'] - 1
            
            # confirm limitation
            # sometimes X-$Rate-Limit-Remaining is not included, so check
            if ('X-Rate-Limit_Remaining' in res.headers and 'X-Rate-Limit-Reset' in res.headers):
                if (int(res.headers['X-Rate-Limit-Remaining']) == 0):
                    self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset']))
                    self.checkLimit()
                else:
                    print('not found - X-Rate-Limit-Remaining or X-Rate-Limit-Reset')
                    self.checkLimit()
            
    def checkLimit(self):
        '''
        ask limitation and wait until accessible
        '''
        unavailableCnt = 0
        while True:
            url = "https://api.twitter.com/1.1/application/rate_limit_status.json"
            res = self.session.get(url)

            if res.status_code == 503:
                # 503 : Service Unavailable
                if unavailableCnt > 10:
                    raise Exception('Twitter API error %d' % res.status_code)

                unavailableCnt += 1
                print ('Service Unavailable 503')
                self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)
                continue

            unavailableCnt = 0

            if res.status_code != 200:
                raise Exception('Twitter API error %d' % res.status_code)

            remaining, reset = self.getLimitContext(json.loads(res.text))
            if (remaining == 0):
                self.waitUntilReset(reset)
            else:
                break
                
    def waitUntilReset(self, reset):
        '''
        sleep until reset time
        '''
        seconds = reset - time.mktime(datetime.datetime.now().timetuple())
        seconds = max(seconds, 0)
        print ('\n     =====================')
        print ('     == Exceeding Rate Limit for the API endpoint ==' % seconds)
        print ('     == waiting %d sec ==' % seconds)
        print ('     =====================')
        sys.stdout.flush()
        time.sleep(seconds + 10)  # add +10 sec just in case
    
    @staticmethod
    def bySearch(keyword):
        return TweetsGetterBySearch(keyword)
    
    @staticmethod
    def byUser(screen_name):
        return TweetsGetterByUser(screen_name)

In [4]:
class TweetsGetterBySearch(TweetsGetter):
    '''
    get tweets by keyward
    '''
    def __init__(self, keyword):
        super(TweetsGetterBySearch, self).__init__()
        self.keyword = keyword
    
    def specifyUrlAndParams(self):
        '''
        return URL and parameter
        '''
        url = 'https://api.twitter.com/1.1/search/tweets.json'
        params = {'q':self.keyword, 'count':100}
        return url, params
    
    def pickupTweet(self, res_text):
        '''
        pull tweets from res_text and convert to array and return
        '''
        results = []
        for tweet in res_text['statuses']:
            results.append(tweet)
        
        return results
    
    def getLimitContext(self, res_text):
        '''
        get limitation info when start
        '''
        remaining = res_text['resources']['search']['/search/tweets']['remaining']
        reset     = res_text['resources']['search']['/search/tweets']['reset']
        
        return int(remaining), int(reset)

In [13]:
class TweetsGetterByUser(TweetsGetter):
    '''
    get tweets by selecting user. not workign now, needs overhaul
    '''
    def __init__(self, screen_name):
        super(TweetsGetterByUser, self).__init__()
        self.screen_name = screen_name
    
    def specifyUrlAndParams(self):
        '''
        pull tweets from res_text and convert array and return
        '''
        results = []
        for tweet in res_text:
            results.append(tweet)
        
        return results
    
    def getLimitContext(self, res_text):
        '''
        get limitation info when start
        '''
        remaining = res_text['resources']['statuses']['/statuses/user_timeline']['remaining']
        reset     = res_text['resources']['statuses']['/statuses/user_timeline']['reset']

        return int(remaining), int(reset)

In [14]:
if __name__ == '__main__':
    
    # get tweets by keyword
    getter = TweetsGetter.bySearch(u'datascience')
    
    # get tweets by user (screen_name)
    #getter = TweetsGetter.byUser('@realDonaldTrump')
    
    list_text = []
    list_id = []
    list_user_screenname = []
    list_created_at = []
    
    for tweet in getter.collect(total = 10): # total is number of tweets to get
        list_text.append(tweet['text'])
        list_id.append(tweet['id'])
        list_user_screenname.append(tweet['user']['screen_name'])
        list_created_at.append(tweet['created_at'])

## Output scraping results to dataframe
There is a rate limitation for each API endpoint twitter provided. My API endpoint limites around 5000/hr

In [15]:
df = pd.DataFrame(columns = ['text', 'id', 'user', 'created_at'])
df_new = df.assign(text=list_text, id=list_id, user=list_user_screenname, created_at=list_created_at)

print(df_new)

                                                text                   id  \
0  #AI - build it or buy it? https://t.co/n4DzGIp...  1387440722425434114   
1  Congrats to the coolest data science and machi...  1387440219272716290   
2  Tu es data scientist ou développeur backend ? ...  1387440081582137350   
3  Business Standard declares #HCL as the Company...  1387439753231020036   
4  Click the #transcription option to get all you...  1387439726618087427   
5  Adi Tatarko Honey Pot Befez \n#best #Custom #H...  1387439720532226051   
6  RSNA: Researchers Use Artificial Intelligence ...  1387439712009400320   
7  We just completed ses. #4 of the @creativedlab...  1387439670003372032   
8  VB &gt; Gartner says low-code, RPA, and AI dri...  1387439571164610563   
9  Keep up-to-date with the latest oncology, rat ...  1387438984150888454   

              user                      created_at  
0       tweetgrady  Wed Apr 28 16:17:00 +0000 2021  
1              CRN  Wed Apr 28 16:15:00 +0000 

## Apply pos/neg classification : Manual

In [16]:
# load NLP libraries
from flair.models import TextClassifier
from flair.data import Sentence

global tagger

In [17]:
def load_flair():
	return TextClassifier.load('en-sentiment')

tagger = load_flair()

2021-04-28 09:18:52,077 loading file /Users/tak/.flair/models/sentiment-en-mix-distillbert_4.pt


In [19]:
# quick manual test for flair NLP
s = Sentence('I love tokyo')
tagger.predict(s)
s

Sentence: "I love tokyo"   [− Tokens: 3  − Sentence-Labels: {'label': [POSITIVE (0.9945)]}]

In [39]:
# quick manual test for flair NLP apply for data frame
s = Sentence(df_new['text'][0])
tagger.predict(s)
s

Sentence: "# AI - build it or buy it ? https :// t.co / n4DzGIpHyG # DataScience # ML"   [− Tokens: 18  − Sentence-Labels: {'label': [POSITIVE (0.8868)]}]

In [48]:
s.labels[0].score

0.8867866396903992

## Apply pos/neg classification : Automate

In [26]:
# initialize empty dataframe
tweet_data = pd.DataFrame({
    'tweet': [],
    'predicted-sentiment-value': []
    'predicted-sentiment-score': []
})

In [27]:
# keep track of positive vs negative tweets
pos_vs_neg = {'POSITIVE':0, 'NEGATIVE': 0}

In [56]:
# add data for each tweet
for tweet in list_text:
    # skip iteration if tweet is empty
    if tweet in ('',' '):
        continue
    # make predictions
    sentence = Sentence(tweet)
    tagger.predict(sentence)
    # keep track of positive vs negative tweets
    pos_vs_neg[sentence.labels[0].value] += 1 #value is either POSITIVE or NEGATIVE
    # append new data
    tweet_data = tweet_data.append({'tweet': tweet,
                                    'predicted-sentiment-value': sentence.labels[0].value,
                                   'predicted-sentiment-score': sentence.labels[0].score}, ignore_index=True)

In [57]:
# see prediction against each tweets
tweet_data

Unnamed: 0,tweet,predicted-sentiment,predicted-sentiment-score,predicted-sentiment-value
0,#AI - build it or buy it? https://t.co/n4DzGIp...,0.886787,,
1,Congrats to the coolest data science and machi...,0.977923,,
2,Tu es data scientist ou développeur backend ? ...,0.925735,,
3,Business Standard declares #HCL as the Company...,0.992099,,
4,Click the #transcription option to get all you...,0.99997,,
5,Adi Tatarko Honey Pot Befez \n#best #Custom #H...,0.994835,,
6,RSNA: Researchers Use Artificial Intelligence ...,0.87848,,
7,We just completed ses. #4 of the @creativedlab...,0.982536,,
8,"VB &gt; Gartner says low-code, RPA, and AI dri...",0.571626,,
9,"Keep up-to-date with the latest oncology, rat ...",0.959759,,


In [58]:
# see pos_vs_neg
pos_vs_neg

{'POSITIVE': 19, 'NEGATIVE': 2}

In [66]:
# get positive rate
pos_rate = round(pos_vs_neg['POSITIVE']/(pos_vs_neg['NEGATIVE']+pos_vs_neg['POSITIVE'])*100,1)
pos_rate

90.5