In [1]:
import os
import io
import csv
import json
import time

from pprint import pprint as pp
from collections import namedtuple
from configparser import ConfigParser

In [2]:
#getting secrets from config.ini file
config = ConfigParser()
config.read('./cfg/config.ini')

['./cfg/config.ini']

In [3]:
class IO_json(object):

    def __init__(self, filepath, filename, filesuffix='json'):
        self.filepath = filepath        # /path/to/file  without the '/' at the end
        self.filename = filename        # FILE_NAME
        self.filesuffix = filesuffix
        # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))

    def save(self, data):

        if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
            # Append existing file
            with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'a', encoding='utf-8') as f:
                f.write(unicode(json.dumps(data, ensure_ascii= False))) # In python 3, there is no "unicode" function 
                # f.write(json.dumps(data, ensure_ascii= False)) # create a \" escape char for " in the saved file        

        else:
            # Create new file
            with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'w', encoding='utf-8') as f:
                f.write(unicode(json.dumps(data, ensure_ascii= False)))
                # f.write(json.dumps(data, ensure_ascii= False))    

    def load(self):

        with io.open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), encoding='utf-8') as f:
            return f.read()

In [4]:
class IO_csv(object):

    def __init__(self, filepath, filename, filesuffix='csv'):
        self.filepath = filepath       # /path/to/file  without the '/' at the end
        self.filename = filename       # FILE_NAME
        self.filesuffix = filesuffix
        # self.file_io = os.path.join(dir_name, '.'.join((base_filename, filename_suffix)))

    def save(self, data, NTname, fields):
        # NTname = Name of the NamedTuple
        # fields = header of CSV - list of the fields name
        NTuple = namedtuple(NTname, fields)

        if os.path.isfile('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix)):
            # Append existing file
            with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'ab') as f:
                writer = csv.writer(f)
                # writer.writerow(fields) # fields = header of CSV
                writer.writerows([row for row in map(NTuple._make, data)])
                # list comprehension using map on the NamedTuple._make() iterable and the data file to be saved
                # Notice writer.writerows and not writer.writerow (i.e. list of multiple rows sent to csv file

        else:
            # Create new file
            with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix), 'wb') as f:
                writer = csv.writer(f)
                writer.writerow(fields) # fields = header of CSV - list of the fields name
                writer.writerows([row for row in map(NTuple._make, data)])
                #  list comprehension using map on the NamedTuple._make() iterable and the data file to be saved
                # Notice writer.writerows and not writer.writerow (i.e. list of multiple rows sent to csv file
                
                
    def load(self, NTname, fields):
        # NTname = Name of the NamedTuple
        # fields = header of CSV - list of the fields name
        NTuple = namedtuple(NTname, fields)

        with open('{0}/{1}.{2}'.format(self.filepath, self.filename, self.filesuffix),'rU') as f:
            reader = csv.reader(f)
            for row in map(NTuple._make, reader):
                # Using map on the NamedTuple._make() iterable and the reader file to be loaded
                yield row

In [5]:
def parse_date(s):
    
    return time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(s,'%a %b %d %H:%M:%S +0000 %Y'))

def parse_geo(g,index):
    try:
        return str(g["geo"]["coordinates"][index])
    except:
        return ""

In [6]:
def extract_tweet(statuses):
    return [ {'id'         :status['id'], 
              'created_at' :parse_date(status['created_at']), 
              'user_id'    :status['user']['id'],
              'user_name'  :status['user']['name'], 
              'tweet_text' :status['text'].encode('utf-8'), 
              'url'        :url['expanded_url']} 
                               for status in statuses
                                   for url in status['entities']['urls'] ]

def extract_tweet_noURL(statuses):
    return [ {'id'         :status['id'], 
              'created_at' :parse_date(status['created_at']), 
              'user_id'    :status['user']['id'],
              'user_name'  :status['user']['name'], 
              'tweet_text' :status['text'].encode('utf-8') }

                               for status in statuses ]

