#### Connect to the Spark session and set up the SQL context

In [1]:
#Import Spark Session (SparkContext and SparkSQL)
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ConvertTweets").enableHiveSupport().getOrCreate()
# `sc` is only predefined inside the pyspark shell; take it from the session so the
# cell also works in a fresh kernel or when run as a plain script.
sc = spark.sparkContext
# The rest of the notebook calls `sqlContext.read` / `sqlContext.sql(...)`. The session
# already provides both, so alias it instead of building a second SparkSession on the
# same context.
sqlContext = spark

#### Import the required libraries

In [2]:
import os
import glob
import pandas as pd
import json
from datetime import datetime
import us
import re
import geopy
from geopy.geocoders import Nominatim
from unidecode import unidecode
from textblob import TextBlob
from afinn import Afinn

#### Find the corresponding US state from a state code or (partial) state name

In [3]:
def fncFindStateByName(location):
    """Resolve a free-text location string to a US state.

    Lookup order:
      1. each whitespace-separated word, via us.states.lookup;
      2. each comma-separated part, via us.states.lookup;
      3. if nothing matched but the text mentions the country ("united states",
         "america", or "usa" at a token edge), the string
         "United States of America".

    Args:
        location: free-text location (may be None or empty).

    Returns:
        The first non-empty result of us.states.lookup, the string
        "United States of America" for a country-level match, or "" otherwise.
    """
    if not location:
        return ""

    # Glue "new <name>" together so the whitespace split keeps two-word state
    # names ("new york", "new jersey", ...) intact. The underscore is turned back
    # into a space before each lookup. Note that this lower-cases the whole string.
    if "new " in location.lower():
        location = location.lower().replace("new ", "new_")

    # 1. Word-by-word lookup.
    for word in location.strip().split():
        state = us.states.lookup(word.replace("_", " "))
        if state:
            return state

    # 2. Comma-separated parts (e.g. "Austin,TX" with no space). This loop used to
    #    be nested inside the word loop, which re-ran the same lookups once per word.
    #    Parts are stripped so " TX" can still hit the two-letter abbreviation lookup.
    for part in location.strip().split(","):
        part = part.replace("_", " ").strip()
        if not part:
            continue
        state = us.states.lookup(part)
        if state:
            return state

    # 3. Country-level fallback. These are plain substring tests, so for example
    #    "South America" also lands here.
    #    NOTE(review): likely over-matches; kept as-is to preserve behavior.
    text = location.lower()
    countryMarkers = ("united states", "america", " usa", "usa ", ",usa")
    if text == "usa" or any(marker in text for marker in countryMarkers):
        return "United States of America"
    return ""

#### Find the US state from the tweet's coordinates

In [4]:
def fncFindStateByCoordinates(geo):
    """Reverse-geocode a tweet's geo value to a US state name using Nominatim.

    Args:
        geo: the tweet's geo value. Its string form is expected to contain the
            coordinates as "[lat, lon]".
            NOTE(review): assumed from the parsing below; confirm against the tweet JSON.

    Returns:
        The state name when the reverse-geocoded address is in the United States,
        otherwise "". Any lookup or parse error is printed and also yields "".
    """
    state = ""
    if not geo:
        return state

    # Build the geocoder once and reuse it for every row, instead of one per call.
    # NOTE(review): the public Nominatim service limits request rates; consider
    # geopy.extra.rate_limiter.RateLimiter for large batches.
    geolocator = getattr(fncFindStateByCoordinates, "_geolocator", None)
    if geolocator is None:
        geolocator = Nominatim(user_agent ="FP1_Convert_Tweets")
        fncFindStateByCoordinates._geolocator = geolocator

    try:
        coordinates = str(geo).split("[")[1].split("]")[0]
        location = geolocator.reverse(coordinates)
        if location is None:
            return state
        address = (getattr(location, "raw", None) or {}).get("address") or {}
        if address.get("country_code") == "us" and address.get("state"):
            # Prefer the structured field. The positional parse below picks the
            # wrong component when the address has no postcode.
            state = address["state"]
        else:
            parts = str(location).split(',')
            if parts[-1] == " United States":
                state = parts[-3].strip()
    except Exception as e:
        print ("Exception: fncFindStateByCoordinates: Processing " + str(geo) + ". \n\tGot exception: " + str(e))
    return state

#### Find the US state from the 'place' attribute of the tweet

