In [1]:
# Importing libraries
# Standard library
import os
import re
import string
import warnings

# findspark has to be initialised BEFORE the first pyspark import: init()
# locates the Spark installation and makes its Python bindings importable.
# Calling it after `import pyspark` (as before) is too late to have any effect.
import findspark
findspark.init()

# Third-party
import tweepy as tw
import pandas as pd
import numpy as np
import nltk
import pyspark as ps
from nltk.corpus import wordnet
from pyspark.sql import SQLContext
# NOTE(review): wildcard import kept because the notebook relies on names it
# provides (udf, regexp_replace, trim, lower, col). Prefer listing them
# explicitly once it is confirmed that no other cell depends on further names.
from pyspark.sql.functions import *

# Import PySpark Tokenizer and Stopword Remover
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

# Get Bing Liu Dictionary (opinion_lexicon), wordnet and the POS tagger model
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('omw')
nltk.download('opinion_lexicon')

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\chait\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\chait\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package omw to
[nltk_data]     C:\Users\chait\AppData\Roaming\nltk_data...
[nltk_data]   Package omw is already up-to-date!
[nltk_data] Downloading package opinion_lexicon to
[nltk_data]     C:\Users\chait\AppData\Roaming\nltk_data...
[nltk_data]   Package opinion_lexicon is already up-to-date!


True

In [2]:
############### Step 1 - Create Twitter Developer account and get all the keys and access tokens ###############
# Twitter Authentication Keys
# Secrets are read from the environment so they are never stored in (or
# committed with) the notebook. Set these four variables before starting the
# kernel: TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, TWITTER_ACCESS_TOKEN,
# TWITTER_ACCESS_TOKEN_SECRET.
_missing_credentials = [
    name
    for name in (
        'TWITTER_CONSUMER_KEY',
        'TWITTER_CONSUMER_SECRET',
        'TWITTER_ACCESS_TOKEN',
        'TWITTER_ACCESS_TOKEN_SECRET',
    )
    if not os.environ.get(name)
]
if _missing_credentials:
    # Fail early with an actionable message instead of an opaque API auth error.
    raise RuntimeError(
        "Missing Twitter credentials; set environment variable(s): "
        + ", ".join(_missing_credentials)
    )

consumer_key = os.environ['TWITTER_CONSUMER_KEY']
consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
access_token = os.environ['TWITTER_ACCESS_TOKEN']
access_token_secret = os.environ['TWITTER_ACCESS_TOKEN_SECRET']

# Accessing twitter API
auth = tw.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# wait_on_rate_limit=True makes tweepy sleep instead of failing when the
# rate limit is hit.
api = tw.API(auth, wait_on_rate_limit=True)

############### Step 2 - Get 1000 tweets ###############
# Collect 1000 tweets
date_since = "2020-10-10"
new_search = "#hospital" + " -filter:retweets"
# tweet_mode="extended" requests the untruncated tweet body. Without it the
# API returns compatibility-mode text that is cut short and ends in a link,
# which would be scored instead of the real content.
tweets = tw.Cursor(
    api.search,
    q=new_search,
    lang="en",
    since=date_since,
    tweet_mode="extended",
).items(1000)

# convert iterable tweet object to list (this is where the API is actually queried)
results = list(tweets)

# create SparkContext
try:
    sc = ps.SparkContext('local[4]')
    print("Just created a SparkContext")
except ValueError:
    # A context is already running (e.g. the cell was re-run). Reuse it so
    # that `sc` and `sqlContext` are always bound after this cell.
    warnings.warn("SparkContext already exists in this scope")
    sc = ps.SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# In extended mode the text is exposed as `full_text`; fall back to `text`
# for status objects that do not carry it.
dfTweetData = pd.DataFrame({
    'tweetID': [tweet.id for tweet in results],
    'tweetText': [
        tweet.full_text if hasattr(tweet, 'full_text') else tweet.text
        for tweet in results
    ],
})

# create pyspark dataframe
spark_df = sqlContext.createDataFrame(dfTweetData)

