In [1]:
import os
import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType,FloatType
from pyspark.sql.functions import isnan, when, count, col

from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk import pos_tag
import string
import re
!pip install -U textblob
from textblob import TextBlob
import time
import matplotlib.pyplot as plt
import numpy as np



import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

Collecting textblob
  Downloading https://files.pythonhosted.org/packages/60/f0/1d9bfcc8ee6b83472ec571406bd0dd51c0e6330ff1a51b2d29861d389e85/textblob-0.15.3-py2.py3-none-any.whl (636kB)
    100% |████████████████████████████████| 645kB 27.5MB/s eta 0:00:01
Requirement not upgraded as not directly required: nltk>=3.1 in /home/ec2-user/anaconda3/envs/chainer_p36/lib/python3.6/site-packages (from textblob) (3.3)
Requirement not upgraded as not directly required: six in /home/ec2-user/anaconda3/envs/chainer_p36/lib/python3.6/site-packages (from nltk>=3.1->textblob) (1.14.0)
Installing collected packages: textblob
Successfully installed textblob-0.15.3
You are using pip version 10.0.1, however version 20.1b1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


In [3]:
# Execution role from this notebook instance's IAM policy.
# NOTE(review): `role` is not used anywhere else in this notebook.
role = get_execution_role()

# Put the SageMaker Spark dependency jars on the driver classpath.
# Compute the jar list once and reuse it (it was previously fetched twice).
jars = sagemaker_pyspark.classpath_jars()
classpath = ":".join(jars)

# Start (or reuse, via getOrCreate) a local SparkSession using all cores.
spark = (
    SparkSession.builder
    .config("spark.driver.extraClassPath", classpath)
    .master("local[*]")
    .getOrCreate()
)

# Display the session summary as the cell output.
spark

In [6]:
# Point the s3a connector at the regional S3 endpoint of the current AWS session.
region = boto3.Session().region_name
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.endpoint', f's3.{region}.amazonaws.com')

# Load one day of tweets, drop the 'place' column, then drop rows with any null.
raw_tweets = (
    spark.read.format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load('s3a://twitter-bucket-jingyusu/stream Covid-19 4-09.csv')
)
drop_list = ['place']
kept_columns = [c for c in raw_tweets.columns if c not in drop_list]
df = raw_tweets.select(kept_columns).dropna()
df.show(5)

+-------------------+-------------+-------------------+---------------+--------------------+
|         twitter_id|         name|         created_at|followers_count|                text|
+-------------------+-------------+-------------------+---------------+--------------------+
|1248037911578148866|     OnTopMag|2020-04-09 00:00:00|           5038|'Fauci: Pandemic ...|
|1248037911532113928|  RedMaryland|2020-04-09 00:00:00|           5062|'Red Maryland is ...|
|1248037911557218304|   PEDro_CEBP|2020-04-09 00:00:00|           9659|'MindSpot has rel...|
|1248037911582232576|CheckPointOrg|2020-04-09 00:00:00|           5379|'Staying social o...|
|1248037911921963008|salvationarmy|2020-04-09 00:00:00|          63374|'The Salvation Ar...|
+-------------------+-------------+-------------------+---------------+--------------------+
only showing top 5 rows



In [7]:
import re
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer
# Tokenizer used by tweet_cleaner_updated to collapse the runs of spaces left by character filtering.
tok = WordPunctTokenizer()

# Regular expressions used by tweet_cleaner_updated.
pat1 = r'@[A-Za-z0-9_]+'      # @mentions
pat2 = r'https?://[^ ]+'      # http(s) links, up to the next space
# Any single character followed by "RT" at the very start of the text. The sample
# texts shown above begin with a quote character, presumably the reason for the
# leading '.' -- TODO confirm against the raw CSV.
pat3 = r'^.RT'
hashtag = r'\#\S*'            # NOTE(review): defined but not used by the cleaner
combined_pat = r'|'.join((pat1, pat2, pat3))
print(combined_pat)
# The dot is escaped so only literal "www." links match; an unescaped '.' matches
# any character and would strip text such as the "wwww" inside "awwwww".
www_pat = r'www\.[^ ]+'
# Negated contractions expanded to two words before non-letters are removed,
# so "don't" becomes "do not" instead of "don" + "t".
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')

