Required Libraries.

In [None]:
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [54]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import Tokenizer, Word2Vec
from tqdm import tqdm
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier, DecisionTreeClassifier 
from pyspark.ml.classification import NaiveBayes

In [10]:
from pyspark.ml.feature import OneHotEncoder

Connection with Mysql - Loading Dataset.

In [3]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

dataframe_mysql = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/twitter_analysis",
    driver = "com.mysql.jdbc.Driver",
    dbtable = "tweets",
    user="root",
    password="8056").load()

In [77]:
dataframe_mysql.columns

['id', 'Tweet', 'Sentiment']

In [78]:
dataframe_mysql.count()

3260

In [5]:
dataframe_mysql.show(5)

+---+--------------------+---------+
| id|               Tweet|Sentiment|
+---+--------------------+---------+
|  0|at this point i r...| positive|
|  1|billion twitterta...|  Neutral|
|  2|https t co ssxwea...|  Neutral|
|  3|hello project ver...| positive|
|  4|purging of conser...|  Neutral|
+---+--------------------+---------+
only showing top 5 rows



Encoding the Target colum.

In [18]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Sentiment", outputCol="Sentiment_num").fit(dataframe_mysql)
indexed_df = indexer.transform(dataframe_mysql)
indexed_df.show(5)

+---+--------------------+---------+-------------+
| id|               Tweet|Sentiment|Sentiment_num|
+---+--------------------+---------+-------------+
|  0|at this point i r...| positive|          1.0|
|  1|billion twitterta...|  Neutral|          0.0|
|  2|https t co ssxwea...|  Neutral|          0.0|
|  3|hello project ver...| positive|          1.0|
|  4|purging of conser...|  Neutral|          0.0|
+---+--------------------+---------+-------------+
only showing top 5 rows



After encoding the Target column, the data is ready to be used for modeling.

In [21]:
indexed_df.drop('Sentiment').show(5)

+---+--------------------+-------------+
| id|               Tweet|Sentiment_num|
+---+--------------------+-------------+
|  0|at this point i r...|          1.0|
|  1|billion twitterta...|          0.0|
|  2|https t co ssxwea...|          0.0|
|  3|hello project ver...|          1.0|
|  4|purging of conser...|          0.0|
+---+--------------------+-------------+
only showing top 5 rows



Tokenizer.

In [33]:

tokenizer = Tokenizer(inputCol=indexed_df.columns[1], outputCol="Tokens")
wordsData = tokenizer.transform(indexed_df)

In [34]:
wordsData.show(5)

