In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

# Reuse the running context/session when there is one, otherwise start them.
# Creating everything explicitly keeps the notebook runnable on a fresh kernel
# that was not launched through the pyspark shell.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()  # `spark` is what the data-loading cell reads from
sqlContext = SQLContext(sc)

In [2]:
from pyspark.sql.functions import udf

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import NGram
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [3]:
# Load the review dataset from review.json into a DataFrame.
review_data = spark.read.json("review.json")

In [122]:
# Peek at a single row to see which columns the reviews carry.
review_data.show(1)

+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|      date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
|uYHaNptLzDLoV_JZ_...|   0|2016-07-12|    0|VfBHSwC5Vz_pbFluy...|    5|My girlfriend and...|     0|cjpdDjZyprfyDG3Rl...|
+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 1 row



### Check the distribution of star ratings

In [3]:
# show() prints the table itself and returns None, so it must not be wrapped in print.
review_data.groupBy(review_data["stars"]).count().show()

+-----+-------+
|stars|  count|
+-----+-------+
|    5|1988003|
|    1| 639849|
|    3| 570819|
|    2| 402396|
|    4|1135830|
+-----+-------+

None


Exclude neutral reviews (3-star ratings)

In [4]:
def pos_neg(star):
    """Map a star rating to a sentiment code.

    Ratings above 3 give 1 (positive), ratings below 3 give 0 (negative),
    and anything else gives 2 (neutral).
    """
    if star > 3:
        return 1  # positive
    if star < 3:
        return 0  # negative
    return 2  # neutral
    
# Declare the return type explicitly: without it udf() falls back to StringType and
# the label column comes back as text, which the ML estimators do not accept as a label.
star_to_senti = udf(pos_neg, IntegerType())
train_test_DF = (
    review_data
    .select('text', star_to_senti('stars').alias('label'))
    .filter("label != 2")  # exclude neutral reviews
)

In [6]:
# show() prints the table itself and returns None, so it must not be wrapped in print.
train_test_DF.groupBy(train_test_DF["label"]).count().show()

+-----+-------+
|label|  count|
+-----+-------+
|    0|1042245|
|    1|3123833|
+-----+-------+

None


In [6]:
# Peek at one labelled row (review text plus its sentiment label).
train_test_DF.show(1)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|My girlfriend and...|    1|
+--------------------+-----+
only showing top 1 row



### Create TFIDF features

In [5]:
#remove punctuation
import re
import string

# Compiled once at definition time instead of on every call:
# matches any punctuation character, digit, carriage return, tab or newline.
_NUM_PUNCT_RE = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')


def remove_num_punct(text):
    """Strip punctuation and digits from ``text`` and normalise its whitespace.

    Every punctuation character, digit, carriage return, tab and newline is
    replaced by a space, then runs of whitespace are collapsed so the words
    come back separated by exactly one space, with no leading or trailing
    space. Letter case is left untouched.

    Args:
        text: the raw review text, or None.

    Returns:
        The cleaned string. A None input yields "" so that one missing review
        does not raise inside the UDF and abort the whole job.
    """
    if text is None:
        return ""
    # Substitute a space (rather than deleting) so neighbouring words do not fuse,
    # then let split()/join() collapse every run of whitespace into a single space.
    return ' '.join(_NUM_PUNCT_RE.sub(" ", text).split())

