Get the collected tweets from CSVs and analyze them

In [None]:
import sys
import os
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import AxesGrid
from sklearn import decomposition
import statsmodels.regression.linear_model as lm
from sklearn.ensemble import RandomForestRegressor 
from typing import List, Tuple, Dict

import requests
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
from PIL import Image

import itertools
import base64

# Working directory that TweetAnalyzer scans for the collected-tweet CSV files.
MYDIR = "./"

# County names looked up in the simplemaps "county_name" column.
COUNTIES = [
    "Alameda",
    "San Francisco",
    "San Mateo",
    "Santa Clara",
    "Santa Cruz",
]

# Maps each umbrella ("big") topic to the search keywords that belong to it.
BIG_TOPICS = {
    "race": [
        "black",
        "blm",
        "racism",
        "race",
        "racial",
    ],
    "wildfires": [
        "fires",
        "wildfires",
        "wild fires",
        "air quality",
        "evacuation",
    ],
    "politics": [
        "trump",
        "biden",
        "elections",
        "democrat",
        "liberal",
        "republican",
        "conservative",
    ],
    "covid": [
        "covid",
        "coronavirus",
        "corona",
        "pandemic",
        "virus",
        "CDC",
        "WHO",
        "fauci",
    ],
}

In [None]:
class TweetAnalyzer:
    
    def __init__(self, 
                 counties: List[str],
                 big_topics: Dict[str, object],
                 state_id: str = "CA"
                ):
        """
        Build the analyzer state: input files, topics, locations and API access.

        The constructor performs I/O: it lists MYDIR, reads the simplemaps
        cities CSV, reads the API credentials CSV and requests a bearer
        token from Twitter.

        Parameters:
            counties:   county names to analyze, or None to skip the
                        location set-up (all location attributes are None).
            big_topics: mapping of umbrella topic -> list of keywords, or
                        None for no topics.
            state_id:   state abbreviation matched against the "state_id"
                        column of the cities table.
        """
        # Collected-tweet files are those in MYDIR whose name contains
        # both 'twdf' and '.csv'.
        self.mMyDir = MYDIR
        self.mMyFiles = [ff for ff in os.listdir(self.mMyDir)
                         if 'twdf' in ff and '.csv' in ff]

        # Flat list of every keyword across all big topics.
        self.mTopics = []

        # Copy the caller's containers so later changes on either side do
        # not leak into the other.
        self.mCounties = None if counties is None else counties.copy()
        self.mStateID = state_id

        if big_topics is None:
            self.mBigTopics = None
        else:
            self.mBigTopics = big_topics.copy()
            for keywords in self.mBigTopics.values():
                self.mTopics.extend(keywords)

        # City/county/state lookup table.
        self.mCityCountyDF = pd.read_csv(self.mMyDir + \
                            "../data/simplemaps_uscities_basicv1.7/uscities.csv")

        if self.mCounties is None:
            self.mCiCounties = None
            self.mCiStates = None
            self.mCities = None
            self.mStateName = None
            self.mCountyTopics = None
        else:
            city_county = self.getCitiesForCounties()

            self.mCiCounties = list(city_county.sort_values("city_county")["city_county"].unique())
            self.mCiStates = list(city_county.sort_values("city_state")["city_state"].unique())
            self.mCities = list(city_county.sort_values("city")["city"].unique())

            # Previously mStateName was only assigned in the "no counties"
            # branch, so building the Meltwater query below raised
            # AttributeError whenever counties were supplied.
            state_names = city_county["state_name"].dropna().unique()
            self.mStateName = str(state_names[0]) if len(state_names) > 0 else None

            # Search terms: every city, every keyword, and every
            # "city, keyword" combination.
            self.mCountyTopics = []
            self.mCountyTopics.extend(self.mCities.copy())
            self.mCountyTopics.extend(self.mTopics.copy())
            self.mCountyTopics.extend(cc + ", " + tt
                                      for cc in self.mCities
                                      for tt in self.mTopics)

        # API credentials.
        api_key, api_secret_key, bearer_token = \
            self.get_api_keys_and_bearer_token()

        self.mAPI: Dict[str, str] = {"key": api_key,
                                     "secret_key": api_secret_key,
                                     "bearer_token": bearer_token
                                    }

        # Search endpoint and authorization headers.
        search_url, search_headers = \
            self.get_search_url_search_headers(api_key, 
                                               api_secret_key)

        self.mSearchParams = {"url": search_url,
                              "headers": search_headers
                             }

        # Without a state name (no counties, or no matching city rows)
        # there is nothing to build the query from.
        if self.mStateName is None:
            self.mMeltwaterQuery = None
        else:
            self.mMeltwaterQuery = \
                self.formulateQuery4meltwater(self.mStateName)
        
        
    """
    Generate the search query for Meltwater's UI
    """
    def formulateQuery4meltwater(self, state):
        
        if self.mCounties is None:
            return None
        
        counties_cities = self.mCounties.copy()
        counties_cities.extend(self.mCities)
        counties_cities
        
        retQueryString = '("' + state + \
            '") AND (("' +\
            '") OR ("'.join(counties_cities) + '"))'
        
        return retQueryString
        
    """
    Using the simplemaps us cities data, get the cities in each 
    county in self.mCounties within self.mStateID
    """
    def getCitiesForCounties(self, verbose: bool = False) -> pd.DataFrame:
        
        cities_counties = self.mCityCountyDF.copy()
        if verbose:
            print(cities_counties.head())
        whatIneed = cities_counties[["city", 
                                     "county_name", 
                                     "state_id", 
                                     "state_name"]].copy()
        
        mystate = whatIneed.loc[whatIneed.state_id == self.mStateID].copy()
        myCounties = \
            mystate.loc[mystate.county_name.isin(self.mCounties)].copy()
            
        myCounties["city_county"] = myCounties["city"] + ", " + myCounties["county_name"]
        myCounties["city_state"] = myCounties["city"] + ", " + myCounties["state_name"]
        
        return myCounties.reset_index(drop=True)
        
    
    def get_tweets(self, api=None, screen_name=None):
        timeline = api.GetUserTimeline(screen_name=screen_name, count=200)
        earliest_tweet = min(timeline, key=lambda x: x.id).id
        print("getting tweets before:", earliest_tweet)

        while True:
            tweets = api.GetUserTimeline(
                screen_name=screen_name, max_id=earliest_tweet, count=200
            )
            new_earliest = min(tweets, key=lambda x: x.id).id

            if not tweets or new_earliest == earliest_tweet:
                break
            else:
                earliest_tweet = new_earliest
                print("getting tweets before:", earliest_tweet)
                timeline += tweets

        return timeline

    def get_api_keys_and_bearer_token(self, verbose: bool = False) -> List[str]:

        df = pd.read_csv("./chemodan123KKs.csv")

        if verbose:
            print(df)

        keyDF = df.loc[df.name == 'api_key'].copy()
        api_key = list(keyDF["value"])[0]

        sKeyDF = df.loc[df.name == 'api_secret_key'].copy()
        api_secret_key = list(sKeyDF["value"])[0]

        bTokDF = df.loc[df.name == 'bearer_token'].copy()
        bearer_token = list(bTokDF["value"])[0]

        return api_key, api_secret_key, bearer_token

    def get_search_url_search_headers(self, api_key: str, 
                                      api_secret_key: str) -> Tuple[str, Dict[str, str]]:
        """
        Obtain a bearer token and return the search URL with its headers.

        The key and secret are sent as HTTP Basic credentials to
        https://api.twitter.com/oauth2/token with the client_credentials
        grant; the returned access token is placed in a Bearer
        Authorization header.

        Returns:
            (search_url, search_headers): the 1.1 search endpoint URL and
            the header dict to send with search requests.

        Raises:
            requests.HTTPError: the token endpoint answered with a 4xx/5xx.
            RuntimeError: the token endpoint answered with any other
                non-200 status.
        """
        key_secret = '{}:{}'.format(api_key, api_secret_key).encode('ascii')
        b64_encoded_key = base64.b64encode(key_secret).decode('ascii')

        base_url = 'https://api.twitter.com/'
        auth_url = '{}oauth2/token'.format(base_url)

        auth_headers = {
            'Authorization': 'Basic {}'.format(b64_encoded_key),
            'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8'
        }

        auth_data = {
            'grant_type': 'client_credentials'
        }

        auth_resp = requests.post(auth_url, headers=auth_headers, data=auth_data)

        # Fail loudly on a rejected request. Previously a non-200 answer
        # left access_token unassigned and surfaced as UnboundLocalError.
        auth_resp.raise_for_status()
        if auth_resp.status_code != 200:
            raise RuntimeError(
                'Unexpected status {} from {}'.format(auth_resp.status_code,
                                                      auth_url))

        access_token = auth_resp.json()['access_token']

        search_headers = {
            'Authorization': 'Bearer {}'.format(access_token)    
        }

        # Alternative endpoint: 'https://api.twitter.com/1.1/tweets/search/fullarchive'
        search_url = '{}1.1/search/tweets.json'.format(base_url)

        return search_url, search_headers


    def searchTwitter(self, search_url: str, 
                      search_headers: str, 
                      search_params: Dict[str, object]) -> object:
        """
        Send one GET request to the search endpoint and return the decoded JSON.

        search_headers and search_params are passed to requests.get as the
        request headers and query parameters; the response status is not
        inspected.
        """
        response = requests.get(search_url,
                                params=search_params,
                                headers=search_headers)
        return response.json()

    def convertTweets2DF (self, tweet_data, 
                          verbose: bool = False) -> pd.DataFrame:
        if "statuses" in tweet_data.keys():
            
            statusesList = tweet_data["statuses"]
            statusesList

            tweetsList = [] # List[Dict[str, str]]
            for ii in range(len(statusesList)):
                if verbose:
                    print(ii)
                status = statusesList[ii]
                statusesHT = {} # Dict[str, str]
                for kk in status.keys():
                    if verbose:
                        print(f"""type(status[{kk}]) = {type(status[kk])}""")
                        print(f"""status[{kk}] = {status[kk]}""")
                    if type(status[kk]) is str or type(status[kk]) is int or type(status[kk]) is bool: 
                        statusesHT[kk] = status[kk]

                tweetDF = pd.DataFrame(statusesHT, index=[0])
                tweetsList.append(tweetDF)  

            if len(tweetsList) > 0:
                tweetsDF = pd.concat(tweetsList, 
                                     sort=False, 
                                     ignore_index=True).reset_index(drop=True)
            else:
                return None
        else:
            print("no 'statuses' found in tweet_data")
            return None
        
        return tweetsDF

    def getTweetsIntoDF(self,
                        query: str, 
                        cnt: int
                       ) -> pd.DataFrame:

        search_params = {
            'q': query,
            'result_type': 'recent',
            'count': cnt,
            'lang': 'en'
        }

        tweet_data = self.searchTwitter(self.mSearchParams["url"],
                                        self.mSearchParams["headers"],
                                        search_params)

        twDF = self.convertTweets2DF(tweet_data)
        if twDF is not None:
            twDF["topic"] = query

        return twDF

    def plotHistogramForRetweetsByTopic(self, twDF: pd.DataFrame):
        """
        Print per-topic tweet counts and mean retweets, then plot a histogram.

        Parameters:
            twDF: frame with at least the "topic", "id" and "retweet_count"
                  columns.

        Shows a grouped plotly histogram of retweet_count coloured by topic.
        All tweets are plotted, including those with zero retweets.
        """
        by_topic = twDF.groupby('topic')

        twDFtweets = by_topic['id'].count().reset_index()
        print(twDFtweets)

        # Select the column BEFORE aggregating: averaging the whole frame
        # raises on text columns in recent pandas versions.
        twDFretweets = by_topic['retweet_count'].mean().reset_index()
        print(twDFretweets)

        import plotly.express as px

        # The former "retweet_count > 0" filter was immediately overwritten
        # by a full copy, so it has been removed; the plotted data is the same.
        tDF = twDF.copy()
        tDF["retweet_count"] = tDF["retweet_count"].astype("int64")
        fig = px.histogram(tDF, 
                           x="retweet_count", 
                           color=tDF["topic"], 
                           barmode="group",
                           title="Retweet Counts by topic",
                           nbins=20               
                          )
        fig.show()
        
    def getTweetDataFromCSV(self) -> pd.DataFrame:
        
        dataHT: Dict[str, pd.DataFrame] = {}
        for ff in self.mMyFiles:
            dataHT[ff] = pd.read_csv(self.mMyDir + ff)

        if len(dataHT.keys()) > 0:
            allData = pd.concat(dataHT).reset_index(drop=True)
        else:
            return None
        
        return allData
    
    def getBigTopic(self, topic) -> str:
        for kk in self.mBigTopics.keys():
            if topic in self.mBigTopics[kk]:
                return kk
        return None
    
    def splitTopicsIDbigTopics(self, allData: pd.DataFrame) -> pd.DataFrame:
        """
        Split the topic into location and topic
        """
        subtopics = allData.topic.str.split(',', expand=True)
        subtopics.columns = ['location', 'the_topic']
        subtopics.the_topic = subtopics.the_topic.str.strip()
        subtopics.the_topic.unique()

        allDataExt = pd.concat([allData, subtopics], axis=1)
        allDataExt.head(1)

        single_topic = allDataExt.loc[allDataExt.the_topic.isnull()].copy()
        single_topic.the_topic = single_topic.location
        single_topic.location = None
        single_topic.the_topic.unique()

        """
        Get the big topic for "the_topic" based on self.mBigTopics
        """
        single_topic["big_topic"] = single_topic.the_topic.apply(lambda x: self.getBigTopic(x))
        single_topic.big_topic.unique()
        
        double_topic = allDataExt.loc[~allDataExt.the_topic.isnull()].copy()
        double_topic["big_topic"] = double_topic.the_topic.apply(lambda x: self.getBigTopic(x))
        double_topic.big_topic.unique()
        
        """
        Concatenate single-topic and double-topic entries and return the resulting DF
        """
        allDataFinal = pd.concat([single_topic, double_topic]).reset_index(drop=True)
        
        allDataFinal.loc[(allDataFinal.location.isnull()), 'big_topic'] = \
            allDataFinal.loc[(allDataFinal.location.isnull())].the_topic

        allDataFinal.loc[(allDataFinal.location.isnull()), 'location'] = \
            allDataFinal.loc[(allDataFinal.location.isnull())].the_topic
        
        return allDataFinal
      
    def getTweets(self, 
                  myFiles: List[str], 
                  myDir: str, 
                  verbose: bool = True) -> pd.DataFrame:
        
        tweetsDFht: Dict[str, pd.DataFrame] = {}
        for ff in myFiles:
            if 'crdownload' in ff or 'zip' in ff:
                continue
            print(ff)
            df = pd.read_csv(myDir + ff, encoding='utf-16', sep='\t')
            tweetsDFht[ff] = df.copy()

        if verbose:
            print(f"""Got the tweets from {len(myFiles)} files into a hashtable of dataframes""")
            
        allTweetsDF = pd.concat(tweetsDFht, ignore_index=True).drop_duplicates()
        cols = [cc.replace(' ', '_').lower() for cc in list(allTweetsDF.columns)]
        allTweetsDF.columns = cols

        allTweetsDF["date"] = pd.to_datetime(allTweetsDF.alternate_date_format, format='%b %d, %Y')
        del allTweetsDF['alternate_date_format']
        allTweetsDF.sort_values(['date', 'time'], inplace=True)
        allTweetsDF.reset_index(drop=True, inplace=True)

        allTweetsDF['wk'] = allTweetsDF['date'].dt.isocalendar().week
        if verbose:
            print(len(allTweetsDF))
            print(allTweetsDF['wk'].isna().sum())
        allTweetsDF['week'] = [str(ww) if ww >= 10 else '0' + str(ww) for ww in allTweetsDF['wk']]
        allTweetsDF['year'] = allTweetsDF['date'].dt.isocalendar().year 
        allTweetsDF['year_week'] = 'w' + allTweetsDF.week.astype(str) + '_' + allTweetsDF.year.astype(str)
        del allTweetsDF['wk']

        print(f"""Got all ({len(allTweetsDF)}) tweets from {myDir}""")

        if verbose:
            display.display(allTweetsDF.head())

        return allTweetsDF
    
    
    def drawWordCloud(self, 
                      region: str,
                      myTweetsDF: pd.DataFrame,
                      column: str = 'hit_sentence',
                      maxWords: int = 200,
                      color: str = 'black') -> WordCloud:
        """
        Draw and return a word cloud built from one text column of the tweets.

        Parameters:
            region:     label used in the plot title.
            myTweetsDF: tweets; must hold ``column`` and a "date" column
                        (the title shows its min and max).
            column:     name of the text column to visualize.
            maxWords:   maximum number of words shown in the cloud.
            color:      background colour of the cloud.

        Returns:
            The generated WordCloud object; the figure is also shown.
        """
        stopwords = set(STOPWORDS)
        stopwords.update(['qt', 'rt', '@', 'df', 'http', 'https', 
                          'say', 'include', 'google', 'graphistry',
                          'co', "s'", "d'", "'", 'retweet', 'etc'
                         ])

        # Filter word by word. The filter used to run on whole tweets, so
        # any tweet that merely contained a link was dropped entirely, and
        # 'rt' was compared before lower-casing.
        kept_words = []
        for sentence in myTweetsDF[column].astype(str):
            for word in sentence.lower().split():
                if 'http' in word or word.startswith('@') or word == 'rt':
                    continue
                kept_words.append(word.replace('#', ''))
        cleaned_word = ' '.join(kept_words)

        # maxWords was previously only mentioned in the title; pass it on
        # so the cloud really is limited to that many words.
        wordcloud = WordCloud(stopwords=stopwords,
                              background_color=color,
                              max_words=maxWords,
                              width=800,
                              height=500
                             ).generate(cleaned_word)
        plt.figure(1)
        plt.imshow(wordcloud)
        plt.axis('off')
        plt.title(f"""{region} word cloud (top {maxWords}) from {myTweetsDF.date.min()} to {myTweetsDF.date.max()}""")
        plt.show()       
        return wordcloud
    
    # TODO: remove location words used in the meltwater query
    def drawWordCloudByWeek(self, 
                            region: str,
                            tweetsDF: pd.DataFrame,
                            column: str = 'hit_sentence',
                            maxWords: int = 200,
                            color: str = 'white') -> WordCloud:
        
        if 'week' not in tweetsDF.columns and 'year' not in tweetsDF.columns:
            tweetsDF['wk'] = tweetsDF['date'].dt.isocalendar().week 
            tweetsDF['week'] = [str(ww) if ww >= 10 else '0' + \
                                str(ww) for ww in allTweetsDF['wk']]
            tweetsDF['year'] = tweetsDF['date'].dt.isocalendar().year 
            tweetsDF['year_week'] = 'w' + tweetsDF.week.astype(str) + '_' \
                                    + tweetsDF.year.astype(str)
            del tweetsDF['wk']

        tweetsDF['year_week'] = tweetsDF.year.astype(str) + '_' + tweetsDF.week.astype(str)
        
        year_weeks = sorted(list(tweetsDF.year_week.unique()))

        
        separator = "============================================================="
        for yw in year_weeks:
            print(separator + separator)
            mmTweetsDF = tweetsDF.loc[tweetsDF.year_week == yw]
            self.drawWordCloud(region, mmTweetsDF, column, maxWords, color)