In [0]:
# Required to run Spark on Google Colab.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
import findspark
findspark.init()

In [0]:
import findspark
import pyspark
from pyspark.sql import *
from pyspark import mllib
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Tokenizer, RegexTokenizer, StopWordsRemover, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from sklearn.preprocessing import OneHotEncoder
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithSGD
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pandas as pd
from pyspark import SparkConf
import re

# Setting up the Spark Session.
conf = pyspark.SparkConf().setAppName("App")
conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '10G')
        .set('spark.driver.memory', '10G')
        .set('spark.driver.maxResultSize', '10G'))
sc = pyspark.SparkContext(conf=conf)

In [0]:
# Reading  all the data first into a Dataframe then converting to a Spark Dataframe.
df = pd.read_csv('train.csv', sep=',')
df2 = pd.read_csv('mapping.csv', sep=',')
df3= pd.read_csv('test.csv',sep=',')

In [4]:
# One-hot-coding for training data
C=[]
for i in df2.iloc[:,-1]:
    #print(i)
    C.append(i)
    df[i]=0

dic={}
dic1={}
for i,j in enumerate(df["genre"]):
    #print(i,j)
    dic[df["movie_id"].iloc[i]]=[]
    j=re.findall("\'(.*?)\'",j)
    for s in j:
        df[s].iloc[i]=1
        dic[df["movie_id"].iloc[i]].append(C.index(s))
        

# One-hot coding for testing data
for i in C:
    df3[i]=0

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_with_indexer(indexer, value)


In [0]:
# SQL conversion of training data.
sqlCtx1 = SQLContext(sc)
df = sqlCtx1.createDataFrame(df)

In [0]:
# SQL conversion of testing data.
sqlCtx3 = SQLContext(sc)
df3 = sqlCtx3.createDataFrame(df3)

In [0]:
# Cleaning the training data.
tokenizedData = RegexTokenizer(inputCol='plot', outputCol='Words_plot', pattern='\\W')
tokenizedData=tokenizedData.transform(df)
clean_data = StopWordsRemover(inputCol='Words_plot', outputCol='filtered').transform(tokenizedData)

In [0]:
#cleaning the plot of testing data
tokenizedData2 = RegexTokenizer(inputCol='plot', outputCol='Words_plot', pattern='\\W').transform(df3)
clean_data2 = StopWordsRemover(inputCol='Words_plot', outputCol='filtered').transform(tokenizedData2)

In [9]:
# Calculate word 2 Vec for train dataset
word2Vec = Word2Vec(vectorSize=120, inputCol='filtered', outputCol='feature')
pipeword2vec = Pipeline(stages=[word2Vec])
trainword2vec = pipeword2vec.fit(clean_data)
tfFeaturedData = trainword2vec.transform(clean_data)
tfFeaturedData.show(1)

+--------+----------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+--------------------+--------------------+
|movie_id|movie_name|                plot|               genre|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|          Words_plot|            filtered|             feature|
+--------+----------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------

In [10]:
# Calculating Word 2 Vector for test dataset.
word2Vec2 = Word2Vec(vectorSize=120, inputCol='filtered', outputCol='feature')
pipeword2vec2 = Pipeline(stages=[word2Vec2])
trainword2vec2 = pipeword2vec2.fit(clean_data2)
tfFeaturedData2 = trainword2vec2.transform(clean_data2)
tfFeaturedData2.show(1)

+--------+----------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+--------------------+--------------------+
|movie_id|movie_name|                plot|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|          Words_plot|            filtered|             feature|
+--------+----------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+--------------------+--------------------+
| 1335380|    Exodu

In [0]:
# Creating data frames for both testing and training data with only one genre.

trains=tfFeaturedData
tests=tfFeaturedData2

Drama=trains.select('movie_id', "feature","Drama")
Drama1=tests.select('movie_id', "feature","Drama")

Comedy=trains.select('movie_id', "feature","Comedy")
Comedy1=tests.select('movie_id', "feature","Comedy")

Romance_Film=trains.select('movie_id', "feature","Romance Film")
Romance_Film1=tests.select('movie_id', "feature","Romance Film")

Thriller=trains.select('movie_id', "feature","Thriller")
Thriller1=tests.select('movie_id', "feature","Thriller")

Action=trains.select('movie_id', "feature","Action")
Action1=tests.select('movie_id', "feature","Action")

World_cinema=trains.select('movie_id', "feature","World cinema")
World_cinema1=tests.select('movie_id', "feature","World cinema")

