In [1]:
#import findspark

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, col, lower, regexp_replace, monotonically_increasing_id, split
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover, HashingTF, IDF, OneHotEncoderEstimator, StringIndexer, VectorAssembler, Binarizer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


# PART 2

In [2]:
# new spark session, getting data, creating spark dataframe


# You may be able to figure something out better
spark = SparkSession.builder.master("local[*]")\
        .config("spark.executor.memory", "32g")\
        .config("spark.driver.memory", "32g")\
        .config("spark.memory.offHeap.enabled",'true')\
        .config("spark.memory.offHeap.size","32g")\
        .getOrCreate()

spark2 = SparkSession.builder.master("local[*]")\
        .config("spark.executor.memory", "32g")\
        .config("spark.driver.memory", "32g")\
        .config("spark.memory.offHeap.enabled",'true')\
        .config("spark.memory.offHeap.size","32g")\
        .getOrCreate()

x = pd.read_csv('train.csv')
df = spark.createDataFrame(x)

df.printSchema()
df.show

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)



<bound method DataFrame.show of DataFrame[movie_id: bigint, movie_name: string, plot: string, genre: string]>

# Cleaning plot data

In [3]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

#Drop irrelevant features
drop_list = ['movie_id', 'movie_name']
data = df.select([column for column in df.columns if column not in drop_list])


# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] # standard stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
label_stringIdx = StringIndexer(inputCol = "genre", outputCol = "label")

# Fit the pipeline to training documents.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|                plot|               genre|               words|            filtered|            features| label|
+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|Shlykov, a hard-w...|['World cinema', ...|[shlykov, a, hard...|[shlykov, a, hard...|(10000,[1,2,18,26...|   4.0|
|The nation of Pan...|['Action/Adventur...|[the, nation, of,...|[nation, of, pane...|(10000,[0,1,2,3,4...|1603.0|
|Poovalli Induchoo...|['Musical', 'Acti...|[poovalli, induch...|[poovalli, induch...|(10000,[0,1,2,3,4...| 316.0|
|The Lemon Drop Ki...|          ['Comedy']|[the, lemon, drop...|[lemon, drop, kid...|(10000,[0,1,2,3,4...|   1.0|
|Seventh-day Adven...|['Crime Fiction',...|[seventh, day, ad...|[seventh, day, ad...|(10000,[0,1,2,3,4...| 113.0|
+--------------------+--------------------+--------------------+--------------------+---

In [4]:
#RANDOM FOREST using TF-IDF Features

from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

#dataset = pipelineFit.transform(dataset)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)





rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data
rf_tfidf_Model = rf.fit(trainingData)
predictions = rf_tfidf_Model.transform(testData)
results = predictions.select(['prediction', 'label'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

cm = metrics.confusionMatrix().toArray()
accuracy=(cm[0][0]+cm[1][1])/cm.sum()
precision=(cm[0][0])/(cm[0][0]+cm[1][0])
recall=(cm[0][0])/(cm[0][0]+cm[0][1])
f1score = 2*(precision*recall)/(precision+recall)
print("RandomForestClassifier TF-IDF accuracy: ",accuracy)
print("RandomForestClassifier TF-IDF precision: ",precision)
print("RandomForestClassifier TF-IDF recall: ", recall)
print("RandomForestClassifier TF-IDF f1score: ", f1score)

RandomForestClassifier TF-IDF accuracy:  0.11836558209923252
RandomForestClassifier TF-IDF precision:  0.7363819771351715
RandomForestClassifier TF-IDF recall:  1.0
RandomForestClassifier TF-IDF f1score:  0.8481797056545314


# Testing and Evaluation on Test set

In [None]:
# new spark session, getting data, creating spark dataframe

# You may be able to figure something out better
spark = SparkSession.builder.master("local[*]")\
        .config("spark.executor.memory", "32g")\
        .config("spark.driver.memory", "32g")\
        .config("spark.memory.offHeap.enabled",'true')\
        .config("spark.memory.offHeap.size","32g")\
        .getOrCreate()

spark2 = SparkSession.builder.master("local[*]")\
        .config("spark.executor.memory", "32g")\
        .config("spark.driver.memory", "32g")\
        .config("spark.memory.offHeap.enabled",'true')\
        .config("spark.memory.offHeap.size","32g")\
        .getOrCreate()


x = pd.read_csv('test.csv')
df = spark2.createDataFrame(x)

df.printSchema()
df.show(5)

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

#Drop irrelevant features
drop_list = ['movie_id', 'movie_name']
data = df.select([column for column in df.columns if column not in drop_list])


# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] # standard stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
label_stringIdx = StringIndexer(inputCol = "genre", outputCol = "label")

# Fit the pipeline to training documents.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

In [None]:
#RANDOM FOREST

rf_predictions = rf_tfidf_Model.transform(dataset)
results = predictions.select(['prediction', 'label'])
predictionAndLabels=results.rdd
metrics = MulticlassMetrics(predictionAndLabels)

cm = metrics.confusionMatrix().toArray()
accuracy=(cm[0][0]+cm[1][1])/cm.sum()
precision=(cm[0][0])/(cm[0][0]+cm[1][0])
recall=(cm[0][0])/(cm[0][0]+cm[0][1])
f1score = 2*(precision*recall)/(precision+recall)
print("RandomForestClassifier: accuracy, precision, recall, f1score",accuracy,precision,recall,f1score)

In [None]:
from pyspark.ml.feature import IndexToString

f = IndexToString(
    inputCol="prediction", outputCol="categoryValue")

f.transform(results).drop("id").distinct().show()