In [1]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("sentiment analysis training")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

path = "Sentiment Analysis Dataset.csv"
data = sqlContext.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .option("delimiter", ",")\
  .load(path)

data.cache()
data=data.dropna()
display(data)

DataFrame[ItemID: int, Sentiment: int, SentimentSource: string, SentimentText: string]

In [2]:
data.count()
sqlContext.registerDataFrameAsTable(data, "table1")
df2 = sqlContext.sql("SELECT Sentiment, count(*) from table1 group by Sentiment")
df2.show()

+---------+--------+
|Sentiment|count(1)|
+---------+--------+
|        1|  790185|
|        0|  788442|
+---------+--------+



In [3]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

tokenizer = Tokenizer(inputCol="SentimentText", outputCol="words")

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(data)
tokenized.select("SentimentText", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=True)

+--------------------+--------------------+------+
|       SentimentText|               words|tokens|
+--------------------+--------------------+------+
|                 ...|[, , , , , , , , ...|    28|
|                 ...|[, , , , , , , , ...|    25|
|              omg...|[, , , , , , , , ...|    19|
|          .. Omga...|[, , , , , , , , ...|    36|
|         i think ...|[, , , , , , , , ...|    24|
|         or i jus...|[, , , , , , , , ...|    15|
|       Juuuuuuuuu...|[, , , , , , , ju...|     9|
|       Sunny Agai...|[, , , , , , , su...|    28|
|      handed in m...|[, , , , , , hand...|    16|
|      hmmmm.... i...|[, , , , , , hmmm...|    14|
|      I must thin...|[, , , , , , i, m...|    11|
|      thanks to a...|[, , , , , , than...|    18|
|      this weeken...|[, , , , , , this...|    12|
|     jb isnt show...|[, , , , , jb, is...|    12|
|     ok thats it ...|[, , , , , ok, th...|    10|
|    &lt;-------- ...|[, , , , &lt;----...|    13|
|    awhhe man.......|[, , , , 

In [4]:
from pyspark.ml.feature import RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="SentimentText", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(data)
regexTokenized.select("words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|words                                                                                                                                                       |tokens|
+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|[is, so, sad, for, my, apl, friend]                                                                                                                         |7     |
|[i, missed, the, new, moon, trailer]                                                                                                                        |6     |
|[omg, its, already, 7, 30, o]                                                                                                                               |6     |
|[om

In [5]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered_raw")
removed = remover.transform(regexTokenized)
removed.select("filtered_raw").show(truncate=False)

+-----------------------------------------------------------------------------------------------+
|filtered_raw                                                                                   |
+-----------------------------------------------------------------------------------------------+
|[sad, apl, friend]                                                                             |
|[missed, new, moon, trailer]                                                                   |
|[omg, already, 7, 30, o]                                                                       |
|[omgaga, im, sooo, im, gunna, cry, ve, dentist, since, 11, suposed, 2, get, crown, put, 30mins]|
|[think, mi, bf, cheating, t_t]                                                                 |
|[worry, much]                                                                                  |
|[juuuuuuuuuuuuuuuuussssst, chillin]                                                            |
|[sunny, work, tomor

In [6]:
remover_adv = StopWordsRemover(inputCol="filtered_raw", outputCol="filtered", stopWords=["","1","2","3","4","5","6","7","8","9","m","at","away","and","am","as","been","kaggle","etc","though","man","too","so","rain","shower","000","http",
                                                                                        "day", "quot","com","im","it","get","bit","see"])
removed_adv = remover_adv.transform(removed)
removed_adv.dropna()
removed_adv.select("filtered").show(truncate=False)

+-------------------------------------------------------------------------------+
|filtered                                                                       |
+-------------------------------------------------------------------------------+
|[sad, apl, friend]                                                             |
|[missed, new, moon, trailer]                                                   |
|[omg, already, 30, o]                                                          |
|[omgaga, sooo, gunna, cry, ve, dentist, since, 11, suposed, crown, put, 30mins]|
|[think, mi, bf, cheating, t_t]                                                 |
|[worry, much]                                                                  |
|[juuuuuuuuuuuuuuuuussssst, chillin]                                            |
|[sunny, work, tomorrow, tv, tonight]                                           |
|[handed, uniform, today, miss, already]                                        |
|[hmmmm, wonder,

In [7]:
all_words = removed_adv.select("filtered")
display(all_words)

DataFrame[filtered: array<string>]

In [8]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
featurizedData = hashingTF.transform(removed_adv)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

#rescaledData.select("title", "features").show()
svms = rescaledData.selectExpr("Sentiment as label", "features")
svms.dropna()
svms.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(10000,[7238,8393...|
|    0|(10000,[2415,3596...|
|    1|(10000,[419,3784,...|
|    0|(10000,[516,585,6...|
|    0|(10000,[1369,1564...|
|    0|(10000,[524,2362]...|
|    1|(10000,[1790,4209...|
|    0|(10000,[1318,7250...|
|    1|(10000,[1071,3462...|
|    1|(10000,[1583,4898...|
|    0|(10000,[1,1023,15...|
|    1|(10000,[1415,4034...|
|    0|(10000,[2786,4690...|
|    0|(10000,[2617,3976...|
|    0|(10000,[2484,7250...|
|    0|(10000,[574,3115,...|
|    0|(10000,[2131,2187...|
|    1|(10000,[263,1288,...|
|    0|(10000,[2198,3674...|
|    0|(10000,[157,263,4...|
+-----+--------------------+
only showing top 20 rows



In [9]:
from pyspark.ml.classification import NaiveBayes,NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.util import MLUtils
splits = svms.randomSplit([0.85, 0.15],12)  
train = splits[0]  
test = splits[1]  

nb = NaiveBayes(smoothing=1.0, modelType="multinomial") 
model = nb.fit(train)

result = model.transform(test)  
predictionAndLabels = result.select("prediction", "label")  
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")  

print("Accuracy: " + str(evaluator.evaluate(predictionAndLabels)))

Accuracy: 0.7381624187948265


In [10]:
class tweet_sentiment():
  
    def __init__(self, path_x):
        self.dataframe=None
        self.regexTokenized=None
        self.stop_removed=None
        self.tweet_removed_adv=None
        self.tweet_svms=None
        self.model=None
        self.positive=0
        self.negative=0
        self.path=path_x
  
    def data_loading(self):
        tweet_data = sqlContext.read.format("csv")\
        .option("header", "false")\
        .option("inferSchema", "true")\
        .option("delimiter", ",")\
        .load(self.path)
        tweet_data.cache()
        tweet_data=tweet_data.dropna()
        self.dataframe=tweet_data
        return tweet_data
  
    def nb_model(self, nb_model):
        self.model=nb_model
    
    def row_count(self):
        return self.dataframe.count()
  
    def regex_tokenize(self):
        from pyspark.ml.feature import RegexTokenizer
        from pyspark.ml.feature import Tokenizer
        from pyspark.sql.functions import col, udf
        from pyspark.sql.types import IntegerType
        tweet_regexTokenizer = RegexTokenizer(inputCol="_c2", outputCol="words", pattern="\\W")

        #tweet_countTokens = udf(lambda words: len(words), IntegerType())

        self.regexTokenized = tweet_regexTokenizer.transform(self.dataframe)
        return self.regexTokenized
    
    def regex_tokenize_show(self, truncation=True):  
        self.regexTokenized.select("words").show(truncate=truncation)
  
    def stop_words_remove(self, stop=[]):
        from pyspark.ml.feature import StopWordsRemover

        tweet_remover = StopWordsRemover(inputCol="words", outputCol="filtered_raw")
        tweet_removed = tweet_remover.transform(self.regexTokenized)
        self.stop_removed = tweet_removed
    
        tweet_remover_adv = StopWordsRemover(inputCol="filtered_raw", outputCol="filtered", stopWords=stop)
        self.tweet_removed_adv = tweet_remover_adv.transform(tweet_removed)
        self.tweet_removed_adv = self.tweet_removed_adv.dropna()
        return self.tweet_removed_adv
  
    def word2vector(self, feature_num=10000):
        from pyspark.ml.feature import HashingTF, IDF

        tweet_hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=feature_num)
        tweet_featurizedData = tweet_hashingTF.transform(self.tweet_removed_adv)

        tweet_idf = IDF(inputCol="rawFeatures", outputCol="features")
        tweet_idfModel = tweet_idf.fit(tweet_featurizedData)
        tweet_rescaledData = tweet_idfModel.transform(tweet_featurizedData)

        tweet_svms = tweet_rescaledData.selectExpr("features")
        self.tweet_svms=tweet_svms.dropna()
    
    def nb_predicting(self):
        from pyspark.ml.classification import NaiveBayes,NaiveBayesModel
        from pyspark.mllib.util import MLUtils

        self.tweet_result = self.model.transform(self.tweet_svms)  
        #tweet_predictionAndLabels = tweet_result.select("prediction")   
  
    def nb_predicting_show(self):
        self.tweet_result.select("prediction","features").show(truncate=True)
    
    def result(self):
        sqlContext.registerDataFrameAsTable(self.tweet_result, "table1")
        self.positive = sqlContext.sql("SELECT count(*) from table1 where prediction=1 group by prediction")
        self.negative = sqlContext.sql("SELECT count(*) from table1 where prediction=0 group by prediction")

In [12]:
path_list = []
import csv

states = []
with open('states.csv') as statesf:
    srows = csv.reader(statesf)
    for row in srows:
        states.append(row[0])
for state in states:
    path_list.append('Starbucks/'+state+'.csv')
path_account = len(path_list)
print("file account=", path_account)

file account= 48


In [14]:
stopWordslist2=['starbucks','Starbucks','http','https','like','have','can','drink','latte','com','get','https://www','YouTube','via','need','a','video','time',"m","at","away","and","am","as","been"]
result_dict = {}
for i in range(path_account):
    result_dict[i]={'positive':0, 'negative':0}
print(result_dict)

{0: {'positive': 0, 'negative': 0}, 1: {'positive': 0, 'negative': 0}, 2: {'positive': 0, 'negative': 0}, 3: {'positive': 0, 'negative': 0}, 4: {'positive': 0, 'negative': 0}, 5: {'positive': 0, 'negative': 0}, 6: {'positive': 0, 'negative': 0}, 7: {'positive': 0, 'negative': 0}, 8: {'positive': 0, 'negative': 0}, 9: {'positive': 0, 'negative': 0}, 10: {'positive': 0, 'negative': 0}, 11: {'positive': 0, 'negative': 0}, 12: {'positive': 0, 'negative': 0}, 13: {'positive': 0, 'negative': 0}, 14: {'positive': 0, 'negative': 0}, 15: {'positive': 0, 'negative': 0}, 16: {'positive': 0, 'negative': 0}, 17: {'positive': 0, 'negative': 0}, 18: {'positive': 0, 'negative': 0}, 19: {'positive': 0, 'negative': 0}, 20: {'positive': 0, 'negative': 0}, 21: {'positive': 0, 'negative': 0}, 22: {'positive': 0, 'negative': 0}, 23: {'positive': 0, 'negative': 0}, 24: {'positive': 0, 'negative': 0}, 25: {'positive': 0, 'negative': 0}, 26: {'positive': 0, 'negative': 0}, 27: {'positive': 0, 'negative': 0}, 2

In [15]:
test = tweet_sentiment(path_list[0])
test_df=test.data_loading()
test_df.take(20)

[Row(_c0='index', _c1='location', _c2='text'),
 Row(_c0='1', _c1='Alabama', _c2="I'm at Starbucks in Tuscaloosa, AL https://t.co/RZtAJUmymx"),
 Row(_c0='2', _c1='Alabama', _c2='Let’s see if this man will actually bring me Starbucks: a dating memoir by me'),
 Row(_c0='3', _c1='Alabama', _c2='Thought I’d drop a smile on your timeline today! Spread joy, peace and kindness today! #smile @ Starbucks https://t.co/uRz8miGPIa'),
 Row(_c0='4', _c1='Alabama', _c2='🙏🏼🙏🏼🙏🏼 https://t.co/iDI9dfLyGk'),
 Row(_c0='5', _c1='Alabama', _c2='Someone is going to #win @Starbucks for Life—who wants to try with me? #StarbucksforLife https://t.co/9TO8hytNkk'),
 Row(_c0='6', _c1='Alabama', _c2='Someone is going to win @Starbucks for Life—who wants to try with me? #StarbucksforLife https://t.co/4Q7loi9uWW'),
 Row(_c0='7', _c1='Alabama', _c2='Join the Starbucks team! See our latest #job opening here: https://t.co/Hm2zhUsGiY #BusinessMgmt #Birmingham, AL #Hiring #CareerArc'),
 Row(_c0='8', _c1='Alabama', _c2='I’m r

In [17]:
now_file = 0
while now_file<path_account:
    test = None
    test = tweet_sentiment(path_list[now_file])
    test_df=test.data_loading()
    test_df.take(2)
    #print(test.row_count())
    test_regex_tokenized = test.regex_tokenize()
    #test.regex_tokenize_show(truncation=False)
    test_stopremoved=test.stop_words_remove(stop=stopWordslist2)
    #test_stopremoved.select("filtered").show(truncate=False)
    test_svms=test.word2vector(feature_num=10000)
    #test.tweet_svms.show()
    test.nb_model(model)
    test.nb_predicting()
    test.result()
    posres=test.positive.head(1)
    postest_result = int(posres[0][0])
    negres=test.negative.head(1)
    negtest_result = int(negres[0][0])
    result_dict[now_file]={'positive':postest_result, 'negative':negtest_result}
    now_file += 1

In [18]:
print(result_dict)

{0: {'positive': 51, 'negative': 24}, 1: {'positive': 32, 'negative': 19}, 2: {'positive': 163, 'negative': 89}, 3: {'positive': 903, 'negative': 686}, 4: {'positive': 68, 'negative': 57}, 5: {'positive': 66, 'negative': 45}, 6: {'positive': 12, 'negative': 11}, 7: {'positive': 312, 'negative': 231}, 8: {'positive': 109, 'negative': 80}, 9: {'positive': 47, 'negative': 27}, 10: {'positive': 46, 'negative': 15}, 11: {'positive': 210, 'negative': 126}, 12: {'positive': 91, 'negative': 85}, 13: {'positive': 45, 'negative': 25}, 14: {'positive': 62, 'negative': 45}, 15: {'positive': 55, 'negative': 33}, 16: {'positive': 17, 'negative': 7}, 17: {'positive': 92, 'negative': 65}, 18: {'positive': 100, 'negative': 55}, 19: {'positive': 90, 'negative': 70}, 20: {'positive': 36, 'negative': 22}, 21: {'positive': 85, 'negative': 47}, 22: {'positive': 12, 'negative': 15}, 23: {'positive': 1, 'negative': 4}, 24: {'positive': 131, 'negative': 99}, 25: {'positive': 43, 'negative': 24}, 26: {'positive

In [28]:
entrys = []
c = 0
for state in states:
    en = {}
    en['states'] = state
    en['positive'] = result_dict[c]['positive']
    en['negative'] = result_dict[c]['negative']
    en['count'] = en['positive']+en['negative']
    en['ratio'] = float(en['positive'])/float(en['count'])
    entrys.append(en)
    c = c+1
print(entrys)
with open('starbucks.csv', 'w') as dstfile: 
    fieldnames = ['states','positive','negative','count','ratio']
    writer = csv.DictWriter(dstfile,fieldnames)
    writer.writeheader()
    writer.writerows(entrys)

[{'states': 'Alabama', 'positive': 51, 'negative': 24, 'count': 75, 'ratio': 0.68}, {'states': 'Arkansas', 'positive': 32, 'negative': 19, 'count': 51, 'ratio': 0.6274509803921569}, {'states': 'Arizona', 'positive': 163, 'negative': 89, 'count': 252, 'ratio': 0.6468253968253969}, {'states': 'California', 'positive': 903, 'negative': 686, 'count': 1589, 'ratio': 0.5682819383259912}, {'states': 'Colorado', 'positive': 68, 'negative': 57, 'count': 125, 'ratio': 0.544}, {'states': 'Connecticut', 'positive': 66, 'negative': 45, 'count': 111, 'ratio': 0.5945945945945946}, {'states': 'Delaware', 'positive': 12, 'negative': 11, 'count': 23, 'ratio': 0.5217391304347826}, {'states': 'Florida', 'positive': 312, 'negative': 231, 'count': 543, 'ratio': 0.574585635359116}, {'states': 'Georgia', 'positive': 109, 'negative': 80, 'count': 189, 'ratio': 0.5767195767195767}, {'states': 'Iowa', 'positive': 47, 'negative': 27, 'count': 74, 'ratio': 0.6351351351351351}, {'states': 'Idaho', 'positive': 46, '