In [0]:
# Required to run Spark on Google Colab.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
import findspark
findspark.init()

In [0]:
# Imports
import pyspark
from pyspark.sql import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import HashingTF
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd
import re

# Setting up the Spark Session.
conf = pyspark.SparkConf().setAppName("App")
conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '10G')
        .set('spark.driver.memory', '10G')
        .set('spark.driver.maxResultSize', '10G'))
sc = pyspark.SparkContext(conf=conf)

In [0]:
# Reading  all the data first into a Dataframe then converting to a Spark Dataframe.
train_df = pd.read_csv('train.csv', sep=',')
mapping_df = pd.read_csv('mapping.csv', sep=',')
test_df = pd.read_csv('test.csv', sep=',')

In [0]:
# One-hot-coding for training data.
C = []
for i in mapping_df.iloc[:, -1]:
    C.append(i)
    train_df[i] = 0

dic = {}
for i, j in enumerate(train_df["genre"]):
    dic[train_df["movie_id"].iloc[i]] = []
    j = re.findall("\'(.*?)\'", j)
    for s in j:
        train_df[s].iloc[i] = 1
        dic[train_df["movie_id"].iloc[i]].append(C.index(s))
        

# One-hot coding for testing data.
for i in C:
    test_df[i] = 0

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._setitem_with_indexer(indexer, value)


In [0]:
# SQL conversion of training data.
sqlCtx1 = SQLContext(sc)
train_df = sqlCtx1.createDataFrame(train_df)

In [0]:
# SQL conversion of testing data.
sqlCtx3 = SQLContext(sc)
test_df = sqlCtx3.createDataFrame(test_df)

In [0]:
# Cleaning the movie plot from the training data.
tokenized_train_data = RegexTokenizer(inputCol='plot', outputCol='Words_plot', pattern='\\W')
tokenized_train_data = tokenized_train_data.transform(train_df)
clean_train_data = StopWordsRemover(inputCol='Words_plot', outputCol='filtered').transform(tokenized_train_data)

In [0]:
# Cleaning the plot of testing data.
tokenized_test_data = RegexTokenizer(inputCol='plot', outputCol='Words_plot', pattern='\\W').transform(test_df)
clean_test_data = StopWordsRemover(inputCol='Words_plot', outputCol='filtered').transform(tokenized_test_data)

In [0]:
# Calculating the term frequency for training data.
print(type(clean_train_data))
hashingTermFreq = HashingTF(inputCol='filtered', outputCol='feature', numfeatures=15000)
tfFeatured_train_data = hashingTermFreq.transform(clean_train_data)

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Calculating the term frequency for testing data.
print(type(clean_test_data))
hashingTermFreq = HashingTF(inputCol='filtered', outputCol='feature', numfeatures=15000)
tfFeatured_test_data = hashingTermFreq.transform(clean_test_data)

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Creating data frames for both testing and training data with only one genre.

train = tfFeatured_train_data
test = tfFeatured_test_data

drama_train = train.select('movie_id', "feature", "Drama")
drama_test = test.select('movie_id', "feature", "Drama")

comedy_train = train.select('movie_id', "feature", "Comedy")
comedy_test = test.select('movie_id', "feature", "Comedy")

romance_film_train = train.select('movie_id', "feature", "Romance Film")
romance_film_test = test.select('movie_id', "feature", "Romance Film")

thriller_train = train.select('movie_id', "feature", "Thriller")
thriller_test = test.select('movie_id', "feature", "Thriller")

action_train = train.select('movie_id', "feature", "Action")
action_test = test.select('movie_id', "feature", "Action")

world_cinema_train = train.select('movie_id', "feature", "World cinema")
world_cinema_test = test.select('movie_id', "feature", "World cinema")

crime_fiction_train = train.select('movie_id', "feature", "Crime Fiction")
crime_fiction_test = test.select('movie_id', "feature", "Crime Fiction")

horror_train = train.select('movie_id', "feature", "Horror")
horror_test = test.select('movie_id', "feature", "Horror")