# Wrap the text cleaner as a Spark UDF, apply it to the review text and keep the label.
udf_num_punct = udf(remove_num_punct)
review_rmsw = train_test_DF.select(
    udf_num_punct('text').alias('text'),
    'label',
)
# Print one cleaned review in full to check the result by eye.
review_rmsw.show(1, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
# Number of hash buckets handed to HashingTF.setNumFeatures in the feature cells below;
# it is also used as the input-layer width of the multilayer perceptron at the end of the notebook.
n_features = 20

### Unigram tfidf

In [7]:
# Unigram featurization stages: tokenize the text, drop stop words (case-insensitively),
# hash the remaining tokens into n_features buckets, then apply IDF weighting.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered", caseSensitive=False)
hashingTF = HashingTF(numFeatures=n_features, inputCol="filtered", outputCol="rawFeatures")
idf = IDF(minDocFreq=0, inputCol="rawFeatures", outputCol="features")

### Bigram tfidf

In [8]:
# Bigram featurization stages. Note that this rebinds tokenizer, remover, hashingTF
# and idf; after this cell hashingTF reads the "bigrams" column produced by `bigram`.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered", caseSensitive=False)
bigram = NGram().setN(2).setInputCol("filtered").setOutputCol("bigrams")
hashingTF = HashingTF(numFeatures=n_features, inputCol="bigrams", outputCol="rawFeatures")
idf = IDF(minDocFreq=0, inputCol="rawFeatures", outputCol="features")

### Trigram tfidf

In [9]:
# Trigram featurization stages. Note that this rebinds tokenizer, remover, hashingTF
# and idf; after this cell hashingTF reads the "trigrams" column produced by `trigram`.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered", caseSensitive=False)
trigram = NGram().setN(3).setInputCol("filtered").setOutputCol("trigrams")
hashingTF = HashingTF(numFeatures=n_features, inputCol="trigrams", outputCol="rawFeatures")
idf = IDF(minDocFreq=0, inputCol="rawFeatures", outputCol="features")

### Split train and test data

In [10]:
# 80/20 train/test split. The seed is fixed so that re-running the notebook yields the
# same split and the metrics of the different models stay comparable between runs.
train_set, test_set = review_rmsw.randomSplit([0.8, 0.2], seed=42)
# Both sets are reused by every model below, so keep them cached.
train_set = train_set.cache()
test_set = test_set.cache()

### Define evaluation metrics

In [11]:
# Report area under ROC, F1 and accuracy for a DataFrame of predictions.
def evaluate_metric(predictions):
    """Print and return the evaluation metrics for a predictions DataFrame.

    Args:
        predictions: DataFrame holding a "label" column and a "prediction"
            column, as produced by ``model.transform(test_set)``. When it also
            has a "rawPrediction" column the area under the ROC curve is
            reported; otherwise that metric is skipped instead of raising.

    Returns:
        dict mapping metric name ("areaUnderROC" when available, "f1",
        "accuracy") to its value.
    """
    metrics = {}

    # The binary evaluator reads the raw scores, so only run it when they are present.
    if "rawPrediction" in predictions.columns:
        auc_evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC")
        metrics["areaUnderROC"] = auc_evaluator.evaluate(predictions)
        print("Area under ROC curve: %s" % metrics["areaUnderROC"])
    else:
        print("Area under ROC curve: n/a (no rawPrediction column)")

    f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                     metricName="f1")
    metrics["f1"] = f1_evaluator.evaluate(predictions)
    print("F1_score = %0.4f" % metrics["f1"])

    accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                           metricName="accuracy")
    metrics["accuracy"] = accuracy_evaluator.evaluate(predictions)
    print("Accuracy = %0.4f" % metrics["accuracy"])

    return metrics
    

### Model 1: Logistic regression

In [12]:
%%time
lr = LogisticRegression().setRegParam(0.01).setThreshold(0.5)
pipeline=Pipeline(stages=[tokenizer,remover,hashingTF,idf, lr])

model=pipeline.fit(train_set)
predictions = model.transform(test_set)

Py4JJavaError: An error occurred while calling o86.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 22.0 failed 1 times, most recent failure: Lost task 11.0 in stage 22.0 (TID 499, localhost, executor driver): java.lang.OutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:126)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:153)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:120)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:82)
	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:87)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.wholestagecodegen_init_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:392)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:389)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.ml.feature.IDF.fit(IDF.scala:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:126)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:153)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:120)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:82)
	at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:87)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.wholestagecodegen_init_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown Source)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:392)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:389)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 49970)
----------------------------------------


Traceback (most recent call last):
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 652, in __init__
    self.handle()
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/serializers.py", line 577, in read_int
    raise EOFError
EOFError


In [None]:
# Show the first rows of the logistic-regression predictions on the test set.
predictions.show()

In [9]:
# Print the evaluation metrics for the logistic-regression predictions.
evaluate_metric(predictions)

NameError: name 'predictions' is not defined

### Cross-validation to find the best parameters

In [None]:
# Take the params from the stages that are actually inside `pipeline`, so the grid cannot
# end up keyed on a same-named stage object that is not part of the estimator being tuned
# (`hashingTF` and `idf` are rebound by several cells above).
tuned_stages = pipeline.getStages()
hashing_stage = next(s for s in tuned_stages if isinstance(s, HashingTF))
idf_stage = next(s for s in tuned_stages if isinstance(s, IDF))

