In [1]:
# Libraries
import re
import os
import nltk
import requests
import unidecode
import unicodedata
import contractions
import pandas as pd
import datetime as dt
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from collections import Counter
from nltk.corpus import stopwords
from googletrans import Translator
from geopy.geocoders import Nominatim

import warnings
warnings.filterwarnings('ignore')

In [2]:
# loading credentials as environment variables
load_dotenv('data/twitter_kafka_credentials.env', override = True)

True

In [3]:
# Nominatim client, used to look up city coordinates
geolocator = Nominatim(user_agent = 'bmartin')

# Current date as a "YYYY-MM-DD" string
today = dt.date.today().strftime("%Y-%m-%d")

In [4]:
# Twitter credentials, read from the environment (None when a variable is unset)
twitter_key = os.getenv('api_key')
twitter_secret_key = os.getenv('secret_key')
bearer_token = os.getenv('bearer_token')

In [5]:
# Second set of Twitter credentials (the `*1` variables), read from the
# environment (None when a variable is unset)
twitter_key1 = os.getenv('api_key1')
twitter_secret_key1 = os.getenv('secret_key1')
bearer_token1 = os.getenv('bearer_token1')

In [8]:
def search_tweets(query, bearer_token = bearer_token, next_token = None, timeout = 30):
    """
    Query the Twitter v2 "recent search" endpoint and return the decoded
    JSON response.

    Inputs:
        - query: Search expression sent as the `query` parameter.
        - bearer_token: Token placed in the Authorization header.
        - next_token: Optional pagination token (the `next_token` value
          found in the `meta` section of a previous response). When None,
          the first page is requested.
        - timeout: Seconds to wait for the server before giving up.

    Outputs: The response body parsed from JSON.

    Raises: Exception(status_code, response_text) when the HTTP status
            is not 200.
    """

    headers = {"Authorization": f"Bearer {bearer_token}"}

    url = "https://api.twitter.com/2/tweets/search/recent"

    params = {
        # The query travels in `params` so that requests URL-encodes it.
        # Interpolating it into the URL breaks on characters such as '#'
        # (start of a URL fragment) or '&' (parameter separator).
        'query': query,

        # select specific Tweet fields from each returned Tweet object
        'tweet.fields': 'text,created_at,lang,possibly_sensitive', # public_metrics

        # maximum number of search results to be returned (10 - 100)
        'max_results': 100,

        'expansions': 'author_id,referenced_tweets.id,geo.place_id',

        "place.fields": 'country,full_name,name',

        "user.fields": 'location'
    }

    # Only ask for a specific page when the caller supplied a token
    if next_token is not None:
        params['next_token'] = next_token

    # request (with a timeout so a stalled connection cannot hang the cell)
    response = requests.get(url = url, params = params, headers = headers, timeout = timeout)

    # verify successful request
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)

    return response.json()

In [9]:
# search term
search_tweet = search_tweets(query = "Black Widow")

# top-level keys of the response
search_tweet.keys()

dict_keys(['data', 'includes', 'meta'])

In [10]:
# 'meta' section of the response (id range, result count, next_token)
search_tweet['meta']

{'newest_id': '1464772543886278657',
 'oldest_id': '1464760672412221449',
 'result_count': 100,
 'next_token': 'b26v89c19zqg8o3fpdy797c74n18b39pyget7rzrzz33x'}

In [11]:
# keys available in the 'includes' section of the response
search_tweet['includes'].keys()

dict_keys(['users', 'tweets'])