Crime_Fiction=trains.select('movie_id', "feature","Crime Fiction")
Crime_Fiction1=tests.select('movie_id', "feature","Crime Fiction")

Horror=trains.select('movie_id', "feature","Horror")
Horror1=tests.select('movie_id', "feature","Horror")

Black_and_white=trains.select('movie_id', "feature","Black-and-white")
Black_and_white1=tests.select('movie_id', "feature","Black-and-white")

Indie=trains.select('movie_id', "feature","Indie")
Indie1=tests.select('movie_id', "feature","Indie")

Action_Adventure=trains.select('movie_id', "feature","Action/Adventure")
Action_Adventure1=tests.select('movie_id', "feature","Action/Adventure")

Adventure=trains.select('movie_id', "feature","Adventure")
Adventure1=tests.select('movie_id', "feature","Adventure")

Family_Film=trains.select('movie_id', "feature","Family Film")
Family_Film1=tests.select('movie_id', "feature","Family Film")

Short_Film=trains.select('movie_id', "feature","Short Film")
Short_Film1=tests.select('movie_id', "feature","Short Film")

Romantic_drama=trains.select('movie_id', "feature","Romantic drama")
Romantic_drama1=tests.select('movie_id', "feature","Romantic drama")

Animation=trains.select('movie_id', "feature","Animation")
Animation1=tests.select('movie_id', "feature","Animation")


Musical=trains.select('movie_id', "feature","Musical")
Musical1=tests.select('movie_id', "feature","Musical")

Science_Fiction=trains.select('movie_id', "feature","Science Fiction")
Science_Fiction1=tests.select('movie_id', "feature","Science Fiction")

Mystery=trains.select('movie_id', "feature","Mystery")
Mystery1=tests.select('movie_id', "feature","Mystery")

Romantic_comedy=trains.select('movie_id', "feature","Romantic comedy")
Romantic_comedy1=tests.select('movie_id', "feature","Romantic comedy")

In [0]:
from pyspark.ml.classification import LogisticRegression

# Reading the Logistic Regression, defining the features and labels
# here features are the term freqency and labels are 1 and 0 depending on availibility of that genre.

lr1 = LogisticRegression(featuresCol="feature",labelCol="Drama")
lr2= LogisticRegression(featuresCol="feature",labelCol="Comedy")
lr3= LogisticRegression(featuresCol="feature",labelCol="Romance Film")
lr4= LogisticRegression(featuresCol="feature",labelCol="Thriller")
lr5= LogisticRegression(featuresCol="feature",labelCol="Action")
lr6= LogisticRegression(featuresCol="feature",labelCol="World cinema")
lr7= LogisticRegression(featuresCol="feature",labelCol="Crime Fiction")
lr8= LogisticRegression(featuresCol="feature",labelCol="Horror")
lr9= LogisticRegression(featuresCol="feature",labelCol="Black-and-white")
lr10=LogisticRegression(featuresCol="feature",labelCol="Indie")
lr11=LogisticRegression(featuresCol="feature",labelCol="Action/Adventure")
lr12=LogisticRegression(featuresCol="feature",labelCol="Adventure")
lr13=LogisticRegression(featuresCol="feature",labelCol="Family Film")
lr14=LogisticRegression(featuresCol="feature",labelCol="Short Film")
lr15=LogisticRegression(featuresCol="feature",labelCol="Romantic drama")
lr16=LogisticRegression(featuresCol="feature",labelCol="Animation")
lr17=LogisticRegression(featuresCol="feature",labelCol="Musical")
lr18=LogisticRegression(featuresCol="feature",labelCol="Science Fiction")
lr19=LogisticRegression(featuresCol="feature",labelCol="Mystery")
lr20=LogisticRegression(featuresCol="feature",labelCol="Romantic comedy")


# Train the model with each data frame
lrModel1 = lr1.fit(Drama)
lrModel2= lr2.fit(Comedy)
lrModel3=lr3.fit(Romance_Film)
lrModel4=lr4.fit(Thriller)
lrModel5=lr5.fit(Action)
lrModel6=lr6.fit(World_cinema)
lrModel7=lr7.fit(Crime_Fiction)
lrModel8=lr8.fit(Horror)
lrModel9=lr9.fit(Black_and_white)
lrModel10=lr10.fit(Indie)
lrModel11=lr11.fit(Action_Adventure)
lrModel12=lr12.fit(Adventure)
lrModel13=lr13.fit(Family_Film)
lrModel14=lr14.fit(Short_Film)
lrModel15=lr15.fit(Romantic_drama)
lrModel16=lr16.fit(Animation)
lrModel17=lr17.fit(Musical)
lrModel18=lr18.fit(Science_Fiction)
lrModel19=lr19.fit(Mystery)
lrModel20=lr20.fit(Romantic_comedy)

