In [1]:
# !which pip
# !pwd

'which' is not recognized as an internal or external command,
operable program or batch file.
'pwd' is not recognized as an internal or external command,
operable program or batch file.


In [3]:
import os
import atexit
import sys
import time
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except:
        pass
    try:
        print('Trapped Exit cleaning up Spark Job')
        sj.stop()
    except:
        pass

findspark.init()

#Parameters for the Spark cluster
nodes=1
tasks_per_node=4
memory_per_task=4096 #4 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting 
# idle so that others may use the resources.
walltime="1:00" #1 hours
#os.environ['SBATCH_PARTITION']='cpu2019' #Set the appropriate ARC partition

sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

sj.wait_to_start()
time.sleep(60)
sc = sj.start_spark()

#Register the exit handler                                                                                                     
atexit.register(exitHandler,sj,sc)

#You need this line if you want to use SparkSQL
sqlCtx=SQLContext(sc)


ModuleNotFoundError: No module named 'pyspark'

In [3]:
sqlContext = sqlCtx
spark = sqlCtx

In [4]:
#!pip install --upgrade pip
!pip install nltk
import nltk
nltk.download('punkt')
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf

[33mYou are using pip version 10.0.1, however version 21.3.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m
[nltk_data] Downloading package punkt to
[nltk_data]     /home/drew.burritt/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [5]:
def tokenize1(text):
    words = nltk.word_tokenize(text)
    return words  
tokenize_word = udf(lambda x: tokenize1(x)  , ArrayType(StringType()))

In [6]:
def tokenize2(text):
    sents = nltk.sent_tokenize(text)
    return sents  
tokenize_sent = udf(lambda x: tokenize2(x)  , ArrayType(StringType()))

In [7]:
from nltk.corpus import stopwords
nltk.download('stopwords')
nltk.download('punkt')
stop_en = stopwords.words('english')
def remove_stopwords1(word_list):
    filtered_words = [word for word in word_list if word not in stop_en]
    return filtered_words
remove_stopwords = udf(lambda x: remove_stopwords1(x) , ArrayType(StringType()))

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/drew.burritt/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /home/drew.burritt/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [8]:
def remove_noise1(word_list):
    filtered_words = [word for word in word_list if word.isalnum() and len(word)>2]
    return filtered_words
remove_noise = udf(lambda x: remove_noise1(x) , ArrayType(StringType()))

In [9]:
from nltk.stem import SnowballStemmer
def stem1(word_list):
    snowball = SnowballStemmer(language='english')
    stemmed_words = [snowball.stem(word) for word in word_list]
    return stemmed_words
stem = udf(lambda x: stem1(x) , ArrayType(StringType()))

In [10]:
from nltk.sentiment import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')
def sentiment1(text):
  sia = SentimentIntensityAnalyzer()
  return sia.polarity_scores(text)['compound']
sentiment = udf(lambda x: sentiment1(x) , FloatType())

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/drew.burritt/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!




In [22]:
from pyspark.sql.functions import lower
def process_data(df):
  dfText = df.select("Index","Review" ,"polarity","real_fake", tokenize_word("Review").alias("tokenized_words"))
  dfText = dfText.withColumn("sentiment" ,sentiment("Review"))
  dfText = dfText.withColumn("tokenized_sents" ,tokenize_sent("Review"))
  dfText = dfText.withColumn("no_stopwords", remove_stopwords("tokenized_words"))
  dfText = dfText.withColumn("no_noise", remove_noise("no_stopwords"))
  dfText = dfText.withColumn("stemmed", stem("no_noise"))
  #dfText = dfText.select('*', F.concat_ws("_","real_fake","polarity").alias("target"))
  return dfText

In [23]:
df_raw_test = spark.read.option("escape","\"").option("header",True).csv("Hotel_Reviews_Calgary.csv")
df_test_all = process_data(df_raw_test)
#df_test = df_test_all.select("Review","stemmed","sentiment","target","real_fake")
df_test = df_test_all.select("Review","stemmed","sentiment","polarity","real_fake")

df_raw_train = spark.read.option("escape","\"").option("header",True).csv("Original_data.csv")
df_train = process_data(df_raw_train)
#df_train = df_train.select("Review","stemmed","sentiment","target","real_fake")
df_train = df_train.select("Review","stemmed","sentiment","polarity","real_fake")

#df_train.display()
combined = df_train.union(df_test)

In [24]:
print((df_test.count(), len(df_test.columns)))

(1000, 5)


In [25]:
df_train.cache()
df_test.cache()

DataFrame[Review: string, stemmed: array<string>, sentiment: float, polarity: string, real_fake: string]

In [26]:
df_train.show(1)

+--------------------+--------------------+---------+--------+---------+
|              Review|             stemmed|sentiment|polarity|real_fake|
+--------------------+--------------------+---------+--------+---------+
|We stayed 2 night...|[stay, night, spr...|  -0.9593|negative|     real|
+--------------------+--------------------+---------+--------+---------+
only showing top 1 row



In [27]:
from pyspark.ml.feature import HashingTF, IDF, IndexToString, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_split, test_split = df_train.randomSplit(weights = [0.80, 0.20], seed = 1)
train_split.cache()

hashingTF = HashingTF(inputCol="stemmed", outputCol="rawFeatures", numFeatures=20000)

idf = IDF(inputCol="rawFeatures", outputCol="TF_IDF", minDocFreq=2)
label_strIdx1 = StringIndexer(inputCol="polarity", outputCol="polarity_idx")
assembles = VectorAssembler(inputCols = ['TF_IDF','sentiment','polarity_idx'],outputCol="features")
label_strIdx2 = StringIndexer(inputCol="real_fake", outputCol="label")
lr = LogisticRegression(regParam = 0.3)
label_idxStr = IndexToString(inputCol = "label", outputCol = "article_class")

pipeline = Pipeline(stages=[hashingTF, idf,label_strIdx1, assembles, label_strIdx2, lr ,label_idxStr])


In [28]:
model = pipeline.fit(train_split)

In [None]:
pred = model.transform(test_split)
pred_original = model.transform(df_test)
pred.cache()
pred_original.cache()

In [113]:
pred.select("review","polarity", "label", "prediction","article_class").show()

+--------------------+--------+-----+----------+-------------+
|              review|polarity|label|prediction|article_class|
+--------------------+--------+-----+----------+-------------+
| My wife and I's ...|positive|  0.0|       0.0|         fake|
|A bunch of us got...|positive|  1.0|       0.0|         real|
|A lovely hotel in...|negative|  1.0|       1.0|         real|
|A recent stay at ...|negative|  0.0|       0.0|         fake|
|Affinia hotel in ...|negative|  0.0|       0.0|         fake|
|After considering...|negative|  0.0|       0.0|         fake|
|After reading so ...|positive|  1.0|       1.0|         real|
|After reading the...|negative|  1.0|       1.0|         real|
|After reading the...|positive|  1.0|       1.0|         real|
|After some delibe...|positive|  1.0|       1.0|         real|
|All I can say is ...|negative|  1.0|       0.0|         real|
|Amalifa Hotel in ...|positive|  0.0|       0.0|         fake|
|Awesome hotel, ou...|positive|  1.0|       1.0|       

In [19]:
pred.show()

NameError: name 'pred' is not defined

In [122]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
eval = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")
#acc_our_data = eval.evaluate(pred)
print("our data: ", acc_our_data)
acc_original_data = eval.evaluate(pred_original)
print("original data: ", acc_original_data)


our data:  0.8651026392961877
original data:  0.608


In [115]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10000,20000,50000]) \
    .addGrid(lr.regParam, [0.1, 0.3 ,0.5]) \
    .build()
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,evaluator= MulticlassClassificationEvaluator(),numFolds=4,parallelism = 100 )  # use 3+ folds in practice

