In [151]:
# some imports
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer, CountVectorizerModel 
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F
from pyspark.sql.functions import explode
from pyspark.sql.functions import regexp_replace
import numpy as np
from pyspark.sql.types import FloatType, StringType
import math

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# create spark context
sc = SparkContext()
#create spark session
spark = SparkSession(sc)
# save datapath in variable
dataPath = "hdfs:///user/elmar/amazon-reviews/reviewscombined-small.json"

In [153]:
##########################################################################################################################################
####### PART 1                                                             ##############################################################
##########################################################################################################################################
# RDD specific imports
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.feature import StandardScaler, StandardScalerModel, Normalizer
from pyspark.sql import SQLContext, Row

import json
# create rdd
documents = sc.textFile(dataPath)
# load as json
dataset = documents.map(json.loads)
# select only reviewteext
reviews = dataset.map(lambda e: e['reviewText'] )
# define stopwords
stopwords = ["i", "a", "a's","able","about","above","according","accordingly","across","actually","after","afterwards","again","against","ain't","all","allow","allows","almost","alone","along","already","also","although","always","am","among","amongst","an","and","another","any","anybody","anyhow","anyone","anything","anyway","anyways","anywhere","apart","appear","appreciate","appropriate","are","aren't","around","as","aside","ask","asking","associated","at","available","away","awfully","be","became","because","become","becomes","becoming","been","before","beforehand","behind","being","believe","below","beside","besides","best","better","between","beyond","both","brief","but","by","c'mon","c's","came","can","can't","cannot","cant","cause","causes","certain","certainly","changes","clearly","co","com","come","comes","concerning","consequently","consider","considering","contain","containing","contains","corresponding","could","couldn't","course","currently","definitely","described","despite","did","didn't","different","do","does","doesn't","doing","don't","done","down","downwards","during","each","edu","eg","eight","either","else","elsewhere","enough","entirely","especially","et","etc","even","ever","every","everybody","everyone","everything","everywhere","ex","exactly","example","except","far","few","fifth","first","five","followed","following","follows","for","former","formerly","forth","four","from","further","furthermore","get","gets","getting","given","gives","go","goes","going","gone","got","gotten","greetings","had","hadn't","happens","hardly","has","hasn't","have","haven't","having","he","he's","hello","help","hence","her","here","here's","hereafter","hereby","herein","hereupon","hers","herself","hi","him","himself","his","hither","hopefully","how","howbeit","however","i'd","i'll","i'm","i've","ie","if","ignored","immediate","in","inasmuch","inc","indeed","indicate","indicated","indicates","inner","insofar","instead","into","inward","is","isn't","it","it'd","it'll","it's","its","itself","just","keep","keeps","kept","know","knows","known","last","lately","later","latter","latterly","least","less","lest","let","let's","like","liked","likely","little","look","looking","looks","ltd","mainly","many","may","maybe","me","mean","meanwhile","merely","might","more","moreover","most","mostly","much","must","my","myself","name","namely","nd","near","nearly","necessary","need","needs","neither","never","nevertheless","new","next","nine","no","nobody","non","none","noone","nor","normally","not","nothing","novel","now","nowhere","obviously","of","off","often","oh","ok","okay","old","on","once","one","ones","only","onto","or","other","others","otherwise","ought","our","ours","ourselves","out","outside","over","overall","own","particular","particularly","per","perhaps","placed","please","plus","possible","presumably","probably","provides","que","quite","qv","rather","rd","re","really","reasonably","regarding","regardless","regards","relatively","respectively","right","said","same","saw","say","saying","says","second","secondly","see","seeing","seem","seemed","seeming","seems","seen","self","selves","sensible","sent","serious","seriously","seven","several","shall","she","should","shouldn't","since","six","so","some","somebody","somehow","someone","something","sometime","sometimes","somewhat","somewhere","soon","sorry","specified","specify","specifying","still","sub","such","sup","sure","t's","take","taken","tell","tends","th","than","thank","thanks","thanx","that","that's","thats","the","their","theirs","them","themselves","then","thence","there","there's","thereafter","thereby","therefore","therein","theres","thereupon","these","they","they'd","they'll","they're","they've","think","third","this","thorough","thoroughly","those","though","three","through","throughout","thru","thus","to","together","too","took","toward","towards","tried","tries","truly","try","trying","twice","two","un","under","unfortunately","unless","unlikely","until","unto","up","upon","us","use","used","useful","uses","using","usually","value","various","very","via","viz","vs","want","wants","was","wasn't","way","we","we'd","we'll","we're","we've","welcome","well","went","were","weren't","what","what's","whatever","when","whence","whenever","where","where's","whereafter","whereas","whereby","wherein","whereupon","wherever","whether","which","while","whither","who","who's","whoever","whole","whom","whose","why","will","willing","wish","with","within","without","won't","wonder","would","wouldn't","yes","yet","you","you'd","you'll","you're","you've","your","yours","yourself","yourselves","zero"]
# define function to lowercase and remove special chars
def lower(lines):
    punc='!"#$%&\'()*+,-./:;1234567890<=>?@[\\]^_`{|}~'
    for ch in punc:
        lines = lines.replace(ch, '')
    lines = lines.lower()
    lines = lines.split()
    return lines