# from collections import namedtuple

fields01 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text', 'url']
Tweet01 = namedtuple('Tweet01',fields01)

def parse_tweet(data):

    """
    Parse a ``tweet`` from the given response data.
    """

    return Tweet01(
        id=data.get('id', None),
        created_at=data.get('created_at', None),
        user_id=data.get('user_id', None),
        user_name=data.get('user_name', None),
        tweet_text=data.get('tweet_text', None),
        url=data.get('url')

    )

# from collections import namedtuple

fields00 = ['id', 'created_at', 'user_id', 'user_name', 'tweet_text']
Tweet00 = namedtuple('Tweet00',fields00)


def parse_tweet_noURL(data):

    """
    Parse a ``tweet`` from the given response data.
    """

    return Tweet00(
        id=data.get('id', None),
        created_at=data.get('created_at', None),
        user_id=data.get('user_id', None),
        user_name=data.get('user_name', None),
        tweet_text=data.get('tweet_text', None)
    )


In [10]:
## Python Twitter API class and its base methods for authentication, searching, and parsing the results.
class TwitterAPI(object):
    """
    TwitterAPI class allows the Connection to Twitter via OAuth
    once you have registered with Twitter and receive the 
    necessary credentials
    """
    
    # initialize and get the twitter credentials
    def __init__(self):
        consumer_key = config.get('auth','consumer_key') 
        consumer_secret = config.get('auth','consumer_secret') 
        access_token = config.get('auth','access_token') 
        access_secret = config.get('auth','access_secret') 
        
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.access_token = access_token
        self.access_secret = access_secret
        self.retries = 3
        # authenticate credentials with Twitter us Oauth 
        self.auth = twitter.oauth.OAuth(access_token, access_secret, consumer_key, consumer_secret)
        # creates registered Twitter API
        self.api = twitter.Twitter(auth=self.auth)
        
        # logger initialisation

        appName = 'twt150530'

        self.logger = logging.getLogger(appName)

        #self.logger.setLevel(logging.DEBUG)

        # create console handler and set level to debug

        logPath = '/home/felix/data'
        fileName = appName
        fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fileHandler.setFormatter(formatter)
        self.logger.addHandler(fileHandler) 
        self.logger.setLevel(logging.DEBUG)
        
        # Save to JSON file initialisation
        jsonFpath = '/home/felix/data'
        jsonFname = 'twtr15053001'
        self.jsonSaver = IO_json(jsonFpath, jsonFname)
        
        # Save to MongoDB Intitialisation
        self.mongoSaver = IO_mongo(db='twtr01_db', coll='twtr01_coll')

    
    # search Twitter with query q and max result
    def searchTwitter(self, q, max_res=10, **kwargs):
        search_results = self.api.search.tweets(q=q, count=10, **kwargs)
        statuses = search_results['statuses']
        max_results  = min(1000, max_res)
        
        for _ in range(10):
            try:
                next_results = search_results['search_metadata']['next_results']
                
            except KeyError as e:
                self.logger.error('error in searchTwitter: %s', %(e))
                #self.logger.error('error',(e))
                break
                
            #next_results = urlparse.parse_qs(next_results[1:])
            next_results = urllib.parse.parse_qsl(next_results[1:])
            kwargs = dict(next_results)
            search_results = self.api.search.tweets(**kwargs)
            statuses += search_results['statuses']
            
            if len(statuses) > max_results:
                self.logger.info('info in searchTwitter - got %i tweets - max: %i' %(len(statuses), max_results))
                break
                
        return statuses
    
    def saveTweets(self, statuses):
        # Saving to JSON File
        self.jsonSaver.save(statuses)
        
        # Saving to MongoDB
        for s in statuses:
            self.mongoSaver.save(s)
                
    # parse tweets as it is collected to extract id, creation date, user id, tweet text
    def parseTweets(self, statuses):
        return [(status['id'],
                 status['created_at'],
                 status['user']['id'],
                 status['user']['name'],
                 status['text'], 
                 url['expanded_url'])
                    for status in statuses
                        for url in status['entities']['urls']
        ]
    
    def getTweets(self, q,  max_res=10):
    """
    Make a Twitter API call whilst managing rate limit and errors.
    """
    def handleError(e, wait_period=2, sleep_when_rate_limited=True):
        if wait_period > 3600: # Seconds
            self.logger.error('Too many retries in getTweets: %s', %(e))
            raise e
        if e.e.code == 401:
            self.logger.error('error 401 * Not Authorised * in getTweets: %s', %(e))
            return None
        elif e.e.code == 404:
            self.logger.error('error 404 * Not Found * in getTweets: %s', %(e))
            return None
        elif e.e.code == 429: 
            self.logger.error('error 429 * API Rate Limit Exceeded * in getTweets: %s', %(e))
            if sleep_when_rate_limited:
                self.logger.error('error 429 * Retrying in 15 minutes * in getTweets: %s', %(e))
                sys.stderr.flush()
                time.sleep(60*15 + 5)
                self.logger.info('error 429 * Retrying now * in getTweets: %s', %(e))
                return 2
            else:
                raise e # Caller must handle the rate limiting issue

        elif e.e.code in (500, 502, 503, 504):
            self.logger.info('Encountered %i Error. Retrying in %i seconds' % (e.e.code, wait_period))
            time.sleep(wait_period)
            wait_period *= 1.5
            return wait_period

        else:
            self.logger.error('Exit - aborting - %s', %(e))
            raise e

    while True:
        try:
            self.searchTwitter( q, max_res=10)

        except twitter.api.TwitterHTTPError as e:
            error_count = 0 
            wait_period = handleError(e, wait_period)
            if wait_period is None:
                return


