# Data Intensive Computing - Movie Genre Classification

<br>
<h4>Overview:</h4> <br>
The project demonstrates multi-label classification of movie genres by applying NLP feature-extraction techniques (CountVectorizer, TF-IDF and Word2Vec) to movie plot summaries. First, the required libraries are imported and the train, test and mapping data are loaded.
Next, the data is preprocessed and cleaned by tokenizing the "plot" column and removing stop words from the tokenized terms.
Then, features are extracted from the resulting data frames using CountVectorizer (Part 1), TF-IDF (Part 2) and Word2Vec (Part 3).
A Random Forest classifier is trained on each feature set and used to predict the genres of the test data. The multi-label predictions are written in the format defined by mapping.csv.

<br><br>
## Importing Libraries

In [1]:
import findspark
# Call findspark.init() before importing pyspark so the Spark install is on sys.path
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
import pyspark
import pandas as pd
from pyspark.sql import *
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import split, regexp_replace
from pyspark.sql.functions import col, udf, lit, explode
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import when, concat, explode,concat_ws
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Word2Vec
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.sql.types import StructType, StructField, LongType, IntegerType

## Creating a Spark Instance

In [2]:
sc = pyspark.SparkContext()
spark = SparkSession.builder \
        .master("local") \
        .appName("myapp") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

sqlContext = SQLContext(sc)
sql = sqlContext  # SQLContext takes a SparkContext; reuse it rather than wrapping the SparkSession

## Fetching Train, Test and Mapping Data

In [3]:
# Train Data
traindata = pd.read_csv("train.csv")
train_data = sql.createDataFrame(traindata)
train_data.show(2)

# Test Data
testdata = pd.read_csv("test.csv")
test_data = sql.createDataFrame(testdata)
test_data.show(2)

# Mapping Data
mappingdata = pd.read_csv("mapping.csv")
mapping_data = sql.createDataFrame(mappingdata)
mapping_data.show(2)

+--------+----------------+--------------------+--------------------+
|movie_id|      movie_name|                plot|               genre|
+--------+----------------+--------------------+--------------------+
|23890098|      Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|The Hunger Games|The nation of Pan...|['Action/Adventur...|
+--------+----------------+--------------------+--------------------+
only showing top 2 rows

+--------+--------------------+--------------------+
|movie_id|          movie_name|                plot|
+--------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|
|29062594|A la salida nos v...|A group of teenag...|
+--------+--------------------+--------------------+
only showing top 2 rows

+----------+------+
|Unnamed: 0|     0|
+----------+------+
|         0| Drama|
|         1|Comedy|
+----------+------+
only showing top 2 rows



In [4]:
# We have fetched the Genre mapping data and stored the index-genre pair as the key-value pair in a dictionary
# We will use this later on when we train our data on 20 classifiers

mapping_data = (mapping_data.withColumnRenamed("Unnamed: 0","ID").withColumnRenamed("0","Genre")).collect()
map_dictionary = {}
for i in range(0,20):
    map_dictionary[i] = mapping_data[i]['Genre'] 
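
The hard-coded `range(0,20)` above assumes that mapping.csv holds exactly 20 rows. A minimal sketch of the same index-to-genre dictionary, built from however many rows were collected, might look like this. The `rows` list is a hypothetical stand-in for the collected Spark rows and contains only the two genres visible in the output above.

```python
# Hypothetical stand-in for mapping_data.collect(); the real rows come from mapping.csv.
rows = [{"ID": 0, "Genre": "Drama"}, {"ID": 1, "Genre": "Comedy"}]

# Key each genre by its ID column instead of by its position in the list.
map_dictionary = {row["ID"]: row["Genre"] for row in rows}
print(map_dictionary)  # {0: 'Drama', 1: 'Comedy'}
```

Spark `Row` objects support the same `row["ID"]` lookup, so the comprehension should carry over unchanged.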

## Preprocessing: RegexTokenizer and StopWordsRemover

In [5]:
# tokenizer = Tokenizer(inputCol="plot", outputCol="tokenizedterms")
# train_df2 = tokenizer.transform(train_df1).head(31108)

rt = RegexTokenizer(inputCol = "plot", outputCol = "tokenized_terms", pattern = "\\W")
train_df = rt.transform(train_data)
test_df = rt.transform(test_data)

swr = StopWordsRemover(inputCol = "tokenized_terms", outputCol = "updated_terms")
train_df = swr.transform(train_df)
test_df = swr.transform(test_df)
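
To make the two preprocessing steps concrete, here is a plain-Python sketch of roughly what they do to one plot string: lowercase the text, split it on non-word characters, drop empty tokens, then filter out stop words. The sentence and the short `STOP_WORDS` set are made up for illustration; Spark's default English stop-word list is much longer.

```python
import re

# Small illustrative stop-word set (an assumption, not Spark's full default list).
STOP_WORDS = {"a", "an", "the", "of", "is", "and"}

def tokenize(text):
    """Lowercase, split on each non-word character, and drop empty tokens."""
    return [tok for tok in re.split(r"\W", text.lower(), flags=re.ASCII) if tok]

def remove_stop_words(tokens):
    """Keep only tokens that are not in the stop-word set."""
    return [tok for tok in tokens if tok not in STOP_WORDS]

plot = "The hero, a hard-working pilot, saves the day."  # made-up example text
tokens = remove_stop_words(tokenize(plot))
print(tokens)  # ['hero', 'hard', 'working', 'pilot', 'saves', 'day']
```

Splitting on `\W` is why hyphenated words such as "hard-working" end up as two separate terms in the `updated_terms` column.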

In [6]:
train_df = train_df.drop('tokenized_terms')
test_df = test_df.drop('tokenized_terms')

In [7]:
train_df1 = train_df
train_df2 = train_df
train_df3 = train_df
test_df1 = test_df
test_df2 = test_df
test_df3 = test_df

In [8]:
mapping_df = pd.read_csv("/home/cse587/dic487-587/mapping.csv", index_col = 0)

## Part 1 : Term Document Matrix (Count Vectorizer)

In [9]:
# Fit the CountVectorizer on the training data only, then apply the same fitted
# vocabulary to both sets so that feature indices refer to the same terms
cv = CountVectorizer(inputCol = "updated_terms", outputCol = "features", vocabSize = 9000, minDF = 8)
model = cv.fit(train_df1)
train_df1 = model.transform(train_df1)
train_df1.show(n = 5)

test_df1 = model.transform(test_df1)
test_df1.show(n = 5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|       updated_terms|            features|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, hard, w...|(9000,[10,129,186...|
|31186339|  The Hunger Games|The nation of Pan...|['Action/Adventur...|[nation, panem, c...|(9000,[2,6,7,10,1...|
|20663735|        Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|[poovalli, induch...|(9000,[1,3,8,10,1...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|[lemon, drop, kid...|(9000,[7,9,12,15,...|
|  595909| A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|[seventh, day, ad...|(9000,[2,8,9,14,1...|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+
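
A count vectorizer maps each term to a column index learned during fitting. The plain-Python sketch below, with hypothetical helper names and toy documents, shows why the vocabulary learned from the training data has to be reused for the test data: a vocabulary fitted separately on the test set assigns different indices to the same terms. It approximates the idea only and is not Spark's implementation.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size, min_df):
    """Learn term -> column index, most frequent terms first."""
    term_counts = Counter(t for doc in docs for t in doc)
    doc_freq = Counter(t for doc in docs for t in set(doc))
    kept = [t for t, _ in term_counts.most_common() if doc_freq[t] >= min_df]
    return {term: idx for idx, term in enumerate(kept[:vocab_size])}

def transform(doc, vocab):
    """Count terms found in the fitted vocabulary; unseen terms are dropped."""
    counts = Counter(t for t in doc if t in vocab)
    return {vocab[t]: n for t, n in counts.items()}

train_docs = [["taxi", "driver", "taxi"], ["driver", "city"]]  # toy data
test_docs = [["city", "taxi", "alien"]]

train_vocab = fit_vocabulary(train_docs, vocab_size=10, min_df=1)
test_features = transform(test_docs[0], train_vocab)  # indices match training

# Fitting again on the test set gives "city" a different column index.
test_vocab = fit_vocabulary(test_docs, vocab_size=10, min_df=1)
print(train_vocab["city"], test_vocab["city"])  # 2 0
```

A classifier trained on the first index layout would misread features produced with the second one, even though both vectors have the same length.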

### String Indexing

In [10]:
# Index the genre strings as numeric labels for the Random Forest classifier
str_ind = StringIndexer(inputCol = "genre", outputCol = "multiLabels")
model = str_ind.fit(train_df1)
train_df1 = model.transform(train_df1)
train_df1.show(n=1)
u_labels = model.labels

+--------+----------+--------------------+--------------------+--------------------+--------------------+-----------+
|movie_id|movie_name|                plot|               genre|       updated_terms|            features|multiLabels|
+--------+----------+--------------------+--------------------+--------------------+--------------------+-----------+
|23890098|Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, hard, w...|(9000,[10,129,186...|        4.0|
+--------+----------+--------------------+--------------------+--------------------+--------------------+-----------+
only showing top 1 row
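
Note that the indexer sees each `genre` cell as one string, so every distinct combination of genres becomes its own class. A plain-Python sketch of frequency-ordered indexing illustrates this; the helper name and the toy genre strings are hypothetical, and the ordering assumes the default "most frequent label gets index 0" behaviour.

```python
from collections import Counter

def fit_string_indexer(labels):
    """Return the distinct labels ordered by descending frequency."""
    return [label for label, _ in Counter(labels).most_common()]

# Toy genre column: each cell is the string form of a genre list.
genre_strings = [
    "['Drama']", "['Drama']", "['Drama']",
    "['Comedy']", "['Comedy']",
    "['Drama', 'Comedy']",
]

labels = fit_string_indexer(genre_strings)
index_of = {label: float(i) for i, label in enumerate(labels)}
print(index_of["['Drama', 'Comedy']"])  # 2.0
```

The combined list `['Drama', 'Comedy']` receives its own index rather than sharing the indices of `['Drama']` and `['Comedy']`, which is why the later step converts the predicted string back into per-genre 0s and 1s.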



### Random Forest

In [11]:
rfc_model = RandomForestClassifier(labelCol="multiLabels", \
                            featuresCol="features", \
                            numTrees = 10, \
                            maxDepth = 4, \
                            maxBins = 10)
# Train model with Training Data
train_Model = rfc_model.fit(train_df1)

### Prediction and Index to String

In [12]:
test_pred1 = train_Model.transform(test_df1)
ind_to_str = IndexToString(inputCol="prediction", outputCol="ClassLabel", labels = u_labels)
test_result = ind_to_str.transform(test_pred1)
test_result.columns

['movie_id',
 'movie_name',
 'plot',
 'updated_terms',
 'features',
 'rawPrediction',
 'probability',
 'prediction',
 'ClassLabel']

### String to 0s and 1s

In [13]:
result_values = test_result.select('movie_id','ClassLabel').collect()
result = [['movie_id','predictions']]
for idx,i in enumerate(result_values):
#     print(''.join(str([1 if x[0]==i else 0 for x in mapping_df.values])))
    cats = str(i['ClassLabel']).split(',')
    cats = [x.strip("[]\'' ") for x in cats]
    val = ''.join(map(str,[str(1)+' ' if x[0] in cats else str(0)+' ' for x in mapping_df.values]))
    result.append([str(i['movie_id']),val.strip(' ')])
print(result[5983])

['29162674', '1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0']
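
The conversion above can also be written as a small helper. This sketch assumes that `ClassLabel` is always a valid Python list literal, as the `genre` column appears in the output earlier; `GENRES` is a hypothetical three-entry stand-in for the 20 genres in mapping.csv, and `to_multi_hot` is an illustrative name.

```python
import ast

# Hypothetical genre order; the real order comes from mapping.csv.
GENRES = ["Drama", "Comedy", "Thriller"]

def to_multi_hot(class_label, genres):
    """Turn a string such as "['Drama', 'Thriller']" into "1 0 1"."""
    predicted = set(ast.literal_eval(class_label))
    return " ".join("1" if genre in predicted else "0" for genre in genres)

print(to_multi_hot("['Drama', 'Thriller']", GENRES))  # 1 0 1
```

Parsing with `ast.literal_eval` avoids splitting on commas by hand, so a genre name that itself contained a comma would still be handled correctly.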


### Saving the dataframe

In [14]:
result = pd.DataFrame(result)
result.to_csv('rf1.csv',index = False, header = False) #Saving the csv file to our local folder

## Part 2 : TF-IDF

In [15]:
hashTF_init = HashingTF(inputCol = "updated_terms", outputCol = "rawfeatures", numFeatures = 10000)
hashtrain_transform = hashTF_init.transform(train_df2)
idftrain = IDF(inputCol = "rawfeatures", outputCol = "features")
idftrain_fit = idftrain.fit(hashtrain_transform)
train_df2 = idftrain_fit.transform(hashtrain_transform)

# Reuse the HashingTF transformer and the IDF model fitted on the training data, so that
# train and test share the same 10000-dimensional feature space and the same IDF weights
hashtest_transform = hashTF_init.transform(test_df2)
test_df2 = idftrain_fit.transform(hashtest_transform)

# When creating TF-IDF features, both the train and test data must have the same number of features.
# Otherwise a classifier such as Naive Bayes returns a dimension mismatch error while creating the
# model and predicting the values
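
The following plain-Python sketch illustrates the two TF-IDF stages under stated assumptions. Hashing fixes the feature dimension up front, which is what keeps train and test the same size, and the IDF weights use the smoothed form log((m + 1) / (df + 1)) given in the Spark MLlib documentation, where m is the number of training documents. `zlib.crc32` is only a stand-in hash (Spark uses MurmurHash3), and all function names and documents are hypothetical.

```python
import math
import zlib
from collections import Counter

def hashing_tf(doc, num_features):
    """Map each term to a bucket by hash and count occurrences per bucket."""
    # zlib.crc32 is a stand-in; it is not the hash function Spark uses.
    return Counter(zlib.crc32(t.encode("utf-8")) % num_features for t in doc)

def fit_idf(tf_vectors):
    """Return a function giving the smoothed IDF weight of a bucket."""
    m = len(tf_vectors)
    doc_freq = Counter(bucket for vec in tf_vectors for bucket in vec)
    return lambda bucket: math.log((m + 1) / (doc_freq[bucket] + 1))

def tf_idf(tf_vector, idf):
    """Scale each bucket's term frequency by its IDF weight."""
    return {bucket: tf * idf(bucket) for bucket, tf in tf_vector.items()}

NUM_FEATURES = 10000
train_docs = [["taxi", "driver"], ["taxi", "city"]]  # toy data
train_tf = [hashing_tf(doc, NUM_FEATURES) for doc in train_docs]
idf = fit_idf(train_tf)  # fitted on the training data only

# The test document is weighted with the IDF learned from the training set.
test_vector = tf_idf(hashing_tf(["taxi", "taxi"], NUM_FEATURES), idf)
print(test_vector)
```

Because "taxi" occurs in every training document, its weight is log(3/3) = 0, so the test vector above contains a single bucket with value 0.0.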

### String Indexing

In [16]:
# Index the genre strings as numeric labels for the Random Forest classifier
str_ind = StringIndexer(inputCol = "genre", outputCol = "multiLabels")
model = str_ind.fit(train_df2)
train_df2 = model.transform(train_df2)
train_df2.show(n=1)
u_labels = model.labels

+--------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+
|movie_id|movie_name|                plot|               genre|       updated_terms|         rawfeatures|            features|multiLabels|
+--------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+
|23890098|Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, hard, w...|(10000,[135,719,1...|(10000,[135,719,1...|        4.0|
+--------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+
only showing top 1 row



### Random Forest

In [17]:
rfc_model = RandomForestClassifier(labelCol="multiLabels", \
                            featuresCol="features", \
                            numTrees = 10, \
                            maxDepth = 4, \
                            maxBins = 10)
# Train model with Training Data
train_Model = rfc_model.fit(train_df2)

### Prediction and Index to String

In [18]:
test_pred2 = train_Model.transform(test_df2)
ind_to_str = IndexToString(inputCol="prediction", outputCol="ClassLabel", labels = u_labels)
test_result = ind_to_str.transform(test_pred2)
test_result.columns

['movie_id',
 'movie_name',
 'plot',
 'updated_terms',
 'rawfeatures',
 'features',
 'rawPrediction',
 'probability',
 'prediction',
 'ClassLabel']

### String to 0s and 1s

In [19]:
result_values = test_result.select('movie_id','ClassLabel').collect()
result = [['movie_id','predictions']]
for idx,i in enumerate(result_values):
#     print(''.join(str([1 if x[0]==i else 0 for x in mapping_df.values])))
    cats = str(i['ClassLabel']).split(',')
    cats = [x.strip("[]\'' ") for x in cats]
    val = ''.join(map(str,[str(1)+' ' if x[0] in cats else str(0)+' ' for x in mapping_df.values]))
    result.append([str(i['movie_id']),val.strip(' ')])
print(result[5983])

['29162674', '1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0']


### Saving the dataframe

In [20]:
result = pd.DataFrame(result)
result.to_csv('rf2.csv',index = False,header = False)

## Part 3 : Word2Vec

In [21]:
word2vec = Word2Vec(vectorSize = 300, minCount = 10, inputCol = 'updated_terms', outputCol = 'features')
word2vec_transform = word2vec.fit(train_df3)
train_df3 = word2vec_transform.transform(train_df3)

# Reuse the Word2Vec model fitted on the training data so that the test vectors have the
# same dimensionality (300) and land in the "features" column the classifier reads
test_df3 = word2vec_transform.transform(test_df3)

In [None]:
test_df3
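
A Word2Vec model turns each document into a single fixed-length vector by averaging the vectors of its words, so train and test documents must be embedded by the same model to be comparable. The sketch below uses a hypothetical 3-dimensional embedding table. Dividing by the total token count, including tokens missing from the vocabulary, is an assumption about how the averaging is done rather than a statement of Spark's exact behaviour.

```python
# Hypothetical 3-dimensional embeddings; a fitted model would learn these.
EMBEDDINGS = {"taxi": [1.0, 0.0, 2.0], "driver": [3.0, 2.0, 0.0]}

def document_vector(tokens, embeddings, size):
    """Average word vectors; tokens without an embedding add nothing to the sum."""
    total = [0.0] * size
    if not tokens:
        return total
    for token in tokens:
        for i, value in enumerate(embeddings.get(token, [0.0] * size)):
            total[i] += value
    # Assumption: divide by the number of tokens, known or not.
    return [value / len(tokens) for value in total]

print(document_vector(["taxi", "driver"], EMBEDDINGS, size=3))  # [2.0, 1.0, 1.0]
print(document_vector(["taxi", "alien"], EMBEDDINGS, size=3))   # [0.5, 0.0, 1.0]
```

The `size` argument plays the role of `vectorSize`: a classifier trained on 300-dimensional vectors cannot score vectors of a different length.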

### String Indexing

In [23]:
# Index the genre strings as numeric labels for the Random Forest classifier
str_ind = StringIndexer(inputCol = "genre", outputCol = "multiLabels")
model = str_ind.fit(train_df3)
train_df3 = model.transform(train_df3)
train_df3.show(n=1)
u_labels = model.labels

+--------+----------+--------------------+--------------------+--------------------+--------------------+-----------+
|movie_id|movie_name|                plot|               genre|       updated_terms|            features|multiLabels|
+--------+----------+--------------------+--------------------+--------------------+--------------------+-----------+
|23890098|Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, hard, w...|[-0.0112350715652...|        4.0|
+--------+----------+--------------------+--------------------+--------------------+--------------------+-----------+
only showing top 1 row



### Random Forest

In [24]:
rfc_model = RandomForestClassifier(labelCol="multiLabels", \
                            featuresCol="features", \
                            numTrees = 10, \
                            maxDepth = 4, \
                            maxBins = 10)
# Train model with Training Data
train_Model = rfc_model.fit(train_df3)

### Prediction and Index to String

In [26]:
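# Note: this cell scores train_df3 (the training set) rather than test_df3, which is why
# 'genre' and 'multiLabels' appear in the column list below
test_pred3 = train_Model.transform(train_df3)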
ind_to_str = IndexToString(inputCol="prediction", outputCol="ClassLabel", labels = u_labels)
test_result = ind_to_str.transform(test_pred3)
test_result.columns

['movie_id',
 'movie_name',
 'plot',
 'genre',
 'updated_terms',
 'features',
 'multiLabels',
 'rawPrediction',
 'probability',
 'prediction',
 'ClassLabel']

### String to 0s and 1s

In [27]:
result_values = test_result.select('movie_id','ClassLabel').collect()
result = [['movie_id','predictions']]
for idx,i in enumerate(result_values):
#     print(''.join(str([1 if x[0]==i else 0 for x in mapping_df.values])))
    cats = str(i['ClassLabel']).split(',')
    cats = [x.strip("[]\'' ") for x in cats]
    val = ''.join(map(str,[str(1)+' ' if x[0] in cats else str(0)+' ' for x in mapping_df.values]))
    result.append([str(i['movie_id']),val.strip(' ')])
print(result[5983])

['1716921', '1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0']


### Saving the dataframe

In [None]:
result = pd.DataFrame(result)
result.to_csv('rf3.csv',index = False,header = False)