In [5]:
def fncfindStateCode(eachPlace):
    """Resolve a tweet's 'place' value to a US state via fncFindStateByName.

    Assumes full_name is comma-separated, e.g. "City, ST" or "State, USA" (the
    'USA' check below implies the latter form). TODO: confirm against the tweet
    place schema. The second component is tried first. When it is "USA", the first
    component (the state name) is used instead.

    Args:
        eachPlace: a Row-like object exposing asDict() with a 'full_name' key,
            or a falsy value.

    Returns:
        Whatever fncFindStateByName returns for the chosen component, or "" when
        there is no place, no full_name, no comma, or an unexpected error
        (the error is printed).
    """
    if not eachPlace:
        return ""
    try:
        fullName = eachPlace.asDict().get('full_name')
        if not fullName:
            return ""
        # strip(): split() leaves a leading space (" USA"). Previously the comparison
        # with 'USA' never matched, so state-level places resolved to the country.
        parts = [part.strip() for part in fullName.split(",")]
        stateCd = parts[1]
        if stateCd == 'USA':
            stateCd = parts[0]
        return fncFindStateByName(stateCd)
    except IndexError:
        # full_name has no comma: nothing to resolve.
        pass
    except Exception as e:
        print ("Exception: fncfindStateCode: Processing " + str(eachPlace) + ". \n\tGot exception: " + str(e))
    return ""

#### Read, clean and classify all tweet files in a folder

In [6]:
def fncProcessFolder(eachFolder):
    """Load, clean, geolocate and sentiment-score every tweet file in a folder.

    Reads all FP1_tweets*.txt JSON files under eachFolder with Spark and moves the
    selected columns into pandas. It then derives:
      - US-state columns from the profile location, geo and place values;
      - the creation date;
      - cleaned text;
      - TextBlob polarity/subjectivity, an AFINN score and a sentiment label.
    Finally, US and non-US tweets are written to per-day CSVs via fncWriteToFile.

    Args:
        eachFolder: path of the folder to process.
    """
    print ("\n\nINFO!!! Processing " + eachFolder)
    fullpath = os.path.join(eachFolder,"FP1_tweets*.txt")
    # fullpath = os.path.join(eachFolder,"2020*.jsonl")

    # Read all the files and load them into a temporary view named "tweets".
    df = sqlContext.read.format("json").options(inferschema = True).load("file://" + fullpath)

    # Select only the required fields. The JSON layout differs by extraction date:
    #   jsonl files:
    #     df.select("id" ,"created_at","text","truncated", "user.location","geo", "place", "entities.urls.url")
    #   folders equal to or older than 20201123:
    #     df.select("id" ,"created_at","text","truncated", "_json.user.location","_json.geo", "_json.place", "_json.entities.urls.url")
    #   folders after 20201123 (current):
    tweets = df.select("id" ,"created_at","full_text","truncated", "_json.user.location","_json.geo", "_json.place", "_json.entities.urls.url")
    # createOrReplaceTempView supersedes the deprecated registerTempTable.
    tweets.createOrReplaceTempView("tweets")
    tweets.cache()
    print ("\n\nINFO!!! Number of tweets read - " + str(tweets.count()))

    # Expose full_text as "text" so the rest of the pipeline is layout-agnostic.
    # (For the older layouts, select plain "text" instead.)
    tweets2 = sqlContext.sql("select id, created_at, geo, location, place, full_text as text,truncated,url from tweets")
    tweets2.cache()

    # Convert to pandas and find the US state for the location / geo / place values.
    tweetsDF = tweets2.toPandas()

    af = Afinn()
    fncRemoveUnicode = lambda x: unidecode(x) if x else ""
    # Blank out @mentions, non-alphanumerics, URLs and newlines. Raw strings give the
    # same patterns without relying on Python passing unknown escapes ("\w") through.
    fncRemoveJunk = lambda x: re.sub(r"(@[A-Za-z0-9]+)|([^0-9A-Za-z \t])|(\w+:\/\/\S+)|(\n)", " ", str(x)) if x else ""
    # Drop a leading "RT" and collapse runs of whitespace.
    fncCleanTxt = lambda x: re.sub(r"\s\s+", " ", re.sub(r"(^RT)", "", str(x))) if x else ""
    fncFindPolarity = lambda x: TextBlob(x).sentiment.polarity
    fncFindSubjectivity = lambda x: TextBlob(x).sentiment.subjectivity
    fncFindAFScore = lambda x: af.score(x)
    # Bucket a polarity in [-1, 1] into five labels at -0.6 / -0.2 / 0.2 / 0.6.
    fncFindSentiment = lambda x: "Strongly Negative" if float(x) <= -0.60 else ("Negative" if float(x) <= -0.20 else ("Neutral" if float(x) <= 0.20 else ("Positive" if float(x) <= 0.60 else "Strongly Positive")))

    tweetsDF["location_US_State"] = tweetsDF["location"].apply(fncFindStateByName)
    tweetsDF["geo_US_State"] = tweetsDF["geo"].apply(fncFindStateByCoordinates)
    tweetsDF['place_US_State'] = tweetsDF["place"].apply(fncfindStateCode)
    tweetsDF['createDate'] = (pd.to_datetime(tweetsDF["created_at"])).dt.date
    tweetsDF['clean_txt'] = tweetsDF['text'].apply(fncRemoveUnicode).apply(fncRemoveJunk).apply(fncCleanTxt)
    tweetsDF.drop(['geo', 'location', 'place','text'], axis = 1, inplace = True)
    tweetsDF['TB_polarity'] = tweetsDF['clean_txt'].apply(fncFindPolarity)
    tweetsDF['TB_subjectivity'] = tweetsDF['clean_txt'].apply(fncFindSubjectivity)
    tweetsDF['AF_polarity'] = tweetsDF['clean_txt'].apply(fncFindAFScore)
    tweetsDF['TB_sentiment'] = tweetsDF['TB_polarity'].apply(fncFindSentiment)
    tweetsDF = tweetsDF[tweetsDF['clean_txt'] != '']

    # A tweet counts as US if any of the three state columns resolved.
    # "!=" replaces the Python-2-only "<>".
    isUS = ((tweetsDF["location_US_State"] != "") |
            (tweetsDF["place_US_State"] != "") |
            (tweetsDF["geo_US_State"] != ""))

    # Sort into new frames: an in-place sort on a filtered slice raises
    # SettingWithCopyWarning.
    UStweets = tweetsDF[isUS].sort_values(by = 'createDate', ascending = True)
    # len() is the row count; DataFrame.count() returns per-column non-null counts.
    print ("\n\nINFO!!! Count of cleaned US tweets: " + str(len(UStweets)))
    fncWriteToFile(UStweets, "FP1_USTweets_" )

    nonUStweets = tweetsDF[~isUS].sort_values(by = 'createDate', ascending = True)
    print ("\n\nINFO!!! Count of cleaned non US tweets: " + str(len(nonUStweets)) )
    fncWriteToFile(nonUStweets, "FP1_nonUSTweets_" )