def tweet_cleaner_updated(text):
    """Normalise raw tweet text into space-separated lower-case words.

    Steps: extract text with BeautifulSoup (lxml parser); remove @mentions,
    http(s) links, any character followed by "RT" at the start, and www links;
    lower-case; expand negated contractions ("don't" -> "do not"); replace
    everything except ASCII letters and the digits 1 and 9 with spaces; then
    drop tokens shorter than two characters.

    Args:
        text: raw tweet text. ``None`` (a null cell) returns "" instead of
            raising inside BeautifulSoup.

    Returns:
        The cleaned words joined by single spaces.
    """
    if text is None:
        return ""
    souped = BeautifulSoup(text, 'lxml').get_text()
    # A Python 2 era step here called souped.decode("utf-8-sig") under a bare
    # `except:`. Python 3 str has no .decode, so it always raised and fell back to
    # `souped` unchanged; it is removed. BOM / U+FFFD characters are still turned
    # into spaces by the letter filter below.
    stripped = re.sub(combined_pat, '', souped)
    stripped = re.sub(www_pat, '', stripped)
    lower_case = stripped.lower()
    neg_handled = neg_pattern.sub(lambda m: negations_dic[m.group()], lower_case)
    # '1' and '9' survive alongside letters -- presumably so "covid-19" keeps "19";
    # TODO confirm this is intended (other digits are discarded).
    letters_only = re.sub("[^a-zA-Z19]", " ", neg_handled)
    # Filtering leaves runs of spaces; re-tokenising and joining collapses them,
    # and single-character tokens are dropped.
    words = [w for w in tok.tokenize(letters_only) if len(w) > 1]
    return (" ".join(words)).strip()
text_process_udf = udf(tweet_cleaner_updated, StringType())

def sentiment_analysis(text):
    """Polarity score of `text` as computed by TextBlob.

    The UDF wrapper declares FloatType, so Spark stores the score as a
    single-precision float.
    """
    polarity = TextBlob(text).sentiment.polarity
    return polarity


sentiment_analysis_udf = udf(sentiment_analysis, FloatType())

def condition(r, pos_threshold=0.1, neg_threshold=-0.1):
    """Map a polarity score to a sentiment label.

    Args:
        r: polarity score (e.g. from ``sentiment_analysis``).
        pos_threshold: scores >= this are labelled "positive" (default 0.1).
        neg_threshold: scores <= this are labelled "negative" (default -0.1).

    Returns:
        "positive", "negative", or "neutral" for anything in between. NaN
        fails both comparisons and is therefore "neutral".
    """
    if r >= pos_threshold:
        return "positive"
    if r <= neg_threshold:
        return "negative"
    return "neutral"

# Spark UDF labelling each polarity score via `condition` (no lambda wrapper needed).
sentiment_udf = udf(condition, StringType())


@[A-Za-z0-9_]+|https?://[^ ]+|^.RT


In [8]:
# Clean each tweet, score its polarity with TextBlob, then label the score.
df = (
    df
    .withColumn('text_processed', text_process_udf(col('text')))
    .withColumn('sentiment_score', sentiment_analysis_udf(col('text_processed')))
    .withColumn('sentiment', sentiment_udf(col('sentiment_score')))
)
df.show(5, truncate=True)

+-------------------+-------------+-------------------+---------------+--------------------+--------------------+---------------+---------+
|         twitter_id|         name|         created_at|followers_count|                text|      text_processed|sentiment_score|sentiment|
+-------------------+-------------+-------------------+---------------+--------------------+--------------------+---------------+---------+
|1248037911578148866|     OnTopMag|2020-04-09 00:00:00|           5038|'Fauci: Pandemic ...|fauci pandemic ha...|     0.40833333| positive|
|1248037911532113928|  RedMaryland|2020-04-09 00:00:00|           5062|'Red Maryland is ...|red maryland is l...|     0.06818182|  neutral|
|1248037911557218304|   PEDro_CEBP|2020-04-09 00:00:00|           9659|'MindSpot has rel...|mindspot has rele...|    0.016666668|  neutral|
|1248037911582232576|CheckPointOrg|2020-04-09 00:00:00|           5379|'Staying social o...|staying social on...|     0.24242425| positive|
|1248037911921963008

