In [1]:
import pyspark

In [2]:
# Read the tweet sentiment CSV into a DataFrame: the first row supplies the
# column names, column types are inferred, and malformed rows are dropped.
tw_data = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(header="true", inferSchema="true", mode="DROPMALFORMED")
    .load("/FileStore/tables/Sentiment_Analysis_Dataset-08265.csv")
)

In [3]:
# Print the column names and inferred types of the loaded DataFrame.
tw_data.printSchema()

In [4]:
# Peek at a single row to sanity-check the load.
tw_data.show(n=1)

In [5]:
# Summary statistics for the Sentiment column.
tw_data.describe(["Sentiment"]).show()

In [6]:
from pyspark.sql.functions import col

# Number of tweets per Sentiment value, largest group first.
(
    tw_data
    .groupBy("Sentiment")
    .count()
    .sort(col("count").desc())
    .show()
)

# Register the DataFrame as the temp view "tweets" so %sql cells can query it.
tw_data.createOrReplaceTempView("tweets")

In [7]:
#Count the frequency of words

import pyspark.sql.functions as func

# One output row per (tweet, word):
#   1. split the tweet text on runs of non-word characters or underscores
#      and explode the resulting array into rows,
#   2. discard the empty strings the split can produce,
#   3. lower-case and trim each word.
# The pattern is a raw string so that \W reaches the regex engine as written
# instead of relying on Python tolerating an invalid string escape.
# The chain is parenthesised so no trailing line-continuation backslash is needed.
words = (
    tw_data
    .select('SentimentText', func.explode(func.split('SentimentText', r'[\W_]+')).alias('word'))
    .where(func.length('word') > 0)
    .select('SentimentText', func.trim(func.lower(func.col('word'))).alias('word'))
)

words.show()

In [8]:
# Occurrences of each distinct word, most frequent first.
counts = (
    words
    .groupBy("word")
    .count()
    .sort("count", ascending=False)
)

# Register the word counts as the temp view "counts_sql" for %sql cells.
counts.createOrReplaceTempView("counts_sql")

In [9]:
%sql
-- Show 20 rows of the word-count view.
SELECT *
FROM counts_sql
LIMIT 20

word,count
i,953107
to,561754
the,523059
a,381151
my,314029
it,304196
and,302799
you,300011
is,236894
in,215858


In [10]:
%sql
-- Number of non-null Sentiment values per Sentiment class, ordered by class.
SELECT Sentiment, COUNT(Sentiment) AS Count
FROM tweets
GROUP BY Sentiment
ORDER BY Sentiment

Sentiment,Count
0,788442
1,790185


In [11]:
%sql
-- Number of non-null SentimentSource values per source, ordered by source.
SELECT SentimentSource, COUNT(SentimentSource) AS count
FROM tweets
GROUP BY SentimentSource
ORDER BY SentimentSource

SentimentSource,count
Kaggle,1349
Sentiment140,1577278


In [12]:
# Discard the columns that are not used from here on; every other column
# is kept in its original order.
drop_list = ['ItemID', 'SentimentSource']
tw_data = tw_data.select(*[name for name in tw_data.columns if name not in drop_list])
tw_data.show(n=5)

In [13]:
# Rename Sentiment to "label" and keep only the label and the tweet text.
# withColumnRenamed does nothing when the source column is absent, so this
# cell can be re-run after tw_data has already been renamed; the previous
# selectExpr("Sentiment as label", ...) failed on a second run because the
# Sentiment column no longer existed.
tw_data = tw_data.withColumnRenamed("Sentiment", "label").select("label", "SentimentText")

In [14]:
# Confirm the renamed two-column layout.
tw_data.show(n=5)

In [15]:
# 70/30 random split into training and test sets; the fixed seed keeps the
# split repeatable between runs.
trainingData, testData = tw_data.randomSplit(weights=[0.7, 0.3], seed=100)
print("Training Dataset Count: ", trainingData.count(), sep="")
print("Test Dataset Count: ", testData.count(), sep="")

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Evaluator
# metricName is set explicitly: the evaluator's default metric is F1, but the
# score computed below is stored and reported as an accuracy. The same
# evaluator also drives model selection inside the cross-validator.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="accuracy")

#Tokenize the tweet text on non-word characters, then drop stop words
regexTokenizer = RegexTokenizer(inputCol="SentimentText", outputCol="words", pattern="\\W")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

#Create feature vectors: hashed term frequencies re-weighted by IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

#Define the logistic regression model
# regParam and elasticNetParam given here are starting values only; the
# parameter grid below overrides both during cross-validation.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

#Create a pipeline
pipeline_lr = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF,idf,lr])

# Create ParamGrid for Cross Validation (3 x 3 = 9 parameter combinations)
paramGrid_lr = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
             .build())

# Create 5-fold CrossValidator
cv_lr = CrossValidator(estimator=pipeline_lr, \
                    estimatorParamMaps=paramGrid_lr, \
                    evaluator=evaluator, \
                    numFolds=5)

# Fit every grid combination on the training split and keep the best model
cvModel_lr = cv_lr.fit(trainingData)

# Score the held-out test split with the selected model
lr_predictions = cvModel_lr.transform(testData)

accuracy_lr = evaluator.evaluate(lr_predictions)




In [17]:
# Report the logistic regression score as a percentage.
print("The accuracy of Logistic Regression: %f%s" % (accuracy_lr * 100, "%"))


In [18]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1.0)

#Create a pipeline
# Reuses the tokenizer, stop-word remover, HashingTF and IDF stages defined
# for the logistic regression pipeline; only the final classifier differs.
pipeline_nb = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF,idf,nb])


# Create ParamGrid for Cross Validation (six smoothing values)
paramGrid_nb = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]).build()


# Create 5-fold CrossValidator
cv_nb = CrossValidator(estimator=pipeline_nb, \
                    estimatorParamMaps=paramGrid_nb, \
                    evaluator=evaluator, \
                    numFolds=5)

# Fit every smoothing value on the training split and keep the best model
cvModel_nb = cv_nb.fit(trainingData)

# Score the held-out test split with the selected model
nb_predictions = cvModel_nb.transform(testData)

# Request accuracy explicitly for this evaluation: the evaluator's default
# metric is F1, while this value is stored and reported as an accuracy.
accuracy_nb = evaluator.evaluate(nb_predictions, {evaluator.metricName: "accuracy"})


In [19]:
# Report the Naive Bayes score as a percentage.
print("The accuracy of Naive Bayes: %f%s" % (accuracy_nb * 100, "%"))