# Removing non-ASCII characters
def ascii_ignore(x):
    """Return ``x`` with every non-ASCII character dropped.

    Parameters
    ----------
    x : str or None
        Text to sanitise. ``None`` (a null column value when used as a
        Spark UDF) is passed through unchanged instead of raising.

    Returns
    -------
    str or None
        The ASCII-only version of ``x``, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    # errors='ignore' silently discards characters that cannot be encoded.
    return x.encode('ascii', 'ignore').decode('ascii')

# Wrap the Python helper as a Spark UDF (no return type given, so udf()'s default applies).
ascii_udf = udf(ascii_ignore)
# Add the sanitised text as a new column next to the original tweet text.
spark_df_ascii = spark_df.withColumn(
    "tweetText_new",
    ascii_udf(spark_df['tweetText']),
)

############### Step 3 - Data Cleaning, Tokenizing, Sentiment Analysis ###############

# Text Cleaning
# Ordered (regex, replacement) pairs applied one after another to
# 'tweetText_new'. Order matters: URLs must go first, before the later steps
# strip the punctuation and digits that make them recognisable.
cleaning_steps = [
    # URLs (raw string: the pattern contains regex escapes such as \( and \))
    (r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", ""),
    ("[-_]", " "),                   # hyphens / underscores become word breaks
    (r'["]', ""),                    # double quotes
    ("[0-9]", ""),                   # digits
    ("[&$#@,:;.+!%/)(*?|]", ""),     # remaining punctuation and symbols
    ("'", ""),                       # apostrophes
    ("\n", " "),                     # line breaks become spaces
    (" +", " "),                     # collapse runs of spaces
]

df_clean = spark_df_ascii
for pattern, replacement in cleaning_steps:
    # withColumn already names the result, so no alias is needed.
    df_clean = df_clean.withColumn(
        'tweetText_new',
        regexp_replace('tweetText_new', pattern, replacement),
    )

# Trimming leading and trailing spaces
df_trim = df_clean.withColumn('tweetText_new', trim(df_clean.tweetText_new))

# lower case, written straight into the final column name rather than
# depending on Spark's auto-generated "lower(tweetText_new)" column label
df_lowercase = df_trim.withColumn('cleaned_tweet_text', lower(col('tweetText_new')))

# drop the intermediate column
dfFinal = df_lowercase.drop('tweetText_new')

# Tokenize the tweets
tokenizer = Tokenizer(inputCol="cleaned_tweet_text", outputCol="tokenized_words")
tokenizeTweetData = tokenizer.transform(dfFinal)

# remove stop words
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="meaningful_words")
SWRemovedTweets = swr.transform(tokenizeTweetData)

# keep only what the scoring / export step needs
FinalData = SWRemovedTweets.select("tweetID", "tweetText", "meaningful_words")


# Import positive and negative word lists from the Bing Liu dictionary
from nltk.corpus import opinion_lexicon
# Stored as frozensets: the lexicons are only used for membership tests
# (`word in pos_list`), which are O(1) on a set but scan the whole collection
# on a list. The original list(set(...)) had no meaningful order either.
pos_list = frozenset(opinion_lexicon.positive())
neg_list = frozenset(opinion_lexicon.negative())

#Lemmatizer with POS tag
def get_pos(word):
    """Map a single word to the WordNet part-of-speech constant for lemmatizing.

    The word is tagged on its own with ``nltk.pos_tag`` and the first letter
    of the resulting tag selects the WordNet category.

    Parameters
    ----------
    word : str
        Token to tag.

    Returns
    -------
    One of ``wordnet.ADJ``, ``wordnet.NOUN``, ``wordnet.VERB``,
    ``wordnet.ADV``. ``wordnet.NOUN`` is the fallback for any other tag and
    for empty/None input.
    """
    if not word:
        # Empty tokens can come out of tokenizing an empty cleaned tweet;
        # NOTE(review): some NLTK releases appear to raise IndexError when
        # tagging '' - confirm against the installed version. Short-circuit
        # to the default either way.
        return wordnet.NOUN
    # pos_tag returns [(word, tag)]; only the tag's first letter is needed.
    postag = nltk.pos_tag([word])[0][1][0].upper()
    postag_dict = {"J": wordnet.ADJ,
                "N": wordnet.NOUN,
                "V": wordnet.VERB,
                "R": wordnet.ADV}
    return postag_dict.get(postag, wordnet.NOUN)


def sentiment(words):
    """Score a tokenized tweet against the positive/negative opinion lexicons.

    Each token is lemmatized (using the part of speech from ``get_pos``) and
    contributes +1 if its lemma is in ``pos_list``, -1 if it is in
    ``neg_list``, and 0 otherwise.

    Parameters
    ----------
    words : iterable of str, or None
        Tokens of one tweet. ``None`` and empty/null tokens are ignored.

    Returns
    -------
    int
        Net sentiment score; 0 when there is nothing to score.
    """
    if words is None:
        return 0
    # Built with a comprehension rather than `if not words` so that
    # array-like inputs work too; drops empty/null tokens.
    tokens = [word for word in words if word]
    if not tokens:
        return 0

    # initialize lemmatizer once per call, only when there is work to do
    lemmatizer = WordNetLemmatizer()
    senti = 0
    for word in tokens:
        lemma_word = lemmatizer.lemmatize(word, get_pos(word))
        # calculate total score by positive and negative sentiment
        if lemma_word in pos_list:
            senti += 1
        elif lemma_word in neg_list:
            senti -= 1
    return senti


Just created a SparkContext


In [4]:
############### Step 4 - Export Tweets and Sentiment Score to CSV ###############
# Bring the (small) result set to the driver and score each tweet in pandas.
dfTweetScore = FinalData.toPandas()
dfTweetScore['Sentiment_Score'] = dfTweetScore['meaningful_words'].apply(sentiment)

# Dropping and renaming columns
dfTweetScore = dfTweetScore.drop(['tweetID', 'meaningful_words'], axis=1)
dfFinal = dfTweetScore.rename(columns={'tweetText': 'Tweet_content'})

# write result to csv
# The destination can be overridden with the HW3_OUTPUT_CSV environment
# variable so the notebook runs on machines where the default user-specific
# path does not exist.
output_csv = os.environ.get(
    'HW3_OUTPUT_CSV',
    r'C:\Users\chait\Desktop\561- Big Data\Assignment 3\HW3_FinalResult.csv',
)
dfFinal.to_csv(output_csv, index=False)