# Run the test data on model, running each testing data frame on respective model
f1=lrModel1.transform(Drama1)
f2=lrModel2.transform(Comedy1)
f3=lrModel3.transform(Romance_Film1)
f4=lrModel4.transform(Thriller1)
f5=lrModel5.transform(Action1)
f6=lrModel6.transform(World_cinema1)
f7=lrModel7.transform(Crime_Fiction1)
f8=lrModel8.transform(Horror1)
f9=lrModel9.transform(Black_and_white1)
f10=lrModel10.transform(Indie1)
f11=lrModel11.transform(Action_Adventure1)
f12=lrModel12.transform(Adventure1)
f13=lrModel13.transform(Family_Film1)
f14=lrModel14.transform(Short_Film1)
f15=lrModel15.transform(Romantic_drama1)
f16=lrModel16.transform(Animation1)
f17=lrModel17.transform(Musical1)
f18=lrModel18.transform(Science_Fiction1)
f19=lrModel19.transform(Mystery1)
f20=lrModel20.transform(Romantic_comedy1)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

# Merging the movie_id with all 20 predictions.


df3=df3.withColumn("id",monotonically_increasing_id())
f1=f1.withColumn("id",monotonically_increasing_id())
c0=df3.join(f1,["id"]).drop("id","rawPrediction","probability","id","Drama","plot","movie_name",'Drama', 'Comedy', 'Romance Film', 'Thriller', 'Action', 'World cinema', 'Crime Fiction', 'Horror', 'Black-and-white', 'Indie', 'Action/Adventure', 'Adventure', 'Family Film', 'Short Film', 'Romantic drama', 'Animation', 'Musical', 'Science Fiction', 'Mystery', 'Romantic comedy')
c1=c0.join(f2,['movie_id']).drop("rawPrediction","probability","id","Comedy")
c2=c1.join(f3,['movie_id']).drop("rawPrediction","probability","id","Romance Film")
c3=c2.join(f4,['movie_id']).drop("rawPrediction","probability","id","Thriller")
c4=c3.join(f5,['movie_id']).drop("rawPrediction","probability","id","Action")
c5=c4.join(f6,['movie_id']).drop("rawPrediction","probability","id","World cinema")
c6=c5.join(f7,['movie_id']).drop("rawPrediction","probability","id","Crime Fiction")
c7=c6.join(f8,['movie_id']).drop("rawPrediction","probability","id","Horror")
c8=c7.join(f9,['movie_id']).drop("rawPrediction","probability","id","Black-and-white")
c9=c8.join(f10,['movie_id']).drop("rawPrediction","probability","id","Indie")
c10=c9.join(f11,['movie_id']).drop("rawPrediction","probability","id","Action/Adventure")
c11=c10.join(f12,['movie_id']).drop("rawPrediction","probability","id","Adventure")
c12=c11.join(f13,['movie_id']).drop("rawPrediction","probability","id","Family Film")
c13=c12.join(f14,['movie_id']).drop("rawPrediction","probability","id","Short Film")
c14=c13.join(f15,['movie_id']).drop("rawPrediction","probability","id","Romantic drama")
c15=c14.join(f16,['movie_id']).drop("rawPrediction","probability","id","Animation")
c16=c15.join(f17,['movie_id']).drop("rawPrediction","probability","id","Musical")
c17=c16.join(f18,['movie_id']).drop("rawPrediction","probability","id","Science Fiction")
c18=c17.join(f19,['movie_id']).drop("rawPrediction","probability","id","Mystery")
c19=c18.join(f20,['movie_id']).drop("rawPrediction","probability","id","Romantic comedy","feature")

In [0]:
final_output_df = c19.select("*").toPandas()

In [0]:
final_output_df = final_output_df.astype(int)

In [0]:
final_output_df['final_prediction'] = final_output_df[final_output_df.columns[2]].apply(
    lambda x: ' '.join(x.dropna().astype(str)),
    axis=1
)

In [0]:
final_output_df = final_output_df.drop(['prediction'], axis=1)

In [0]:
final_output_df = final_output_df.astype(str)

In [0]:
final_output_df.to_csv('Output3.csv')