black_and_white_train = train.select('movie_id', "feature", "Black-and-white")
black_and_white_test = test.select('movie_id', "feature", "Black-and-white")

indie_train = train.select('movie_id', "feature", "Indie")
indie_test = test.select('movie_id', "feature", "Indie")

action_adventure_train = train.select('movie_id', "feature", "Action/Adventure")
action_adventure_test = test.select('movie_id', "feature", "Action/Adventure")

adventure_train = train.select('movie_id', "feature", "Adventure")
adventure_test = test.select('movie_id', "feature", "Adventure")

family_film_train = train.select('movie_id', "feature", "Family Film")
family_film_test = test.select('movie_id', "feature", "Family Film")

short_film_train = train.select('movie_id', "feature", "Short Film")
short_film_test = test.select('movie_id', "feature", "Short Film")

romantic_drama_train = train.select('movie_id', "feature", "Romantic drama")
romantic_drama_test = test.select('movie_id', "feature", "Romantic drama")

animation_train = train.select('movie_id', "feature", "Animation")
animation_test = test.select('movie_id', "feature", "Animation")

musical_train = train.select('movie_id', "feature", "Musical")
musical_test = test.select('movie_id', "feature", "Musical")

science_fiction_train = train.select('movie_id', "feature", "Science Fiction")
science_fiction_test = test.select('movie_id', "feature", "Science Fiction")

mystery_train = train.select('movie_id', "feature", "Mystery")
mystery_test = test.select('movie_id', "feature", "Mystery")

romantic_comedy_train = train.select('movie_id', "feature", "Romantic comedy")
romantic_comedy_test = test.select('movie_id', "feature", "Romantic comedy")