# lowercase and remove special chars
tokens = reviews.map(lower)
#remove stopwords
tokens = tokens.filter(lambda x : x not in stopwords)
#define hashing 
hashingTF = HashingTF()
# get term frequency
tf = hashingTF.transform(tokens)
# define idf
idf = IDF().fit(tf)
# calcualte tfidf
tfidf = idf.transform(tf)
#return vlaues for review 1
tfidf.take(1)

[SparseVector(1048576, {15069: 1.9497, 28433: 8.0562, 34116: 4.5124, 59645: 3.7441, 70882: 1.6007, 140853: 8.2794, 151357: 1.6096, 154253: 0.7359, 162236: 4.1442, 163495: 4.5189, 178046: 2.3898, 184663: 4.7604, 197139: 1.8032, 200484: 4.4134, 214848: 3.9753, 238153: 0.7052, 268040: 2.3346, 273004: 5.5225, 276491: 0.6626, 291135: 2.5324, 298766: 8.9725, 302147: 1.9417, 348943: 0.9745, 357784: 0.5969, 407524: 4.071, 431618: 4.85, 438276: 1.0532, 452367: 1.6265, 458110: 3.763, 461228: 5.403, 463522: 0.4456, 464125: 8.0562, 465091: 3.1995, 472985: 1.7347, 498665: 7.5862, 511771: 2.8014, 540404: 2.9936, 551905: 5.3752, 569592: 5.5548, 648331: 0.9585, 675737: 8.9678, 676489: 0.9381, 685921: 8.5671, 702250: 1.6272, 706364: 1.0889, 721336: 1.7269, 732481: 6.5746, 745992: 5.0807, 749264: 2.1061, 778868: 3.7964, 782550: 5.6952, 794488: 1.2825, 800830: 4.9383, 868014: 2.5599, 897662: 4.1726, 898785: 5.7144, 924138: 5.3219, 929834: 1.2924, 935701: 1.0537, 941745: 8.0562, 992550: 1.9337, 1003235: 4

In [149]:
##########################################################################################################################################
####### PART 2                                                              ##############################################################
##########################################################################################################################################

from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover, CountVectorizer, ChiSqSelector, StringIndexer, Normalizer
from pyspark.ml import Pipeline

df1 = spark.read.json(dataPath).select("reviewText", "category")
# tokenize and remove chars with lenght 1
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="tokenizedReview", pattern="\\W+").setMinTokenLength(2)
#tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokenizedReview")
# remove stopwords
remover = StopWordsRemover(inputCol="tokenizedReview", outputCol="tokens")
# calculate term frequency as vector(word_id:tf)
cv = CountVectorizer(inputCol="tokens", outputCol="tf", minDF=2.0)
# weight down tf by idf
idf = IDF(inputCol="tf", outputCol="tfidf")
# convert category to float and name it label
indexer = StringIndexer(inputCol="category", outputCol="label")
# calculate top 4000 features based on chisquared values and categories
selector = ChiSqSelector(numTopFeatures=4000, featuresCol="tfidf",
                         outputCol="selectedFeatures", labelCol="label")
# normlaize results with l2 norm
normalizer = Normalizer(inputCol="tfidf", outputCol="features", p=2.0)
# build pipeline
pipeline = Pipeline(stages=[regexTokenizer, remover, cv, idf , indexer, selector, normalizer ])
# run df1 through pipeline
top_k_features = pipeline.fit(df1)
#create final dataframe containign tfidf and chisq top features normalized => input for ML algo
fin = top_k_features.transform(df1)

In [150]:
# get top 4000 terms to the selectedFeatures indeces based on the small dataset
# run manually through pipeline save models to values
#df1 = spark.read.json(dataPath).select("reviewText", "category")
#regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="tokenizedReview", pattern="\\W+").setMinTokenLength(3)
#df1 = regexTokenizer.transform(df1)
#df1 = remover.transform(df1)
# save cv model to trasnform indices to words
cvModel = cv.fit(df1)
#df1 = cvModel.transform(df1)
#df1 = idf.fit(df1).transform(df1)
#df1 = indexer.fit(df1).transform(df1)
# save chisq model to transformm incides to words
chiSqModel = selector.fit(df1)
# transform indices to words
terms = [cvModel.vocabulary[i] for i in chiSqModel.selectedFeatures]

# write out to file 
with open('output_ds.txt', 'w') as f:
    for item in sorted(terms):
        f.write("%s\n" % item)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
##########################################################################################################################################
####### PART 3                                                            ################################################################
##########################################################################################################################################
from pyspark.ml.classification import LinearSVC, OneVsRest, LinearSVCModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover, CountVectorizer, ChiSqSelector, StringIndexer, Normalizer
from pyspark.ml import Pipeline

# set a seed for reproducability 
rnd = 30

#read data
df1 = spark.read.json(dataPath).select("reviewText", "category")

#split into train, test and validation data
(train, validate, test) = df1.randomSplit([0.64, 0.16, 0.2], seed=rnd)        # train data was split 80-20 for train and validation


# tokenize and remove chars with lenght 1
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="tokenizedReview", pattern="\\W+").setMinTokenLength(3)
#tokenizer = Tokenizer(inputCol="reviewText", outputCol="tokenizedReview")
# remove stopwords
remover = StopWordsRemover(inputCol="tokenizedReview", outputCol="tokens")
# calculate term frequency as vector(word_id:tf)
cv = CountVectorizer(inputCol="tokens", outputCol="tf", minDF=2.0)
# weight down tf by idf
idf = IDF(inputCol="tf", outputCol="tfidf")
# convert category to float and name it label
indexer = StringIndexer(inputCol="category", outputCol="label")
# calculate top 4000 features based on chisquared values and categories
selector = ChiSqSelector(numTopFeatures=4000, featuresCol="tfidf",
                         outputCol="selectedFeatures", labelCol="label")
# normlaize results with l2 norm
normalizer = Normalizer(inputCol="tfidf", outputCol="features", p=2.0)

# extend the pipeline from 2
#set the classifier to support vector machine
classifier = LinearSVC()
# use onevsrest classifier in order to handle multi class binary 
ml_classifier = OneVsRest(classifier=classifier)

#extend and build pipeline
pipeline = Pipeline(stages=[regexTokenizer, remover, cv, idf , indexer, selector, normalizer , ml_classifier ])

# run data  through pipeline
# create grid with svm parameters
paramGrid = ParamGridBuilder() \
    .addGrid(classifier.maxIter, [1, 50, 100]) \
    .addGrid(classifier.regParam, [0.001, 0.01, 0.1]) \
    .addGrid(classifier.threshold, [0.00, 0.05, 0.1]) \
    .addGrid(classifier.aggregationDepth, [2, 5, 10]) \
    .addGrid(classifier.tol, [1e-6, 1e-4, 1e-5]) \
    .build()
# create crossvalidation model with pipeline data, grid data and multiclassevaluator, with 6 folds evaluating 3 parallely
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(metricName="f1"),
    numFolds=6,
    parallelism=3,
    seed=rnd)