In [9]:
# Percentage of tweets per sentiment label, in fixed [negative, neutral, positive] order.
grouped = df.groupby("sentiment").count()
# Collect the small grouped result and total it locally. This avoids a second full
# pass over `df` for df.count(), and pins the label order: a missing label reports
# 0.0 instead of shifting the other percentages left.
counts = {row["sentiment"]: row["count"] for row in grouped.collect()}
total = sum(counts.values())
ans = [counts.get(label, 0) / total * 100 for label in ("negative", "neutral", "positive")] if total else []
ans

[12.436854551114363, 50.657804914012964, 36.90534053487267]

In [11]:
# Whole pipeline (load -> clean -> score -> label -> percentages) for one file.
def get_sentiemnt(file):
    """Return the share of negative, neutral and positive tweets in one CSV.

    Args:
        file: path/URI readable by Spark, e.g. 's3a://<bucket>/<key>.csv'.

    Returns:
        [negative %, neutral %, positive %], always in that order (a label with
        no tweets gets 0.0), or [] when no rows remain after dropna().
    """
    tweets = (
        spark.read.format('com.databricks.spark.csv')
        .options(header='true', inferschema='true')
        .load(file)
    )
    drop_list = ['place']
    tweets = tweets.select([c for c in tweets.columns if c not in drop_list]).dropna()
    tweets = tweets.withColumn('text_processed', text_process_udf(tweets['text']))
    tweets = tweets.withColumn("sentiment_score", sentiment_analysis_udf(tweets['text_processed']))
    tweets = tweets.withColumn("sentiment", sentiment_udf(tweets['sentiment_score']))
    # Totalling the grouped counts locally avoids an extra full df.count() pass and
    # keeps a fixed label order even when a label is absent for this file.
    grouped_rows = tweets.groupby("sentiment").count().collect()
    counts = {row["sentiment"]: row["count"] for row in grouped_rows}
    total = sum(counts.values())
    if total == 0:
        return []
    return [counts.get(label, 0) / total * 100 for label in ("negative", "neutral", "positive")]

In [12]:
s3 = boto3.Session().resource('s3')
your_bucket = s3.Bucket('twitter-bucket-jingyusu')

# Every object key in the bucket except those containing 'tweets', in sorted order.
filelist = sorted(
    obj.key for obj in your_bucket.objects.all() if 'tweets' not in obj.key
)

In [13]:
# Show the sorted S3 keys selected for processing.
filelist

['stream Covid-19 4-08.csv',
 'stream Covid-19 4-09.csv',
 'stream Covid-19 4-12.csv',
 'stream Covid-19 4-13.csv',
 'stream Covid-19 4-14.csv',
 'stream Covid-19 4-15.csv',
 'stream Covid-19 4-16.csv',
 'stream Covid-19 4-17.csv',
 'stream Covid-19 4-18.csv',
 'stream Covid-19 4-19.csv',
 'stream Covid-19 4-20.csv',
 'stream Covid-19 4-21.csv',
 'stream Covid-19 4-22.csv']

In [None]:
# Main loop: compute the sentiment percentages for every selected file and time the run.
# (`time` is already imported in the first cell.)
start_time = time.perf_counter()  # monotonic clock, unaffected by system clock changes
result_list = []
for key in filelist:
    file_name = 's3a://twitter-bucket-jingyusu/' + key
    result_list.append(get_sentiemnt(file_name))
print("--- %s seconds ---" % (time.perf_counter() - start_time))

In [None]:
# Sentiment shares per file. Each entry of result_list is the list returned by
# get_sentiemnt, indexed here as [negative %, neutral %, positive %].
fig, ax = plt.subplots(figsize=(10, 6))  # `ax` was previously undefined (NameError)
x = np.arange(len(result_list))
for idx, label in enumerate(['negative', 'neutral', 'positive']):
    ax.plot(x, [day[idx] for day in result_list], label=label)
# Fix one tick per file. set_xticklabels alone labels whatever ticks matplotlib
# chose automatically, which need not line up with the data points.
ax.set_xticks(x)
ax.set_xticklabels([key.split(' ')[-1].split('.')[0] for key in filelist], rotation=45)
ax.set(title='Covid-19 tweet sentiment per file', xlabel='file date', ylabel='share of tweets (%)')
ax.legend()
plt.show()