In [20]:
cvModel = crossval.fit(train_split)

In [21]:
p = cvModel.transform(test_split)

In [22]:
acc = eval.evaluate(p)

In [23]:
acc

0.7476979742173112

In [24]:
params = [{
      p.name: v
      for p,
      v in m.items()
   }
   for m in cvModel.getEstimatorParamMaps()
]
import pandas as pd

pd.DataFrame.from_dict([{
      cvModel.getEvaluator().getMetricName(): metric,
      ** ps
   }
   for ps, metric in zip(params, cvModel.avgMetrics)
])

Unnamed: 0,f1,numFeatures,regParam
0,0.698392,10000,0.1
1,0.70372,10000,0.3
2,0.704871,10000,0.5
3,0.711836,20000,0.1
4,0.71705,20000,0.3
5,0.710479,20000,0.5
6,0.707334,50000,0.1
7,0.710366,50000,0.3
8,0.709178,50000,0.5


In [17]:
from pyspark.ml.feature import HashingTF, IDF, IndexToString, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_split, test_split = df_train.randomSplit(weights = [0.80, 0.20], seed = 1)
train_split.cache()

hashingTF = HashingTF(inputCol="stemmed", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="TF_IDF", minDocFreq=2)
label_strIdx1 = StringIndexer(inputCol="polarity", outputCol="polarity_idx")
assembles = VectorAssembler(inputCols = ['TF_IDF','sentiment','polarity_idx'],outputCol="features")
label_strIdx2 = StringIndexer(inputCol="real_fake", outputCol="label")
svc = LinearSVC(featuresCol="features",labelCol="label")
label_idxStr = IndexToString(inputCol = "label", outputCol = "article_class")