cvModel = crossval.fit(train)

#cvModel.save("home/dic/2019S/users/e01551118g/cvModelv4")

In [4]:
#from pyspark.ml.tuning import CrossValidatorModel
#cvModel = CrossValidatorModel.load("home/dic/2019S/users/e01551118g/cvModelv4")

# create prediction for validation data
validation = cvModel.transform(validate)
# create prediction for test data
prediction = cvModel.transform(test)

Py4JJavaError: An error occurred while calling o62.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://c100.local:8020/user/e01551118g/home/dic/2019S/users/e01551118g/cvModelv4/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1337)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1331)
	at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1372)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1371)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:387)
	at org.apache.spark.ml.tuning.ValidatorParams$.loadImpl(ValidatorParams.scala:180)
	at org.apache.spark.ml.tuning.CrossValidatorModel$CrossValidatorModelReader.load(CrossValidator.scala:388)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [49]:
# get correct predictions for validation data
correct_validation = validation.filter(validation.label == validation.prediction).count()
# get number of wrong predictions for validation data
wrong_validation = validation.filter(validation.label != validation.prediction).count()
# get correct predictions for test data
correct_test = prediction.filter(prediction.label == prediction.prediction).count()
# get number of wrong predictions for test data
wrong_test = prediction.filter(prediction.label != prediction.prediction).count()                    

In [57]:
# Tests for validation data
# correct predictions 2118
# wrong predictions 402
# total number of predictions 2520 
# Accuracy 84.047619

In [63]:
# Tests for test data
# correct predictions 2653
# wrong predictions 420
# total number of predictions 3073 
# Accuracy 86.33257