In [0]:
!pip install nltk

You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier,NaiveBayes,RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.sql.functions import udf, col, lower, trim, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer # BE SURE NLTK IS INSTALLED ON THE CLUSTER USING THE "LIBRARIES" TAB IN THE MENU


# Importing the feature transformation classes for doing TF-IDF 
from pyspark.ml.feature import HashingTF, CountVectorizer, IDF, NGram
from pyspark.ml import Pipeline

from pyspark.ml.evaluation import MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.mllib.evaluation import MulticlassMetrics

import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score
import matplotlib.pyplot as plt


In [0]:
#obje oluşturmak için sparksession.
spark = SparkSession \
    .builder \
    .appName("Spark ML example on twitter data ") \
    .getOrCreate()

In [0]:
RANDOM_SEED = 42 # used below to run the actual K-means clustering
VOCAB_SIZE = 1000 # number of words to be retained as vocabulary
MIN_DOC_FREQ = 10 # minimum number of documents a word has to appear in to be included in the vocabulary
N_GRAMS = 2 # number of n-grams (if needed)
N_FEATURES = 200 # default embedding vector size (if HashingTF or, later, Word2Vec are used)

In [0]:
# CSV dosyasını okuma.
#dataset = "/FileStore/tables/testdata_manual_2009_06_14-4.csv"
dataset = "dbfs:/datasets/train.csv"
df = spark.read.csv(dataset,header = 'False',inferSchema='True')

In [0]:
df = df.select(col("_c0").alias("polarite"), col("_c1").alias("id"), col("_c2").alias("date"), col("_c3").alias("query"), col("_c4").alias("user_name"), col("_c5").alias("tweet"))
df.show()

+--------+-------------+--------------------+--------+---------------+--------------------+
|polarite|           id|                date|   query|      user_name|               tweet|
+--------+-------------+--------------------+--------+---------------+--------------------+
|     0.0|1.467810369E9|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0.0|1.467810672E9|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0.0|1.467810917E9|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0.0|1.467811184E9|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0.0|1.467811193E9|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0.0|1.467811372E9|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0.0|1.467811592E9|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|          Need a hug|
|     0.0|1.467811594E9|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish 

In [0]:
from pyspark.sql.functions import when
df = df.withColumn("polarite", when(df.polarite == "0.0","0") \
      .when(df.polarite == "4.0","1") \
      .otherwise(df.polarite))

In [0]:
df.groupBy("polarite").count().show()

+--------+------+
|polarite| count|
+--------+------+
|       0|784103|
|       1|778833|
+--------+------+



In [0]:
# Import the required packages
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
stages = []
# 1. clean data and tokenize sentences using RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="tweet", outputCol="tokens", pattern="\\W+")
stages += [regexTokenizer]

# 2. CountVectorize the data
cv = CountVectorizer(inputCol="tokens", outputCol="token_features", minDF=2.0)#, vocabSize=3, minDF=2.0
stages += [cv]

# 3. Convert the labels to numerical values using binariser
#indexer = StringIndexer(inputCol="polarite", outputCol="label")
#stages += [indexer]

# 4. Vectorise features using vectorassembler
vecAssembler = VectorAssembler(inputCols=['token_features'], outputCol="features")
stages += [vecAssembler]

[print('\n', stage) for stage in stages]


 RegexTokenizer_7cf3c5ff2f2f

 CountVectorizer_d1dbaecc9eec

 VectorAssembler_bc07b02ed115
Out[11]: [None, None, None]

In [0]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
data = pipeline.fit(df).transform(df)

In [0]:
data.printSchema()
from pyspark.sql.types import IntegerType,DoubleType
data = data.withColumn("polarite", data["polarite"].cast(DoubleType()))
data.printSchema()

root
 |-- polarite: string (nullable = true)
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- token_features: vector (nullable = true)
 |-- features: vector (nullable = true)

root
 |-- polarite: double (nullable = true)
 |-- id: double (nullable = true)
 |-- date: string (nullable = true)
 |-- query: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- token_features: vector (nullable = true)
 |-- features: vector (nullable = true)



In [0]:
train, test = data.randomSplit([0.7, 0.3], seed = 2018)

In [0]:
from pyspark.ml.classification import NaiveBayes
# Initialise the model
nb = NaiveBayes(smoothing=1.0, modelType="multinomial",labelCol='polarite')
# Fit the model
model = nb.fit(train)
# Make predictions on test data
predictions = model.transform(test)
predictions.select("polarite", "prediction", "probability").show()