In [12]:
def create_dataframes(json_tweets, today):
    """
    Build tweets / users (/ places) dataframes from a search response.

    Inputs:
        - json_tweets: Decoded search response. It must contain a 'data'
          list (the tweets) and an 'includes' dict with a 'users' list;
          'includes' may also contain a 'places' list.
        - today: Date string. Only used by the (currently disabled)
          CSV export; kept so existing calls keep working.

    Outputs:
        - (tweets, users, places) when 'places' is present in 'includes'.
        - (tweets, users) otherwise.

        Retweets are dropped from `tweets`, `users` only keeps authors of
        the remaining tweets, and the helper columns used for filtering
        are removed from `tweets`.
    """

    # Work on the payload that was passed in, not on a notebook global
    includes = json_tweets['includes']
    has_places = "places" in includes

    # Create users dataframe
    users = pd.json_normalize(includes['users']).rename(columns = {"id": "user_id"})

    # Create df with tweet's data
    tweets = pd.json_normalize(json_tweets['data']).rename(columns = {"id": "tweet_id"})

    # Get tweet's type. The column is absent when no tweet in the page
    # references another tweet, and holds NaN for tweets without references.
    if "referenced_tweets" in tweets.columns:
        tweets['type'] = tweets['referenced_tweets'].apply(
            lambda refs: refs[0]["type"] if isinstance(refs, list) and refs else None
        )
    else:
        tweets['type'] = None

    # Drop retweeted tweets
    tweets = tweets[tweets["type"] != "retweeted"].reset_index(drop = True)

    # Only keep users who authored one of the remaining tweets
    author_ids = tweets["author_id"].unique()
    users = users.loc[users["user_id"].isin(author_ids)].reset_index(drop = True)

    # Drop helper cols (ignore the ones this page did not contain)
    helper_cols = ['referenced_tweets', 'author_id']
    if has_places:
        helper_cols.append('geo.place_id')
    tweets = tweets.drop(columns = helper_cols, errors = "ignore")

    # Save data
    #tweets.to_csv(f"data/tweets_{today}.csv", index = False)
    #users.to_csv(f"users/users_{today}.csv", index = False)

    if has_places:
        places = pd.json_normalize(includes['places']).rename(columns = {"id": "geo.place_id"})
        #places.to_csv(f"places/places_{today}.csv", index = False)
        return tweets, users, places

    return tweets, users

In [13]:
# create_dataframes returns three frames when the response carries
# place data and two otherwise, so unpack accordingly.
_frames = create_dataframes(search_tweet, today)

if "places" in search_tweet['includes']:
    tweets, users, places = _frames
else:
    tweets, users = _frames