+--------+--------------------+-----+
|movie_id|             feature|Drama|
+--------+--------------------+-----+
|23890098|(262144,[2437,127...|    1|
|31186339|(262144,[991,1739...|    1|
|20663735|(262144,[119,571,...|    1|
| 2231378|(262144,[619,1998...|    0|
|  595909|(262144,[1911,243...|    1|
| 5272176|(262144,[571,4977...|    1|
| 1952976|(262144,[343,3294...|    1|
|24225279|(262144,[2437,251...|    1|
| 2462689|(262144,[1156,433...|    1|
|20532852|(262144,[929,3924...|    0|
+--------+--------------------+-----+
only showing top 10 rows

+--------+--------------------+-----+
|movie_id|             feature|Drama|
+--------+--------------------+-----+
| 1335380|(262144,[1728,261...|    0|
|29062594|(262144,[6068,191...|    0|
| 9252321|(262144,[1598,208...|    0|
|13455076|(262144,[3294,618...|    0|
|24165951|(262144,[4098,644...|    0|
| 1925869|(262144,[535,3294...|    0|
|10799612|(262144,[5053,538...|    0|
|28238240|(262144,[23060,30...|    0|
|17124781|(262144,[5232,

In [0]:
# Creating 20 logistic regression classifiers for the 20 genres.
# Here features are the term frequency and labels are 1 and 0 depending on availability of that genre.

lr1 = LogisticRegression(featuresCol="feature", labelCol="Drama")
lr2 = LogisticRegression(featuresCol="feature", labelCol="Comedy")
lr3 = LogisticRegression(featuresCol="feature", labelCol="Romance Film")
lr4 = LogisticRegression(featuresCol="feature", labelCol="Thriller")
lr5 = LogisticRegression(featuresCol="feature", labelCol="Action")
lr6 = LogisticRegression(featuresCol="feature", labelCol="World cinema")
lr7 = LogisticRegression(featuresCol="feature", labelCol="Crime Fiction")
lr8 = LogisticRegression(featuresCol="feature", labelCol="Horror")
lr9 = LogisticRegression(featuresCol="feature", labelCol="Black-and-white")
lr10 = LogisticRegression(featuresCol="feature", labelCol="Indie")
lr11 = LogisticRegression(featuresCol="feature", labelCol="Action/Adventure")
lr12 = LogisticRegression(featuresCol="feature", labelCol="Adventure")
lr13 = LogisticRegression(featuresCol="feature", labelCol="Family Film")
lr14 = LogisticRegression(featuresCol="feature", labelCol="Short Film")
lr15 = LogisticRegression(featuresCol="feature", labelCol="Romantic drama")
lr16 = LogisticRegression(featuresCol="feature", labelCol="Animation")
lr17 = LogisticRegression(featuresCol="feature", labelCol="Musical")
lr18 = LogisticRegression(featuresCol="feature", labelCol="Science Fiction")
lr19 = LogisticRegression(featuresCol="feature", labelCol="Mystery")
lr20 = LogisticRegression(featuresCol="feature", labelCol="Romantic comedy")


# Train the model with each data frame.
lrModel1 = lr1.fit(drama_train)
lrModel2 = lr2.fit(comedy_train)
lrModel3 = lr3.fit(romance_film_train)
lrModel4 = lr4.fit(thriller_train)
lrModel5 = lr5.fit(action_train)
lrModel6 = lr6.fit(world_cinema_train)
lrModel7 = lr7.fit(crime_fiction_train)
lrModel8 = lr8.fit(horror_train)
lrModel9 = lr9.fit(black_and_white_train)
lrModel10 = lr10.fit(indie_train)
lrModel11 = lr11.fit(action_adventure_train)
lrModel12 = lr12.fit(adventure_train)
lrModel13 = lr13.fit(family_film_train)
lrModel14 = lr14.fit(short_film_train)
lrModel15 = lr15.fit(romantic_drama_train)
lrModel16 = lr16.fit(animation_train)
lrModel17 = lr17.fit(musical_train)
lrModel18 = lr18.fit(science_fiction_train)
lrModel19 = lr19.fit(mystery_train)
lrModel20 = lr20.fit(romantic_comedy_train)


f1 = lrModel1.transform(drama_test)
f2 = lrModel2.transform(comedy_test)
f3 = lrModel3.transform(romance_film_test)
f4 = lrModel4.transform(thriller_test)
f5 = lrModel5.transform(action_test)
f6 = lrModel6.transform(world_cinema_test)
f7 = lrModel7.transform(crime_fiction_test)
f8 = lrModel8.transform(horror_test)
f9 = lrModel9.transform(black_and_white_test)
f10 = lrModel10.transform(indie_test)
f11 = lrModel11.transform(action_adventure_test)
f12 = lrModel12.transform(adventure_test)
f13 = lrModel13.transform(family_film_test)
f14 = lrModel14.transform(short_film_test)
f15 = lrModel15.transform(romantic_drama_test)
f16 = lrModel16.transform(animation_test)
f17 = lrModel17.transform(musical_test)
f18 = lrModel18.transform(science_fiction_test)
f19 = lrModel19.transform(mystery_test)
f20 = lrModel20.transform(romantic_comedy_test)

In [0]:
# Merging the movie_id with all 20 predictions.

test_df = test_df.withColumn("id", monotonically_increasing_id())
f1 = f1.withColumn("id", monotonically_increasing_id())
c0 = test_df.join(f1, ["id"]).drop("id", "rawPrediction", "probability", "id", "Drama", "plot", "movie_name", 'Drama',
                                   'Comedy', 'Romance Film', 'Thriller', 'Action', 'World cinema', 'Crime Fiction',
                                   'Horror', 'Black-and-white', 'Indie', 'Action/Adventure', 'Adventure', 'Family Film',
                                   'Short Film', 'Romantic drama', 'Animation', 'Musical', 'Science Fiction', 'Mystery',
                                   'Romantic comedy')
c1 = c0.join(f2, ['movie_id']).drop("rawPrediction", "probability", "id", "Comedy")
c2 = c1.join(f3, ['movie_id']).drop("rawPrediction", "probability", "id", "Romance Film")
c3 = c2.join(f4, ['movie_id']).drop("rawPrediction", "probability", "id", "Thriller")
c4 = c3.join(f5, ['movie_id']).drop("rawPrediction", "probability", "id", "Action")
c5 = c4.join(f6, ['movie_id']).drop("rawPrediction", "probability", "id", "World cinema")
c6 = c5.join(f7, ['movie_id']).drop("rawPrediction", "probability", "id", "Crime Fiction")
c7 = c6.join(f8, ['movie_id']).drop("rawPrediction", "probability", "id", "Horror")
c8 = c7.join(f9, ['movie_id']).drop("rawPrediction", "probability", "id", "Black-and-white")
c9 = c8.join(f10, ['movie_id']).drop("rawPrediction", "probability", "id", "Indie")
c10 = c9.join(f11, ['movie_id']).drop("rawPrediction", "probability", "id", "Action/Adventure")
c11 = c10.join(f12, ['movie_id']).drop("rawPrediction", "probability", "id", "Adventure")
c12 = c11.join(f13, ['movie_id']).drop("rawPrediction", "probability", "id", "Family Film")
c13 = c12.join(f14, ['movie_id']).drop("rawPrediction", "probability", "id", "Short Film")
c14 = c13.join(f15, ['movie_id']).drop("rawPrediction", "probability", "id", "Romantic drama")
c15 = c14.join(f16, ['movie_id']).drop("rawPrediction", "probability", "id", "Animation")
c16 = c15.join(f17, ['movie_id']).drop("rawPrediction", "probability", "id", "Musical")
c17 = c16.join(f18, ['movie_id']).drop("rawPrediction", "probability", "id", "Science Fiction")
c18 = c17.join(f19, ['movie_id']).drop("rawPrediction", "probability", "id", "Mystery")
c19 = c18.join(f20, ['movie_id']).drop("rawPrediction", "probability", "id", "Romantic comedy", "feature")

In [0]:
final_output_df = c19.select("*").toPandas()

In [0]:
final_output_df = final_output_df.astype(int)
final_output_df.head()

Unnamed: 0,movie_id,movie_id.1,prediction,prediction.1,prediction.2,prediction.3,prediction.4,prediction.5,prediction.6,prediction.7,prediction.8,prediction.9,prediction.10,prediction.11,prediction.12,prediction.13,prediction.14,prediction.15,prediction.16,prediction.17,prediction.18,prediction.19
0,62693,62693,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
1,296252,296252,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0
2,1356971,1356971,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,1428872,1428872,1,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,1582173,1582173,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0


In [0]:
final_output_df['final_prediction'] = final_output_df[final_output_df.columns[2]].apply(
    lambda x: ' '.join(x.dropna().astype(str)),
    axis=1
)

Unnamed: 0,movie_id,movie_id.1,prediction,prediction.1,prediction.2,prediction.3,prediction.4,prediction.5,prediction.6,prediction.7,prediction.8,prediction.9,prediction.10,prediction.11,prediction.12,prediction.13,prediction.14,prediction.15,prediction.16,prediction.17,prediction.18,prediction.19,final_prediction
0,62693,62693,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
1,296252,296252,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 1 0 0
2,1356971,1356971,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
3,1428872,1428872,1,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
4,1582173,1582173,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,1 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7772,32032279,32032279,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7773,33509716,33509716,1,0,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0
7774,33645448,33645448,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0
7775,34195696,34195696,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0 0 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 0


In [0]:
final_output_df = final_output_df.drop(['prediction'], axis=1)

In [0]:
final_output_df = final_output_df.astype(str)

Unnamed: 0,movie_id,movie_id.1,final_prediction
0,62693,62693,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
1,296252,296252,0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 1 0 0
2,1356971,1356971,1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
3,1428872,1428872,1 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
4,1582173,1582173,1 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0
...,...,...,...
7772,32032279,32032279,1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7773,33509716,33509716,1 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0
7774,33645448,33645448,0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0
7775,34195696,34195696,0 0 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 0


In [0]:
final_output_df.to_csv('Output.csv')