SyntaxError: invalid syntax (442638050.py, line 64)

In [6]:
t= TwitterAPI()

NameError: name 'TwitterAPI' is not defined

In [18]:
q="ChampionsLeague"
tsearch = t.searchTwitter(q)

In [19]:
len(tsearch)

20

In [20]:
pp(tsearch[1])

{'contributors': None,
 'coordinates': None,
 'created_at': 'Fri Sep 23 12:10:22 +0000 2022',
 'entities': {'hashtags': [{'indices': [41, 45], 'text': 'UCL'}],
              'media': [{'display_url': 'pic.twitter.com/uNp5oWizZA',
                         'expanded_url': 'https://twitter.com/ChampionsLeague/status/1573273723569938433/photo/1',
                         'id': 1573261092658286596,
                         'id_str': '1573261092658286596',
                         'indices': [46, 69],
                         'media_url': 'http://pbs.twimg.com/media/FdVYoNbXEAQaNlN.jpg',
                         'media_url_https': 'https://pbs.twimg.com/media/FdVYoNbXEAQaNlN.jpg',
                         'sizes': {'large': {'h': 2048,
                                             'resize': 'fit',
                                             'w': 1365},
                                   'medium': {'h': 1200,
                                              'resize': 'fit',
                     

In [21]:
tparsed = t.parseTweets(tsearch)
pp(tparsed)

[(1573283606738378754,
  'Fri Sep 23 12:10:16 +0000 2022',
  1441738380,
  'Marek',
  'https://t.co/5y75N0qGlu https://t.co/hPHj221Cqf',
  'https://twitter.com/ChampionsLeague/status/1573281273602195457'),
 (1573283574807433218,
  'Fri Sep 23 12:10:09 +0000 2022',
  1498163673067659268,
  'lama',
  'Any other answer is wrong!🌟🐐 https://t.co/9h0QxImGaP '
  'https://t.co/eJ18tmqST2',
  'https://twitter.com/ChampionsLeague/status/1573243524123746304')]
