In [1]:
import matplotlib.pyplot as plt
import numpy as np
import importlib
import Scsv


In [2]:
# %matplotlib widget

In [3]:
%matplotlib

Using matplotlib backend: agg


In [4]:
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import numpy as np
import sys
from operator import add

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import StopWordsRemover

In [5]:
USE_STOP_LIST=False
LAYERS=100
# MASTER= "spark://virtual01-virtualbox:7077"
MASTER= "local[*]"


In [6]:

spark = SparkSession.builder\
    .appName("MyProj")\
    .master(MASTER)\
    .config("spark.sql.repl.eagerEval.enabled", True)\
    .getOrCreate()

fname="hdfs://zuk:9000/dataset/twitter/Tweets.csv"

In [7]:
df = spark.read.option("header",True).csv(fname)\
    .select("airline_sentiment","text")

rter = RegexTokenizer(inputCol="text", outputCol="token", pattern="[\\W]+", minTokenLength=3)
tokenized = rter.transform(df.na.fill(' '))

In [8]:
more_stopwords=["virginamerica","https","ual","nyc"]

if USE_STOP_LIST==True:
    more_stopwords.extend(StopWordsRemover().getStopWords())

remover = StopWordsRemover(inputCol="token", outputCol="rtoken").setStopWords(more_stopwords)
removed = remover.transform(tokenized)

removed.count()

14837

In [9]:
removed.select("airline_sentiment","rtoken").limit(10)

airline_sentiment,rtoken
neutral,"[what, dhepburn, ..."
positive,"[plus, you, added..."
neutral,"[didn, today, mus..."
negative,"[really, aggressi..."
negative,"[and, really, big..."
negative,"[seriously, would..."
,[]
positive,"[yes, nearly, eve..."
neutral,"[really, missed, ..."
positive,"[well, didn, but,..."


In [12]:
oname = fname.replace("/", "").replace(":", "")
# !transform data

word2Vec = Word2Vec(vectorSize=LAYERS, minCount=2,
                    inputCol="rtoken", outputCol="vector")

model = word2Vec.fit(removed)


result = model.transform(removed)

In [13]:
model.findSynonyms("rain",10)


word,similarity
dozen,0.9289740324020386
jeff,0.925881028175354
military,0.9245061874389648
expensive,0.9239490628242492
coworker,0.9230392575263976
fire,0.9224805235862732
local,0.92110937833786
reference,0.920864462852478
biggest,0.9202502369880676
codeshare,0.9200882911682128


In [14]:
result.printSchema()
result.count()

root
 |-- airline_sentiment: string (nullable = false)
 |-- text: string (nullable = false)
 |-- token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rtoken: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- vector: vector (nullable = true)



14837

In [15]:
from pyspark.sql.functions import col
df1 = df.filter("airline_sentiment = \"positive\" OR airline_sentiment = \"neutral\" OR airline_sentiment = \"negative\"")
# unknown sentiment values are removed

df1.groupBy("airline_sentiment")\
    .count()\
    .orderBy(col("count").desc())\
    .show()

+-----------------+-----+
|airline_sentiment|count|
+-----------------+-----+
|         negative| 9178|
|          neutral| 3099|
|         positive| 2363|
+-----------------+-----+



In [16]:
# pipelined, very simplified

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

df2 =  df1.na.fill("")
label_stringIdx = StringIndexer(inputCol="airline_sentiment", outputCol="label")

pipeline=Pipeline(stages = [rter, remover, word2Vec,label_stringIdx])

pipelineFit = pipeline.fit(df2)

df3 = pipelineFit.transform(df2)

In [17]:
# training (multi-class-text-classification-with-pyspark)
# ref: https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35

(trainingDf, testDf) = df3.randomSplit([0.75,0.25], seed=100)

print(f"train = {trainingDf.count()}, test count= {testDf.count()}")

train = 11041, test count= 3599


In [16]:
# lr classifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0,
                       featuresCol="vector")
lrModel=lr.fit(trainingDf)

predictions = lrModel.transform(testDf)



predictions.filter(predictions['prediction'] == 0) \
    .orderBy("probability", ascending=False) \
    .limit( 10)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.6584102491656166

In [22]:
# random forest
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="vector", \
                            numTrees = 100)# Train model with Training Data
rfModel = rf.fit(trainingDf)

predictions = rfModel.transform(testDf)

predictions.filter(predictions['prediction'] == 0) \
    .orderBy("probability", ascending=False) \
    .limit(10)


evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.6471128153876217

In [18]:
# perceptron classifier
# ref: https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
#layers = [4, 5, 4, 3]
layers = [LAYERS,60,17, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234, featuresCol="vector")

# train the model
pmodel = trainer.fit(trainingDf)

# compute accuracy on the test set
presult = pmodel.transform(testDf)
predictionAndLabels = presult.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.7188107807724368


In [19]:
correctRows = presult.filter(presult.label==presult.prediction)

correctRows.groupBy("label")\
    .count()\
    .orderBy(col("count").desc())\
    .show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0| 2079|
|  2.0|  294|
|  1.0|  214|
+-----+-----+