In [None]:
def fncWriteToFile(tweetFile, pathPrefix):
    """Write tweets into one CSV file per creation date.

    Rows are grouped by the 'createDate' column. Each group goes to
    CleanTweets/<date>/<pathPrefix><date>.csv, and the directory is created if
    missing. A new file is written with a header. An existing file is appended to
    without a header, so repeated calls for the same date accumulate rows.

    Args:
        tweetFile: pandas DataFrame with a 'createDate' column.
        pathPrefix: file-name prefix, e.g. "FP1_USTweets_".
    """
    for dateKey, dayTweets in tweetFile.groupby('createDate'):
        dateText = str(dateKey)
        dayFolder = "CleanTweets/" + dateText
        if not os.path.exists(dayFolder):
            os.makedirs(dayFolder)
        fileName = dayFolder + "/" + pathPrefix + dateText + ".csv"
        startFresh = not os.path.isfile(fileName)
        # Header only on the first write for this date; later writes append rows.
        dayTweets.to_csv(fileName, encoding = 'utf-8', index = None,
                         mode = 'w' if startFresh else 'a', header = startFresh)
        print ("\n\nINFO!!! File Name: " + fileName)
        print ("\tCount of tweets: " + str(len(dayTweets)))

### Main function - entry point

In [None]:
def main():
    """Process every sub-folder of ./Data with fncProcessFolder, then report completion."""
    path = os.getcwd()
    folder = "Data/*/"
    fullpath = os.path.join(path, folder)
    # glob returns folders in arbitrary (filesystem) order. fncWriteToFile appends to
    # per-day CSVs, so sort the folders to make the output row order reproducible.
    list_of_folders = sorted(glob.glob(fullpath))
    for eachFolder in list_of_folders:
        fncProcessFolder(eachFolder)
    print ("\n\nINFO!!! Processing Complete")

if __name__ == "__main__":
    main()



INFO!!! Processing /home/cloudera/Desktop/FP/Data/20201220/