+--------+----------+--------------------+
|polarite|prediction|         probability|
+--------+----------+--------------------+
|     0.0|       1.0|[0.29702795399708...|
|     0.0|       0.0|[0.95543756293950...|
|     0.0|       0.0|[0.51273007935153...|
|     0.0|       0.0|[0.99238278637524...|
|     0.0|       0.0|[0.93829387451755...|
|     0.0|       1.0|[0.13846088455383...|
|     0.0|       0.0|[0.93499527813530...|
|     0.0|       0.0|[0.88535992522793...|
|     0.0|       0.0|[0.97514361411782...|
|     0.0|       0.0|[0.78557214430375...|
|     0.0|       0.0|[0.99968588054449...|
|     0.0|       0.0|[0.98979135891167...|
|     0.0|       0.0|[0.99999998950176...|
|     0.0|       0.0|[0.99355554072012...|
|     0.0|       0.0|[0.96176367607767...|
|     0.0|       0.0|[0.88173510171205...|
|     0.0|       0.0|[0.93184230782056...|
|     0.0|       0.0|[0.63369391428442...|
|     0.0|       1.0|[0.34513447279035...|
|     0.0|       1.0|[0.35161001377571...|
+--------+-

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="polarite", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print ("Model Accuracy: ", accuracy)

Model Accuracy:  0.7822402467375877


In [0]:
evaluator = MulticlassClassificationEvaluator(labelCol="polarite", predictionCol="prediction")
predictionFinal_rf_g = predictions.select(
    "tweet", "prediction", "polarite")
predictionFinal_rf_g.show(n=4, truncate = False)
correctPrediction_rf_g = predictionFinal_rf_g.filter(
    predictionFinal_rf_g['prediction'] == predictionFinal_rf_g['polarite']).count()
totalData_rf_g = predictionFinal_rf_g.count()
print("correct prediction:", correctPrediction_rf_g, ", total data:", totalData_rf_g, 
      ", accuracy:", correctPrediction_rf_g/totalData_rf_g)



preds_and_labels_rf_g = predictions.select(['prediction','polarite'])

metrics_rf_g = MulticlassMetrics(preds_and_labels_rf_g.rdd.map(tuple))

precision_0_rf_g = metrics_rf_g.precision(0.0)
recall_0_rf_g = metrics_rf_g.recall(0.0)
precision_4_rf_g = metrics_rf_g.precision(1.0)
recall_4_rf_g = metrics_rf_g.recall(1.0)
f1Score_rf_g = metrics_rf_g.fMeasure(0.0,1.0)
print("Summary Stats")
print("Precision 0 = %s" % precision_0_rf_g)
print("Recall 0 = %s" % recall_0_rf_g)
print("Precision 4 = %s" % precision_4_rf_g)
print("Recall 4 = %s" % recall_4_rf_g)
print("F1 Score = %s" % f1Score_rf_g)
print(metrics_rf_g.confusionMatrix().toArray())

+-------------------------------------------------------------------------------------------------------------------+----------+--------+
|tweet                                                                                                              |prediction|polarite|
+-------------------------------------------------------------------------------------------------------------------+----------+--------+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|1.0       |0.0     |
|@Tatiana_K nope they didn't have it                                                                                |0.0       |0.0     |
|I just re-pierced my ears                                                                                          |0.0       |0.0     |
|@caregiving I couldn't bear to watch it.  And I thought the UA loss was embarrassing . . . . .                     |0.0       |0.0     |
+---------------------------------

In [0]:
nbparamGrid = (ParamGridBuilder()
               .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
               .build())


ml_class = MulticlassClassificationEvaluator(labelCol="polarite", predictionCol="prediction")
crossval = CrossValidator(estimator=nb,
                          estimatorParamMaps=nbparamGrid,
                          evaluator=ml_class,
                          numFolds=2)

cvModel = crossval.fit(train)

predictions = cvModel.transform(test)



In [0]:
evaluator = MulticlassClassificationEvaluator(labelCol="polarite", predictionCol="prediction")
predictionFinal_rf_g = predictions.select(
    "tweet", "prediction", "polarite")
predictionFinal_rf_g.show(n=4, truncate = False)
correctPrediction_rf_g = predictionFinal_rf_g.filter(
    predictionFinal_rf_g['prediction'] == predictionFinal_rf_g['polarite']).count()
totalData_rf_g = predictionFinal_rf_g.count()
print("correct prediction:", correctPrediction_rf_g, ", total data:", totalData_rf_g, 
      ", accuracy:", correctPrediction_rf_g/totalData_rf_g)



preds_and_labels_rf_g = predictions.select(['prediction','polarite'])

metrics_rf_g = MulticlassMetrics(preds_and_labels_rf_g.rdd.map(tuple))

precision_0_rf_g = metrics_rf_g.precision(0.0)
recall_0_rf_g = metrics_rf_g.recall(0.0)
precision_4_rf_g = metrics_rf_g.precision(1.0)
recall_4_rf_g = metrics_rf_g.recall(1.0)
f1Score_rf_g = metrics_rf_g.fMeasure(0.0,1.0)
print("Summary Stats")
print("Precision 0 = %s" % precision_0_rf_g)
print("Recall 0 = %s" % recall_0_rf_g)
print("Precision 4 = %s" % precision_4_rf_g)
print("Recall 4 = %s" % recall_4_rf_g)
print("F1 Score = %s" % f1Score_rf_g)
print(metrics_rf_g.confusionMatrix().toArray())

+-------------------------------------------------------------------------------------------------------------------+----------+--------+
|tweet                                                                                                              |prediction|polarite|
+-------------------------------------------------------------------------------------------------------------------+----------+--------+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|1.0       |0.0     |
|@Tatiana_K nope they didn't have it                                                                                |0.0       |0.0     |
|I just re-pierced my ears                                                                                          |0.0       |0.0     |
|@caregiving I couldn't bear to watch it.  And I thought the UA loss was embarrassing . . . . .                     |0.0       |0.0     |
+---------------------------------