In [31]:
# Preload packages
from pyspark.sql import SparkSession
from pyspark.ml import feature, regression, evaluation, Pipeline
from pyspark.sql import functions as fn, Row
import matplotlib.pyplot as plt
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# dataframe functions
from pyspark.sql import functions as fn
import os
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
import requests
stop_words = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()
stop_words[0:10]
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import IDF
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
from pyspark.sql.types import *

from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [6]:
sentiments_df=spark.read.parquet('sentiments.parquet')

In [4]:
###import dataset

fullDFP=pd.read_csv('fulldatasetT.csv',dtype={'polarity': float})
fullDFP=fullDFP.loc[:,['id','text','place','polarity']]
fullDFP['score']=0
fullDFP.loc[fullDFP["polarity"] > 0.0, 'score'] = 1

mySchema = StructType([StructField("id", StringType(), True)\

                       ,StructField("text", StringType(), True)\

                       ,StructField("place", StringType(), True)\

                       ,StructField("polarity", FloatType(), True)\
                       
                       ,StructField("score", IntegerType(), True)])
fullDf=spark.createDataFrame(fullDFP,schema=mySchema)

In [None]:
fullDf=fullDf.withColumnRenamed('score','label')

In [12]:
tokenizer = RegexTokenizer().setGaps(False)\
  .setPattern("\\p{L}+")\
  .setInputCol("text")\
  .setOutputCol("words")
review_words_df = tokenizer.transform(fullDf)
tweet_words_sentiment_df = review_words_df.\
    select('id', fn.explode('words').alias('word')).\
    join(sentiments_df, 'word')
tweet_words_sentiment_df.show(5)

sw_filter = StopWordsRemover()\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered")

# we will remove words that appear in 5 docs or less
cv = CountVectorizer(minTF=1., minDF=5., vocabSize=2**17)\
  .setInputCol("filtered")\
  .setOutputCol("tf")


# we now create a pipelined transformer
cv_pipeline = Pipeline(stages=[tokenizer, sw_filter, cv]).fit(fullDf)




idf = IDF().\
    setInputCol('tf').\
    setOutputCol('tfidf')
lr = LogisticRegression().\
    setLabelCol('label').\
    setFeaturesCol('tfidf').\
    setRegParam(0.0).\
    setMaxIter(100).\
    setElasticNetParam(0.)
idf_pipeline = Pipeline(stages=[cv_pipeline, idf]).fit(fullDf)

+----------+--------+---------+
|      word|      id|sentiment|
+----------+--------+---------+
|confidence|1.29E+18|        1|
|    strong|1.29E+18|        1|
|      nice|1.29E+18|        1|
|      like|1.29E+18|        1|
|protection|1.29E+18|        1|
+----------+--------+---------+
only showing top 5 rows



In [None]:

training_df, validation_df, testing_df = fullDf.randomSplit([0.6, 0.3, 0.1])

In [15]:
rf = RandomForestClassifier().setLabelCol('label').\
    setFeaturesCol('tfidf')
rf_pipeline = Pipeline(stages=[idf_pipeline, rf]).fit(training_df)

In [19]:
fullDf.show()

+--------+--------------------+--------------------+--------+-----+
|      id|                text|               place|polarity|label|
+--------+--------------------+--------------------+--------+-----+
|1.29E+18|Yoga Instructor🧘...|       Kentucky, USA|  0.1767|    1|
|1.29E+18|Coz a nice stroll...|Sydney, New South...|  0.3333|    1|
|1.29E+18|NEW! #Facemasks i...|          Martinique|  0.1705|    1|
|1.29E+18|I’m not saying I’...|           Boise, ID| -0.1356|    0|
|1.29E+18|Fighting Stigma: ...|   Mascouche, Québec|     0.0|    0|
|1.29E+18|Fighting Stigma: ...|   Mascouche, Québec| -0.3125|    0|
|1.29E+18|Fighting Stigma: ...|   Mascouche, Québec|  0.1238|    1|
|1.29E+18|Them COVID nights...|Vandenberg Villag...|     0.0|    0|
|1.29E+18|Shoot after post ...|Fort Tondiarpet, ...|     0.0|    0|
|1.29E+18|BON APPETITE

Eva...|     Bal Harbour, FL|  0.0111|    1|
|1.29E+18|Been trying to ma...|       Southport, IN| -0.0429|    0|
|1.29E+18|Been trying to ma...|       Southport, 

In [34]:
bce = BinaryClassificationEvaluator()
bce.evaluate(rf_pipeline.transform(validation_df))

0.7199926431076215

In [20]:
rf_model = rf_pipeline.stages[-1]
predictions = rfModel.transform(testData)
#pd.DataFrame(list(zip(fullDf.text, rf_model.featureImportances.toArray())),
            #columns = ['column', 'weight']).sort_values('weight')
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

TypeError: Column is not iterable

In [22]:
len(rf_model.trees)

20

In [23]:
print(rf_model.trees[0].toDebugString)

DecisionTreeClassificationModel: uid=dtc_61201871a0e8, depth=5, numNodes=29, numClasses=2, numFeatures=31663
  If (feature 497 <= 2.7026565541855776)
   If (feature 2 <= 0.3931518690236652)
    If (feature 2655 <= 3.511800318041119)
     If (feature 210 <= 2.362734946718786)
      If (feature 455 <= 2.659089684901454)
       Predict: 1.0
      Else (feature 455 > 2.659089684901454)
       Predict: 0.0
     Else (feature 210 > 2.362734946718786)
      Predict: 1.0
    Else (feature 2655 > 3.511800318041119)
     Predict: 1.0
   Else (feature 2 > 0.3931518690236652)
    If (feature 9 <= 1.347995539054795)
     If (feature 3813 <= 3.7035434187358134)
      If (feature 6511 <= 4.061106462375173)
       Predict: 1.0
      Else (feature 6511 > 4.061106462375173)
       Predict: 0.0
     Else (feature 3813 > 3.7035434187358134)
      Predict: 0.0
    Else (feature 9 > 1.347995539054795)
     If (feature 2027 <= 3.3679592818152284)
      If (feature 24825 <= 4.991482632732677)
       Predict: 

In [29]:
predictions = rf_pipeline.transform(testing_df)
predictions.filter(predictions['prediction'] == 0) \
    .select("text",'probability',"label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+-----+----------+
|                          text|                   probability|label|prediction|
+------------------------------+------------------------------+-----+----------+
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe...|[0.6022037189350764,0.39779...|    0|       0.0|
|I challenge you to #SaveThe

In [None]:
#########################NAIVE BAYES