In [9]:
import pandas as pd
import numpy as np
import re
import os
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline 
from pyspark.sql.functions import rand 
from pyspark.mllib.evaluation import MulticlassMetrics 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer


sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
path="Imdb/"
positiveFiles = [x for x in os.listdir(path + "train/pos/") if x.endswith(".txt")]
negativeFiles = [x for x in os.listdir(path + "train/neg/") if x.endswith(".txt")]
print("done")

done


In [10]:
posReviews, negReviews = [], []

for posfile in positiveFiles:
    with open(path + "train/pos/" + posfile, encoding= "latin1") as file:
        posReviews.append(file.read())
for negfile in negativeFiles:
    with open(path + "train/neg/" + negfile, encoding= "latin1") as file:
        negReviews.append(file.read())
print("done")

done


In [11]:
reviews = pd.concat([
    pd.DataFrame({"review":posReviews, "label":1, "file":positiveFiles}),
    pd.DataFrame({"review":negReviews, "label":0, "file":negativeFiles})
], ignore_index=True).sample(frac=1, random_state=5)

reviews.head()

mySchema = StructType([ StructField("file", StringType(), True)\
                       ,StructField("label", IntegerType(), True)\
                       ,StructField("review", StringType(), True)])

#convert the data to spark dataframe so that it can be split up and drop the file column 
reviews2 = spark.createDataFrame(reviews,schema=mySchema)
reviews2 = reviews2.drop("file")
(trainSet, validationSet, testSet) = reviews2.randomSplit([0.90, 0.05, 0.05], seed = 2000)

# duplicates training and testing dat for another tokenizer we used
trainSet2= trainSet
validationSet2 = validationSet
testSet2 = testSet
print("done")

done


In [12]:
# tells our tokenizer which columns to use and output to  
tokenizer = Tokenizer(inputCol="review", outputCol="words")
# Takes a set of terms and turns them into features vectors. The feature vectors being the words 
# in our review sentences. 

hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
# Takes out all words that do not apear more than 5 times in the data 
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) 
# Tells the pipline what column to output the classification label to. 
label_stringIdx = StringIndexer(inputCol = "label", outputCol = "label 2.o")
# builds the pipeline with the parameters we just set up 
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

# Fits the model to data
# sets parameters to the model 
pipelineFit = pipeline.fit(trainSet)
# sets the data to the parameters 
trainDF = pipelineFit.transform(trainSet)
validationDF = pipelineFit.transform(validationSet)
trainDF.show(5)

+-----+-----------+-------------+--------------------+--------------------+---------+
|label|     review|        words|                  tf|            features|label 2.o|
+-----+-----------+-------------+--------------------+--------------------+---------+
|    0|10000_4.txt|[10000_4.txt]|(65536,[40178],[1...|(65536,[40178],[0...|      0.0|
|    0|10004_3.txt|[10004_3.txt]|(65536,[39598],[1...|(65536,[39598],[0...|      0.0|
|    0|10006_4.txt|[10006_4.txt]|(65536,[37352],[1...|(65536,[37352],[0...|      0.0|
|    0|10010_3.txt|[10010_3.txt]|(65536,[2046],[1.0])|(65536,[2046],[0.0])|      0.0|
|    0|10016_4.txt|[10016_4.txt]|(65536,[42997],[1...|(65536,[42997],[0...|      0.0|
+-----+-----------+-------------+--------------------+--------------------+---------+
only showing top 5 rows



In [13]:
# Applying logistic regression to our model
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(trainDF)
# Creates our predeictions and measures accuracy 
predictions = lrModel.transform(validationDF)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.5

In [34]:
from pyspark.ml.feature import CountVectorizer
# same as our above model except the use of CountVectorizer 
# 

cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) 

pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx])

pipelineFit = pipeline.fit(trainSet2)
trainDF = pipelineFit.transform(trainSet2)
validationDF = pipelineFit.transform(validationSet2)
lrModel = lr.fit(trainDF)

predictions = lrModel.transform(validationDF)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

accuracy = evaluator.evaluate(predictions)
print(accuracy)

pipelineFit.save("pipeline/fitted_pipeline")


0.5
<class 'pyspark.ml.pipeline.PipelineModel'>


Py4JJavaError: An error occurred while calling o39778.save.
: java.io.IOException: Path pipeline/fitted_pipeline already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:702)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:179)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
