In [2]:
import os
os.environ["PYSPARK_PYTHON"]="python3.7"
os.environ["PYSPARK_DRIVER_PYTHON"]="python3.7"

In [3]:
!pip install numpy

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [4]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
            .master("spark://spark-master:7077") \
            .appName("Sentiment") \
            .getOrCreate()

In [5]:
df = spark.read.format("csv").options(header='true').load("hdfs://namenode/user/root/input/data_sentiment.csv")

In [6]:
df.printSchema()

root
 |-- comment: string (nullable = true)
 |-- sentiment: string (nullable = true)



In [7]:
df = df.na.drop().dropDuplicates()

In [8]:
df.groupBy("sentiment") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(30)

+------------+-------+
|   sentiment|  count|
+------------+-------+
|           1|1694782|
|          -1| 421948|
|           0| 403621|
|            |    261|
|     however|     51|
|          ,1|     32|
|        but |     20|
|      though|     17|
|         but|     16|
|          I |     16|
|    though."|     15|
|       etc."|     14|
|        the |     14|
| thank you."|     13|
|    thanks."|     12|
|         say|     12|
|          so|     11|
|        and |     11|
|          ,0|     11|
|         it |     11|
|    thanks!"|     10|
|         so |     10|
| for example|     10|
|           2|      9|
|          )"|      9|
|       too."|      9|
|         too|      9|
|      please|      8|
|        well|      8|
|     in fact|      8|
+------------+-------+
only showing top 30 rows



In [9]:
df = df.filter((df.sentiment=="1") | (df.sentiment=="0") | (df.sentiment=="-1"))

In [10]:
df.groupBy("sentiment") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(30)

+---------+-------+
|sentiment|  count|
+---------+-------+
|        1|1694782|
|       -1| 421948|
|        0| 403621|
+---------+-------+



In [11]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="words", pattern="\\W")

# default stop words 
# StopWordsRemover.loadDefaultStopWords(language='english')

# stop words
add_stopwords = [] # standard stop words

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [12]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")

In [13]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df)
dataset = pipelineFit.transform(df)

In [14]:
dataset.show(5)

+--------------------+---------+--------------------+--------------------+--------------------+-----+
|             comment|sentiment|               words|            filtered|            features|label|
+--------------------+---------+--------------------+--------------------+--------------------+-----+
|! I Multisoft ® C...|        0|[i, multisoft, co...|[i, multisoft, co...|(10000,[1,2,4533,...|  2.0|
|"""Too Many Reque...|        0|[too, many, reque...|[too, many, reque...|(10000,[1,2,5,6,8...|  2.0|
|"""VERY NICE GAME...|        1|  [very, nice, game]|  [very, nice, game]|(10000,[12,18,63]...|  0.0|
|"*Update* so afte...|       -1|[update, so, afte...|[update, so, afte...|(10000,[0,1,2,4,5...|  1.0|
|"Absolutly fantas...|        1|[absolutly, fanta...|[absolutly, fanta...|(10000,[0,1,3,4,5...|  0.0|
+--------------------+---------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [15]:
### Randomly split data into training and test sets. set seed for reproducibility

(trainData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

In [24]:
from pyspark.ml.classification import NaiveBayes

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1)

# train the model
model = nb.fit(trainData)

In [25]:
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("comment","sentiment","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+---------+------------------------------+-----+----------+
|                       comment|sentiment|                   probability|label|prediction|
+------------------------------+---------+------------------------------+-----+----------+
|Logo Maker Free is really v...|        1|[1.0,1.1070201464326285E-16...|  0.0|       0.0|
|Lovely tuner! Has made my t...|        1|[1.0,9.403216330527206E-17,...|  0.0|       0.0|
|Mortgage calculator is very...|        1|[1.0,9.145492464495233E-17,...|  0.0|       0.0|
|Loving the look of the upda...|        1|[1.0,9.111928430989751E-17,...|  0.0|       0.0|
|If you're thorough like me,...|        1|[1.0,8.478961767447042E-17,...|  0.0|       0.0|
|This app is very straightfo...|        1|[1.0,8.378463138859317E-17,...|  0.0|       0.0|
|Registered recently and hav...|        1|[1.0,7.953679798030606E-17,...|  0.0|       0.0|
|The best icon pack ever cre...|        1|[1.0,7.61943407873422E-17,4...|  0.0|       0.0|

In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7641344028646035

In [16]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [17]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

# Run cross validations
cvModel = cv.fit(trainData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)