In [None]:
!pip install pyspark

In [12]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from pyspark.conf import SparkConf
from pyspark import SparkContext, SQLContext

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import Row

import pyspark
from pyspark.sql import SparkSession

In [4]:
import time
tic = time.time()

In [5]:
sc = SparkContext.getOrCreate();
sc.stop()

In [6]:

spark = SparkSession \
    .builder \
    .appName('Klassifier') \
    .getOrCreate()

In [7]:
spark

In [13]:
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("/content/drive/My Drive/AI/out.csv"))

In [14]:
df.show(5)

+-----+--------------------+
|label|     text_root_words|
+-----+--------------------+
|  4.0|['entertainment',...|
|  1.0|['sfo', 'pdx', 's...|
|  4.0|['week', 'seat', ...|
|  0.0|['guy', 'seating'...|
|  0.0|['status', 'meet'...|
+-----+--------------------+
only showing top 5 rows



In [15]:
df.toPandas()

Unnamed: 0,label,text_root_words
0,4.0,"['entertainment', 'guest', 'face', 'amp', 'rec..."
1,1.0,"['sfo', 'pdx', 'schedule']"
2,4.0,"['week', 'seat', 'gentleman']"
3,0.0,"['guy', 'seating', 'friend', 'seat', 'internet']"
4,0.0,"['status', 'meet', 'plan', 'week', 'response']"
...,...,...
7992,4.0,"['@americanair', 'state', 'plane']"
7993,2.0,"['@americanair', 'flight', 'tomorrow', 'mornin..."
7994,1.0,"['@americanair', 'cue', 'delay', '👌']"
7995,0.0,"['minute', 'flight', 'warning', 'communication..."


In [16]:
df.columns

['label', 'text_root_words']

In [17]:
df.dtypes

[('label', 'string'), ('text_root_words', 'string')]

In [18]:
# import pyspark.sql.functions as F
# df.withColumn("new_array", F.array(F.col("negativereason_words"))).show()
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
df = df.withColumn( "text_root_words", split(col("text_root_words"), ",\s*").cast("array<string>").alias("ev"))

In [19]:
df.dtypes

[('label', 'string'), ('text_root_words', 'array<string>')]

In [20]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="text_root_words", outputCol="features", vocabSize=30000, minDF=2.0)


In [21]:
model = cv.fit(df)

In [22]:
result = model.transform(df)
result.show(truncate=False)

+-----+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
|label|text_root_words                                                                      |features                                                               |
+-----+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
|4.0  |[['entertainment', 'guest', 'face', 'amp', 'recourse']]                              |(2445,[13,1315,1329,1466,1719],[1.0,1.0,1.0,1.0,1.0])                  |
|1.0  |[['sfo', 'pdx', 'schedule']]                                                         |(2445,[812,840,1470],[1.0,1.0,1.0])                                    |
|4.0  |[['week', 'seat', 'gentleman']]                                                      |(2445,[21,305],[1.0,1.0])                                              |
|0.0

In [23]:
result.toPandas()

Unnamed: 0,label,text_root_words,features
0,4.0,"[['entertainment', 'guest', 'face', 'amp', 're...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,1.0,"[['sfo', 'pdx', 'schedule']]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,4.0,"[['week', 'seat', 'gentleman']]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,0.0,"[['guy', 'seating', 'friend', 'seat', 'interne...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,0.0,"[['status', 'meet', 'plan', 'week', 'response']]","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
...,...,...,...
7992,4.0,"[['@americanair', 'state', 'plane']]","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7993,2.0,"[['@americanair', 'flight', 'tomorrow', 'morni...","(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7994,1.0,"[['@americanair', 'cue', 'delay', '👌']]","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
7995,0.0,"[['minute', 'flight', 'warning', 'communicatio...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."


In [24]:
result = result.drop('text_root_words')

In [25]:
from pyspark.sql.types import IntegerType
result = result.withColumn("label", result["label"].cast(IntegerType()))

In [26]:
(training_data, test_data) = result.randomSplit([0.7,0.3])

In [27]:
test_data.toPandas()

Unnamed: 0,label,features
0,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
...,...,...
2374,8,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2375,8,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2376,10,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2377,12,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


In [28]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(predictionCol = 'prediction', labelCol = 'label', maxIter = 10)
# lr = LogisticRegression(featurescol = 'features', labelcol = 'label', maxIter = 5)

In [29]:
lrModel = lr.fit(training_data)

In [30]:
predictions = lrModel.transform(test_data)
results = predictions.select(['prediction', 'label'])

In [31]:
results.show()

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       0.0|    0|
|       7.0|    0|
|       0.0|    0|
|       0.0|    0|
|       2.0|    0|
|       0.0|    0|
+----------+-----+
only showing top 20 rows



In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

from pyspark.ml.linalg import Vectors

from pyspark.ml.tuning import  ParamGridBuilder, CrossValidatorModel, CrossValidator

import tempfile

grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()

In [33]:
lr.regParam

Param(parent='LogisticRegression_6a1259785ad3', name='regParam', doc='regularization parameter (>= 0).')

In [34]:
lr = LogisticRegression(predictionCol = 'prediction', labelCol = 'label', maxIter = 10)
paramGrid = ParamGridBuilder().addGrid(cv.vocabSize, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()
my_evaluator = MulticlassClassificationEvaluator(labelCol = 'label', predictionCol = 'prediction')

cval = CrossValidator(estimator=lr, evaluator=my_evaluator, estimatorParamMaps=paramGrid, 
                    numFolds=2)


cvModel = cval.fit(training_data)

In [35]:
newPrediction = cvModel.transform(test_data)
newPredicted = newPrediction.select("label", "probability", "prediction")
newPredicted.show()

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.44856234438250...|       0.0|
|    0|[0.82701227698072...|       0.0|
|    0|[0.36079267110899...|       0.0|
|    0|[0.59301482156807...|       0.0|
|    0|[0.95885459643101...|       0.0|
|    0|[0.30342387334485...|       2.0|
|    0|[0.91481959298901...|       0.0|
+-----+--------------------+----------+
only showing top 20 rows



In [36]:
tp = float(newPrediction.filter("prediction == 1.0 AND label == 1").count())
fp = float(newPrediction.filter("prediction == 1.0 AND label == 0").count())
tn = float(newPrediction.filter("prediction == 0.0 AND label == 0").count())
fn = float(newPrediction.filter("prediction == 0.0 AND label == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
ac = (tp+tn)/(tp + tn + fn + fp)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("accuracy", ac),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|             244.0|
|       FP|              78.0|
|       TN|             596.0|
|       FN|             120.0|
|Precision|0.7577639751552795|
|   Recall|0.6703296703296703|
| accuracy|0.8092485549132948|
|       F1|0.7113702623906705|
+---------+------------------+



In [37]:
toc = time.time()
print("time(s): ", ((toc-tic)))

time(s):  443.8780872821808