paramGrid = ParamGridBuilder()\
    .addGrid(hashing_stage.numFeatures, [100, 1000, 10000])\
    .addGrid(idf_stage.minDocFreq, [0, 10, 100])\
    .build()

In [None]:
# Select the best parameter combination by area under the ROC curve, using 5-fold CV.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)

In [None]:
%%time
cvModel = cv.fit(train_set)
print "Area under the ROC curve for best fitted model =",evaluator.evaluate(cvModel.transform(test_set))

In [None]:
# Compare the untuned logistic-regression pipeline with the cross-validated one.
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print("Area under ROC curve for non-tuned model: %s" % evaluator.evaluate(predictions))
print("Area under ROC curve for fitted model: %s" % evaluator.evaluate(cvModel.transform(test_set)))

### Model 2: Unigram Naive Bayes

In [14]:
%%time
nb = NaiveBayes(smoothing = 1.0, modelType = "multinomial")
pipeline=Pipeline(stages=[tokenizer,remover,hashingTF,idf, nb])

nb_model=pipeline.fit(train_set)
nb_prediction = model.transform(test_set)

#print evaluation metrics
evaluate_metric(nb_prediction)

Traceback (most recent call last):
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 290, in _handle_request_noblock
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
Py4JNetworkError: Error while receiving
    self.process_request(request, client_address)
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/Users/xiaohui/anaconda/lib/python2.7/SocketServer.py", line 331, in finish_request
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 50265)
----------------------------------------


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50235)
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
  File "/Users/xiaohui/anaconda/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:50235)

In [12]:

# nb_prediction.show()

### Model 3: Bigram Naive Bayes

In [None]:
# Hash the "bigrams" column produced by the `bigram` stage. The global `hashingTF` is
# rebound by the trigram cell to read "trigrams", which this pipeline never produces.
hashingTF_bigram = HashingTF(numFeatures=n_features, inputCol="bigrams", outputCol="rawFeatures")

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer, remover, bigram, hashingTF_bigram, idf, nb])
nb_model_bigram = pipeline.fit(train_set)
nb_prediction_bigram = nb_model_bigram.transform(test_set)

#print evaluation metrics
evaluate_metric(nb_prediction_bigram)

### Model 4: Trigram Naive Bayes

In [None]:
# Hash the "trigrams" column produced by the `trigram` stage; building the stage here
# keeps the cell independent of whichever feature cell last rebound `hashingTF`.
hashingTF_trigram = HashingTF(numFeatures=n_features, inputCol="trigrams", outputCol="rawFeatures")

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer, remover, trigram, hashingTF_trigram, idf, nb])
nb_model_trigram = pipeline.fit(train_set)
nb_prediction_trigram = nb_model_trigram.transform(test_set)
#print evaluation metrics
# Evaluate the trigram predictions computed above (not the bigram ones).
evaluate_metric(nb_prediction_trigram)

### Model 5: Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the trigram TF-IDF features. The hashing stage is built here so the
# cell does not depend on whichever feature cell last rebound the global `hashingTF`.
hashingTF_trigram = HashingTF(numFeatures=n_features, inputCol="trigrams", outputCol="rawFeatures")

rf = RandomForestClassifier(maxDepth=20)

pipeline = Pipeline(stages=[tokenizer, remover, trigram, hashingTF_trigram, idf, rf])
rf_model = pipeline.fit(train_set)
rf_prediction = rf_model.transform(test_set)
#print evaluation metrics
evaluate_metric(rf_prediction)


### Model 6: Multilayer perceptron classifier 

In [None]:
# Layers of the neural network:
#   input layer of size n_features (one unit per hashed feature),
#   one hidden layer of size 5,
#   output layer of size 2 (negative / positive).
layers = [n_features, 5, 2]

# The input layer width must equal the HashingTF feature count, so build the hashing
# stage here from the same n_features instead of relying on the global `hashingTF`.
hashingTF_trigram = HashingTF(numFeatures=n_features, inputCol="trigrams", outputCol="rawFeatures")

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
pipeline = Pipeline(stages=[tokenizer, remover, trigram, hashingTF_trigram, idf, trainer])
# Use dedicated names so the random-forest model and predictions are not overwritten.
mlp_model = pipeline.fit(train_set)
mlp_prediction = mlp_model.transform(test_set)
#print evaluation metrics
evaluate_metric(mlp_prediction)