pipeline_svc = Pipeline(stages=[hashingTF, idf,label_strIdx1 , assembles, label_strIdx2, svc ,label_idxStr])

In [18]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10000,20000,50000]) \
    .addGrid(svc.regParam, [0.0001, 0.001, 0.01, 0.1, 1]) \
    .build()
crossval_svc = CrossValidator(estimator=pipeline_svc, estimatorParamMaps=paramGrid,evaluator= MulticlassClassificationEvaluator(),numFolds=2,parallelism = 100 )  # use 3+ folds in practice

In [160]:
cvModel_svc = crossval_svc.fit(train_split)

In [155]:
pred_svc = cvModel_svc.transform(test_split)
pred_original_svc = cvModel_svc.transform(df_test)
pred_svc.cache()
pred_original_svc.cache()

DataFrame[Review: string, stemmed: array<string>, sentiment: float, polarity: string, real_fake: string, rawFeatures: vector, TF_IDF: vector, polarity_idx: double, features: vector, label: double, rawPrediction: vector, prediction: double, article_class: string]

In [156]:
eval = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction",metricName="accuracy")
acc_our_data_svc = eval.evaluate(pred_svc)
print("our data: ", acc_our_data_svc)
acc_original_data_svc = eval.evaluate(pred_original_svc)
print("original data: ", acc_original_data_svc)

our data:  0.8621700879765396
original data:  0.625


In [157]:
params = [{
      p.name: v
      for p,
      v in m.items()
   }
   for m in cvModel_svc.getEstimatorParamMaps()
]
import pandas as pd

pd.DataFrame.from_dict([{
      cvModel.getEvaluator().getMetricName(): metric,
      ** ps
   }
   for ps, metric in zip(params, cvModel.avgMetrics)
])

Unnamed: 0,f1,numFeatures,regParam
0,0.81617,10000,0.0001
1,0.813046,10000,0.001
2,0.81304,10000,0.01


In [85]:
svc.params

[Param(parent='LinearSVC_293ba5c073bb', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearSVC_293ba5c073bb', name='featuresCol', doc='features column name.'),
 Param(parent='LinearSVC_293ba5c073bb', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearSVC_293ba5c073bb', name='labelCol', doc='label column name.'),
 Param(parent='LinearSVC_293ba5c073bb', name='maxIter', doc='max number of iterations (>= 0).'),
 Param(parent='LinearSVC_293ba5c073bb', name='predictionCol', doc='prediction column name.'),
 Param(parent='LinearSVC_293ba5c073bb', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'),
 Param(parent='LinearSVC_293ba5c073bb', name='regParam', doc='regularization parameter (>= 0).'),
 Param(parent='LinearSVC_293ba5c073bb', name='standardization', doc='whether to standardize the training features before fitting the model.'),
 Param(parent='LinearSVC_293ba5c073bb', name='threshold