In [14]:
class PreProcessor:
    """
    Text pre-processing pipeline for tweets stored in a pandas dataframe.

    Every public step receives a dataframe with (at least) the columns
    `text` and `lang`, writes its result into the `clean_tweet` column of
    that same dataframe and returns it.
    """

    # Twitter labels Indonesian tweets with the legacy code "in";
    # the translator expects "id".
    _TWITTER_LANG_FIXES = {"in": "id"}

    # usernames | hashtags | punct marks (- . , : _ ;) | links
    # \w keeps underscores and accented letters inside handles/hashtags,
    # and \S+ stops the link at the next whitespace instead of swallowing
    # the rest of the line.
    _NOISE_PATTERN = re.compile(r"(@\w+)|(#\w+)|([-.,:_;])|(https?://\S+)")

    def __init__(self, regex_dict = None):
        """
        Inputs:
            - regex_dict: Optional dict {pattern: replacement} with extra
              word-normalization rules. Its entries are added to the
              default rules and win over them when a pattern is repeated.
        """

        # creating classes
        # stem
        self.sb = nltk.stem.SnowballStemmer('english')

        # lemmatize
        self.lemmatizer = nltk.stem.wordnet.WordNetLemmatizer()

        # translate
        self.translator = Translator()

        # declare a default regex dict
        self.default_regex_dict = {'goo[o]*d':'good', '2morrow':'tomorrow', 'b4':'before', 'otw':'on the way',
                                   'idk':"i don't know", ':)':'smile', 'bc':'because', '2nite':'tonight',
                                   'yeah':'yes', 'yeshhhhhhhh':'yes', ' yeeeee':'yes', 'btw':'by the way', 
                                   'fyi':'for your information', 'gr8':'great', 'asap':'as soon as possible', 
                                   'yummmmmy':'yummy', 'gf':'girlfriend', 'thx':'thanks','nowwwwwww':'now', 
                                   ' ppl ':' people ', 'yeiii':'yes'}

        # if no regex_dict defined by user, then use
        # one by default. Else, merge both dicts (user rules last so
        # they override a default with the same pattern).
        if regex_dict:
            self.regex_dict = {**self.default_regex_dict, **regex_dict}

        else:
            self.regex_dict = self.default_regex_dict

    def _compile_rules(self):
        """
        Turn `self.regex_dict` into a list of (compiled pattern, replacement).

        Keys are used as regular expressions when they compile and as
        literal text otherwise (e.g. ':)'). Surrounding blanks in keys and
        values are dropped; instead, a key that starts/ends with a letter
        or digit only matches at a word boundary, so 'bc' does not rewrite
        the inside of 'abc'.
        """
        rules = []

        for raw_pattern, raw_replacement in self.regex_dict.items():
            key = raw_pattern.strip()

            if not key:
                continue

            try:
                re.compile(key)
                body = key
            except re.error:
                body = re.escape(key)

            prefix = r"(?<!\w)" if key[0].isalnum() else ""
            suffix = r"(?!\w)" if key[-1].isalnum() else ""

            rules.append((re.compile(f"{prefix}(?:{body}){suffix}"), raw_replacement.strip()))

        return rules

    @staticmethod
    def _substitute(text, rules):
        """Apply every (pattern, replacement) rule, in order, to one string."""
        for pattern, replacement in rules:
            # A function replacement keeps backslashes in the value literal
            text = pattern.sub(lambda _match, _value = replacement: _value, text)

        return text

    def translate_twt(self, pdf):

        """
        This function helps to translate a tweet from any 
        language to English.

        Inputs:
            - pdf: One record (row) of the dataframe. It must have
               the following fields:
                - lang: Tweet's language.
                - clean_tweet: Partially pre-processed tweet.

        Outputs: The tweet translated to English. The value is also
                 stored in the record under `translated_tweet`.
        """

        lang = pdf["lang"]
        text = pdf["clean_tweet"]

        # Nothing to translate when the language is undefined or already
        # English, or when cleaning left an empty string.
        if lang in ("und", "en") or not text:
            translated = text

        # For any other language use the code detected by Twitter, after
        # mapping Twitter's legacy codes to the ones the translator knows.
        else:
            source_lang = self._TWITTER_LANG_FIXES.get(lang, lang)
            translated = self.translator.translate(text, src = source_lang, dest = "en").text

        pdf["translated_tweet"] = translated

        return translated


    def removeNoise(self, pdf):

        """
        Function to remove noise from strings. 

        Inputs: A pandas dataframe with the columns `text` (raw tweet)
        and `lang` (tweet's language).

        Output: The same dataframe with a `clean_tweet` column: lower
        case, without html tags, usernames, hashtags, links, the
        punctuation marks - . , : _ ; and double quotes, translated to
        English and reduced to ASCII characters.
        """

        # to lower case
        cleaned = pdf["text"].str.lower()

        # remove html tags 
        cleaned = cleaned.str.replace(r'<[^<>]*>', '', regex=True)

        # remove (match with) usernames | hashtags | punct marks | links
        # do not remove: ' 
        # but remove: "
        # and collapse runs of white space
        cleaned = cleaned.apply(lambda x: ' '.join(self._NOISE_PATTERN.sub(" ", x).split()).replace('"', ''))

        # remove white spaces at the beginning and at 
        # the end of a string
        pdf["clean_tweet"] = cleaned.str.strip(' ')

        # Translate tweet. This happens before the ASCII folding below:
        # transliterating first would destroy text written in non-Latin
        # scripts. Iterating rows also works for an empty dataframe.
        pdf["clean_tweet"] = [self.translate_twt(row) for _, row in pdf.iterrows()]

        # remove accented characters from string (e.g. cancion) and drop
        # whatever is still outside ASCII
        pdf["clean_tweet"] = pdf["clean_tweet"].apply(
            lambda x: unicodedata.normalize('NFKC', unidecode.unidecode(x)).encode('ASCII', 'ignore').decode("utf-8")
        )

        return pdf

    def apply_regex(self, pdf):

        """
        Rewrite slang / misspellings in `clean_tweet` using the rules in
        `self.regex_dict`.

        Inputs: A pandas dataframe with a `clean_tweet` column of strings.

        Outputs: The updated `clean_tweet` column (also written back to
        the dataframe).
        """

        rules = self._compile_rules()

        # Element-wise substitution; independent of the dataframe index
        pdf["clean_tweet"] = pdf["clean_tweet"].apply(lambda x: self._substitute(x, rules))

        # Return col
        return pdf["clean_tweet"]


    def textNormalization(self, pdf):

        """
        Function to normalize a string. 

        Inputs: A pandas dataframe with the columns `text` and `lang`.

        Outputs: The dataframe with `clean_tweet` holding a string
        without noise, with contractions expanded, slang rewritten
        and with no English stopwords.
        """

        # remove noise first
        pdf = self.removeNoise(pdf)

        # expand contractions
        # e.g. don't --> do not
        pdf['clean_tweet'] = pdf.clean_tweet.apply(lambda x: contractions.fix(x))

        # Normalize words. Series.replace with a plain dict only swaps
        # cells that are equal to a key, so the rules are applied with
        # apply_regex instead.
        pdf['clean_tweet'] = self.apply_regex(pdf)

        # get English stopwords (a set gives constant-time lookups)
        stop_words = set(stopwords.words('english'))

        # remove stopwords from string
        pdf["clean_tweet"] = pdf.clean_tweet.apply(lambda x: ' '.join([word for word in x.split()
                                                                       if word not in stop_words]))

        return pdf


    def wordTokenize(self, pdf):
        """
        Function to tokenize a string into words. Tokenization is a way 
        of separating a piece of text into smaller units called tokens.
        In this case tokens are words (but can also be characters or 
        subwords).

        Inputs: A pandas dataframe with the columns `text` and `lang`.

        Outputs: The dataframe with `clean_tweet` holding, per row, the
        list of unique tokens in order of first appearance.
        """
        # string normalized
        pdf = self.textNormalization(pdf)

        # Use word_tokenize method to split the string
        # into individual words. By default it returns
        # a list.
        pdf["clean_tweet"] = pdf.clean_tweet.apply(lambda x: nltk.word_tokenize(x))

        # Keep only unique elements. dict.fromkeys keeps the first
        # occurrence of each token, so the order is the same on every run
        # (a set would shuffle it).
        pdf["clean_tweet"] = pdf.clean_tweet.apply(lambda x: list(dict.fromkeys(x)))

        # return list of tokenized words by row
        return pdf


    def phraseTokenize(self, pdf):

        """
        Function to tokenize a string into sentences. Tokenization is
        a way of separating a piece of text into smaller units called
        tokens. In this case tokens are phrases (but can also be words,
        characters or subwords).

        Inputs: A pandas dataframe with the columns `text` and `lang`.

        Outputs: The dataframe with `clean_tweet` holding, per row, a
        list of tokenized sentences.
        """

        # pandas dataframe with strings normalized
        pdf = self.textNormalization(pdf)

        # Use sent_tokenize method to split the string
        # into sentences. By default it returns a list.
        pdf["clean_tweet"] = pdf.clean_tweet.apply(lambda x: nltk.sent_tokenize(x))

        return pdf 


    def stemWords(self, pdf):

        """
        Function to stem strings. Stemming is the process of reducing
        a word to its word stem that affixes to suffixes and prefixes 
        or to the roots of words (known as a lemma).

        Inputs: A pandas dataframe with the columns `text` and `lang`.

        Output: The dataframe with `clean_tweet` holding, per row, the
        stem of each token.
        """

        # tokenized string (into words). wordTokenize already normalizes
        # the text, so it is not normalized (and translated) a second time.
        pdf = self.wordTokenize(pdf)

        # reduce words to their root
        pdf["clean_tweet"] = pdf.clean_tweet.apply(lambda x: [self.sb.stem(word) for word in x])

        return pdf


    def lemmatizeWords(self, pdf):

        """
        Function to lemmatize strings. Lemmatization is a method 
        responsible for grouping different inflected forms of 
        words into the root form, having the same meaning. It is 
        similar to stemming.

        Inputs: A pandas dataframe with the columns `text` and `lang`.

        Output: The dataframe with `clean_tweet` holding, per row, the
        lemma of each token.
        """

        # list of tokenized words (from string)
        # Here it was decided to tokenize by words
        # rather than by sentences because it should be
        # easier to find the correct root of each word.
        # wordTokenize already normalizes the text, so it is not
        # normalized (and translated) a second time.
        pdf = self.wordTokenize(pdf)

        # lemmatize each word from the list of tokenized words
        pdf["clean_tweet"] = pdf.clean_tweet.apply(lambda x: [self.lemmatizer.lemmatize(word) for word in x])

        return pdf

In [None]:
# Create class object
pre_processor = PreProcessor()

# Clean the tweets and keep only the lemmatized tokens of each one.
# lemmatizeWords writes its result into a `clean_tweet` column of the
# dataframe it receives and returns that dataframe.
tweets = pre_processor.lemmatizeWords(tweets)
tweets.head(2)