+---+--------------------+---------+-------------+--------------------+
| id|               Tweet|Sentiment|Sentiment_num|              Tokens|
+---+--------------------+---------+-------------+--------------------+
|  0|at this point i r...| positive|          1.0|[at, this, point,...|
|  1|billion twitterta...|  Neutral|          0.0|[billion, twitter...|
|  2|https t co ssxwea...|  Neutral|          0.0|[https, t, co, ss...|
|  3|hello project ver...| positive|          1.0|[hello, project, ...|
|  4|purging of conser...|  Neutral|          0.0|[purging, of, con...|
+---+--------------------+---------+-------------+--------------------+
only showing top 5 rows



Converting words to vector.

In [35]:
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="Tokens", outputCol="result")
model = word2Vec.fit(wordsData)

result = model.transform(wordsData)

In [36]:
result.show(5)

+---+--------------------+---------+-------------+--------------------+--------------------+
| id|               Tweet|Sentiment|Sentiment_num|              Tokens|              result|
+---+--------------------+---------+-------------+--------------------+--------------------+
|  0|at this point i r...| positive|          1.0|[at, this, point,...|[-0.0221444366782...|
|  1|billion twitterta...|  Neutral|          0.0|[billion, twitter...|[-0.1028491018268...|
|  2|https t co ssxwea...|  Neutral|          0.0|[https, t, co, ss...|[-0.1155088072021...|
|  3|hello project ver...| positive|          1.0|[hello, project, ...|[-0.0734242481800...|
|  4|purging of conser...|  Neutral|          0.0|[purging, of, con...|[-0.0491263920441...|
+---+--------------------+---------+-------------+--------------------+--------------------+
only showing top 5 rows



Train and test data split.

In [40]:
train_df,test_df = result.select(result.columns[3], 'result').randomSplit([0.8,0.2], seed=140)

Naive Bayes model.

In [42]:
NB = NaiveBayes(featuresCol = 'result', labelCol = result.columns[3],  modelType="gaussian")
NB_model = NB.fit(train_df)
predictions = NB_model.transform(test_df)

Prediction on test data.

In [49]:
predictions.filter(predictions['prediction'] == 1).select('prediction').count()/test_df.count()

0.677891654465593

In [50]:
predictions.filter(predictions['prediction'] == 0).select('prediction').count()/test_df.count()

0.32210834553440704

In [51]:
predictions.filter(predictions['prediction'] == 2).select('prediction').count()/test_df.count()

0.0

Model Performance.

In [100]:
result = predictions.select(predictions.columns[3], 'prediction').toPandas()
true_labels=(test_df.select("Sentiment_num")).toPandas()
predicted_labels=result["prediction"]

print("-- Naive Bayes Classifier --")
print("------------------------------------------------------------------------")
print("Classification Report\n",classification_report(true_labels, predicted_labels))
print("------------------------------------------------------------------------")
print("Confusion matrix\n",confusion_matrix(true_labels,predicted_labels),"\n\n")
DC=confusion_matrix(true_labels,predicted_labels)

-- Naive Bayes Classifier --
------------------------------------------------------------------------
Classification Report
               precision    recall  f1-score   support

         0.0       0.52      0.40      0.45       284
         1.0       0.43      0.77      0.55       260
         2.0       0.00      0.00      0.00       139

    accuracy                           0.46       683
   macro avg       0.32      0.39      0.33       683
weighted avg       0.38      0.46      0.40       683

------------------------------------------------------------------------
Confusion matrix
 [[114 170   0]
 [ 61 199   0]
 [ 45  94   0]] 




  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


Random Forest Classifier.

In [63]:
rfc = RandomForestClassifier(featuresCol = 'result', labelCol = 'Sentiment_num')
rfc_model = rfc.fit(train_df)
predictionsrfc = rfc_model.transform(test_df)

In [60]:
train_df

DataFrame[Sentiment_num: double, result: vector]

In [64]:
predictionsrfc.filter(predictionsrfc['prediction'] == 1).select('prediction').count()/test_df.count()

0.486090775988287

In [65]:
predictionsrfc.filter(predictionsrfc['prediction'] == 2).select('prediction').count()/test_df.count()

0.013177159590043924

In [66]:
predictionsrfc.filter(predictionsrfc['prediction'] == 0).select('prediction').count()/test_df.count()

0.5007320644216691

Model performance.

In [76]:
result = predictionsrfc.select(predictionsrfc.columns[3], 'prediction').toPandas()
true_labels=(test_df.select("Sentiment_num")).toPandas()
predicted_labels=result["prediction"]

print("-- Random Forest Classifier --")
print("------------------------------------------------------------------------")
print("Classification Report\n",classification_report(true_labels, predicted_labels))
print("------------------------------------------------------------------------")
print("Confusion matrix\n",confusion_matrix(true_labels,predicted_labels),"\n\n")
DC=confusion_matrix(true_labels,predicted_labels)

-- Random Forest Classifier --
------------------------------------------------------------------------
Classification Report
               precision    recall  f1-score   support

         0.0       0.53      0.63      0.58       284
         1.0       0.48      0.62      0.54       260
         2.0       0.56      0.04      0.07       139

    accuracy                           0.51       683
   macro avg       0.52      0.43      0.40       683
weighted avg       0.52      0.51      0.46       683

------------------------------------------------------------------------
Confusion matrix
 [[180 101   3]
 [ 98 161   1]
 [ 64  70   5]] 




In [80]:
from pandas_profiling import ProfileReport
profile = ProfileReport(dataframe_mysql.toPandas())
profile.to_file("output.html")

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]