When running in VM, make sure Java 1.8 is running. To change the java version, use the following:

sudo update-alternatives --config java

and choose java -version 1.8

Also, make sure you un-comment the following lines in order to run pyspark on VM

# Data Preparation


In [1]:
#import findspark
#findspark.init("/home/cse587/spark-2.4.0-bin-hadoop2.7")

In [1]:
import pyspark
from pyspark.sql.types import *
import pandas as pd
import ast
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
pdDf = pd.read_csv("train.csv")
pd_test = pd.read_csv("test.csv")

In [3]:
pdDf['genre'] = pdDf['genre'].apply(ast.literal_eval)

In [4]:
mySchema = StructType([ StructField("movie_id", StringType(), True)\
                       ,StructField("movie_name", StringType(), True)\
                       ,StructField("plot", StringType(), True)\
                       ,StructField("genre", ArrayType(StringType(),True), True)])

In [5]:
data = spark.createDataFrame(pdDf, schema=mySchema)

In [6]:
test_mySchema = StructType([ StructField("movie_id", StringType(), True)\
                       ,StructField("movie_name", StringType(), True)\
                       ,StructField("plot", StringType(), True)])

In [7]:
test_csv_data = spark.createDataFrame(pd_test, schema=test_mySchema)

In [8]:
from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import udf, col


cv = CountVectorizer(inputCol='genre', outputCol='c1', binary=True)

model = cv.fit(data)

vocabulary = model.vocabulary

In [9]:
vocabulary

['Drama',
 'Comedy',
 'Romance Film',
 'Thriller',
 'Action',
 'World cinema',
 'Crime Fiction',
 'Horror',
 'Black-and-white',
 'Indie',
 'Action/Adventure',
 'Family Film',
 'Adventure',
 'Short Film',
 'Romantic drama',
 'Animation',
 'Musical',
 'Science Fiction',
 'Romantic comedy',
 'Mystery']

In [10]:
udf_to_array = udf(lambda v: v.toArray().tolist(), 'array<double>')
prep_data = model.transform(data)

prep_data = prep_data.withColumn('c2', udf_to_array('c1')) \
   .select('*', *[ col('c2')[i].alias(vocabulary[i]) for i in range(len(vocabulary))])

In [11]:
prep_data.show(2)

+--------+----------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+-----------+---------+----------+--------------+---------+-------+---------------+---------------+-------+
|movie_id|      movie_name|                plot|               genre|                  c1|                  c2|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Family Film|Adventure|Short Film|Romantic drama|Animation|Musical|Science Fiction|Romantic comedy|Mystery|
+--------+----------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+-----------+---------+----------+--------------+---------+-------+---------------+---------------+-------+
|

In [12]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")
# stop words
stopwords = StopWordsRemover.loadDefaultStopWords("english")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stopwords)

In [13]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(prep_data)
dataset = pipelineFit.transform(prep_data)
dataset.show(2)

+--------+----------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+-----------+---------+----------+--------------+---------+-------+---------------+---------------+-------+--------------------+--------------------+
|movie_id|      movie_name|                plot|               genre|                  c1|                  c2|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Family Film|Adventure|Short Film|Romantic drama|Animation|Musical|Science Fiction|Romantic comedy|Mystery|               words|            filtered|
+--------+----------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+-----------+---------+--

In [14]:
test_dataset = pipelineFit.transform(test_csv_data)
test_dataset.show(2)

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               words|            filtered|
+--------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



# Part 1: Basic Model

In [15]:
countVectors = CountVectorizer(inputCol="filtered",outputCol="features",vocabSize=100000,minDF=5)

In [16]:
pipeline_1 = Pipeline(stages=[countVectors])
pipelineFit_1 = pipeline_1.fit(dataset)

In [17]:
cv_test_data = pipelineFit_1.transform(test_dataset)
cv_test_data.show(2)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               words|            filtered|            features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|(33975,[0,3,4,7,8...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|(33975,[7,8,56,71...|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [18]:
from pyspark.ml.classification import LogisticRegressionModel

drama_model = LogisticRegressionModel.load("./part1_lr_models/drama/")

In [19]:
pred_test_drama = drama_model.transform(cv_test_data)
final_pred_test_drama = pred_test_drama.select(col('movie_id').alias('m_id'),col('prediction').alias('P drama'))
test_df = (cv_test_data.join(final_pred_test_drama,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama")
test_df_1.show(2)

+--------+-------+
|movie_id|P drama|
+--------+-------+
|12378414|    0.0|
|13005847|    1.0|
+--------+-------+
only showing top 2 rows



In [20]:
comedy_model = LogisticRegressionModel.load("./part1_lr_models/comedy/")

pred_test_comedy = comedy_model.transform(cv_test_data)
final_pred_test_comedy = pred_test_comedy.select(col('movie_id').alias('m_id'),col('prediction').alias('P comedy'))
test_df = (test_df_1.join(final_pred_test_comedy,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P comedy")
test_df_1.show(2)

+--------+-------+--------+
|movie_id|P drama|P comedy|
+--------+-------+--------+
|12378414|    0.0|     0.0|
|13005847|    1.0|     0.0|
+--------+-------+--------+
only showing top 2 rows



In [21]:
romance_film_model = LogisticRegressionModel.load("./part1_lr_models/romance/")

pred_test_romance = romance_film_model.transform(cv_test_data)
final_pred_test_romance = pred_test_romance.select(col('movie_id').alias('m_id'),col('prediction').alias('P RF'))
test_df = (test_df_1.join(final_pred_test_romance,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF")

thriller_model = LogisticRegressionModel.load("./part1_lr_models/thriller/")

pred_test_thriller = thriller_model.transform(cv_test_data)
final_pred_test_thriller = pred_test_thriller.select(col('movie_id').alias('m_id'),col('prediction').alias('P Thriller'))
test_df = (test_df_1.join(final_pred_test_thriller,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller")

action_model = LogisticRegressionModel.load("./part1_lr_models/action/")

pred_test_action = action_model.transform(cv_test_data)
final_pred_test_action = pred_test_action.select(col('movie_id').alias('m_id'),col('prediction').alias('P Action'))
test_df = (test_df_1.join(final_pred_test_action,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action")

wc_model = LogisticRegressionModel.load("./part1_lr_models/world_cinema/")

pred_test_wc = wc_model.transform(cv_test_data)
final_pred_test_wc = pred_test_wc.select(col('movie_id').alias('m_id'),col('prediction').alias('P WC'))
test_df = (test_df_1.join(final_pred_test_wc,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC")

cf_model = LogisticRegressionModel.load("./part1_lr_models/crime_fiction/")


pred_test_cf = cf_model.transform(cv_test_data)
final_pred_test_cf = pred_test_cf.select(col('movie_id').alias('m_id'),col('prediction').alias('P CF'))
test_df = (test_df_1.join(final_pred_test_cf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF")

horror_model = LogisticRegressionModel.load("./part1_lr_models/horror/")


pred_test_horror = horror_model.transform(cv_test_data)
final_pred_test_horror = pred_test_horror.select(col('movie_id').alias('m_id'),col('prediction').alias('P Horror'))
test_df = (test_df_1.join(final_pred_test_horror,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror")

bw_model = LogisticRegressionModel.load("./part1_lr_models/black_white/")


pred_test_bw = bw_model.transform(cv_test_data)
final_pred_test_bw = pred_test_bw.select(col('movie_id').alias('m_id'),col('prediction').alias('P BW'))
test_df = (test_df_1.join(final_pred_test_bw,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW")

indie_model = LogisticRegressionModel.load("./part1_lr_models/indie/")


pred_test_indie = indie_model.transform(cv_test_data)
final_pred_test_indie = pred_test_indie.select(col('movie_id').alias('m_id'),col('prediction').alias('P Indie'))
test_df = (test_df_1.join(final_pred_test_indie,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie")

aa_model = LogisticRegressionModel.load("./part1_lr_models/action_adventure/")


pred_test_aa = aa_model.transform(cv_test_data)
final_pred_test_aa = pred_test_aa.select(col('movie_id').alias('m_id'),col('prediction').alias('P AA'))
test_df = (test_df_1.join(final_pred_test_aa,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA")

a_model = LogisticRegressionModel.load("./part1_lr_models/adventure/")


pred_test_adv = a_model.transform(cv_test_data)
final_pred_test_adv = pred_test_adv.select(col('movie_id').alias('m_id'),col('prediction').alias('P Adv'))
test_df = (test_df_1.join(final_pred_test_adv,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv")

ff_model = LogisticRegressionModel.load("./part1_lr_models/family/")


pred_test_ff = ff_model.transform(cv_test_data)
final_pred_test_ff = pred_test_ff.select(col('movie_id').alias('m_id'),col('prediction').alias('P FF'))
test_df = (test_df_1.join(final_pred_test_ff,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF")

sf_model = LogisticRegressionModel.load("./part1_lr_models/short_film/")


pred_test_sf = sf_model.transform(cv_test_data)
final_pred_test_sf = pred_test_sf.select(col('movie_id').alias('m_id'),col('prediction').alias('P SF'))
test_df = (test_df_1.join(final_pred_test_sf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF")


rd_model = LogisticRegressionModel.load("./part1_lr_models/rom_drama/")

pred_test_rd = rd_model.transform(cv_test_data)
final_pred_test_rd = pred_test_rd.select(col('movie_id').alias('m_id'),col('prediction').alias('P RD'))
test_df = (test_df_1.join(final_pred_test_rd,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD")


ani_model = LogisticRegressionModel.load("./part1_lr_models/animation/")

pred_test_ani = ani_model.transform(cv_test_data)
final_pred_test_ani = pred_test_ani.select(col('movie_id').alias('m_id'),col('prediction').alias('P Ani'))
test_df = (test_df_1.join(final_pred_test_ani,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani")


mus_model = LogisticRegressionModel.load("./part1_lr_models/musical/")

pred_test_mus = mus_model.transform(cv_test_data)
final_pred_test_mus = pred_test_mus.select(col('movie_id').alias('m_id'),col('prediction').alias('P Mus'))
test_df = (test_df_1.join(final_pred_test_mus,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus")


scf_model = LogisticRegressionModel.load("./part1_lr_models/science_fiction/")

pred_test_scf = scf_model.transform(cv_test_data)
final_pred_test_scf = pred_test_scf.select(col('movie_id').alias('m_id'),col('prediction').alias('P SciFi'))
test_df = (test_df_1.join(final_pred_test_scf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi")


m_model = LogisticRegressionModel.load("./part1_lr_models/mystery/")

pred_test_mys = m_model.transform(cv_test_data)
final_pred_test_mys = pred_test_mys.select(col('movie_id').alias('m_id'),col('prediction').alias('P Mys'))
test_df = (test_df_1.join(final_pred_test_mys,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi", "P Mys")


rc_model = LogisticRegressionModel.load("./part1_lr_models/rom_com/")

pred_test_rc = rc_model.transform(cv_test_data)
final_pred_test_rc = pred_test_rc.select(col('movie_id').alias('m_id'),col('prediction').alias('P RC'))
test_df = (test_df_1.join(final_pred_test_rc,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi", "P Mys", "P RC")

In [22]:
test_df_1.show(5)

+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|movie_id|P drama|P Comedy|P RF|P Thriller|P Action|P WC|P CF|P Horror|P BW|P Indie|P AA|P Adv|P FF|P SF|P RD|P Ani|P Mus|P SciFi|P Mys|P RC|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|12378414|    0.0|     0.0| 0.0|       0.0|     1.0| 0.0| 0.0|     0.0| 0.0|    0.0| 0.0|  1.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13005847|    1.0|     0.0| 1.0|       0.0|     1.0| 0.0| 1.0|     0.0| 0.0|    0.0| 1.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13873910|    1.0|     0.0| 1.0|       0.0|     0.0| 1.0| 0.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 1.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13897336|    1.0|     0.0| 0.0|       0.0|     0.0| 1.0| 0.0|     0.0| 0.0|    0.0| 1.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|14355

In [24]:
from pyspark.sql.functions import col
test_res_df = test_df_1.select([col(c).cast(IntegerType()).cast("string") for c in test_df_1.columns])
test_res_df.show(2)

+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|movie_id|P drama|P Comedy|P RF|P Thriller|P Action|P WC|P CF|P Horror|P BW|P Indie|P AA|P Adv|P FF|P SF|P RD|P Ani|P Mus|P SciFi|P Mys|P RC|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|12378414|      1|       1|   1|         0|       0|   1|   0|       0|   0|      0|   0|    0|   0|   0|   0|    0|    0|      0|    0|   1|
|13005847|      0|       0|   0|         0|       0|   0|   0|       0|   0|      0|   0|    0|   0|   0|   0|    0|    0|      0|    0|   0|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
only showing top 2 rows



In [25]:
from pyspark.sql.functions import concat_ws
test_2 = test_res_df.select("movie_id",concat_ws(' ', "P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF","P RD","P Ani","P Mus","P SciFi","P Mys","P RC").alias('predictions'))

In [26]:
test_df_try = test_2.select(col("movie_id").cast(IntegerType()),col("predictions"))
test_df_try.show(2)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
|12378414|1 1 1 0 0 1 0 0 0...|
|13005847|0 0 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 2 rows



In [None]:
test_df_try.repartition(1).write.csv("./submission/part1.csv",header=True)

# Part 2: Using TF-IDF

In [23]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=100000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

pipeline_2 = Pipeline(stages=[hashingTF, idf])
pipelineFit_2 = pipeline_2.fit(dataset)
dataset_2 = pipelineFit_2.transform(test_dataset)


In [24]:
dataset_2.show(2)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               words|            filtered|         rawFeatures|            features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|(100000,[604,999,...|(100000,[604,999,...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|(100000,[24073,24...|(100000,[24073,24...|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [25]:
idf_test_data = pipelineFit_2.transform(test_dataset)
idf_test_data.show(2)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               words|            filtered|         rawFeatures|            features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|(100000,[604,999,...|(100000,[604,999,...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|(100000,[24073,24...|(100000,[24073,24...|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [26]:
drama_model = LogisticRegressionModel.load("./part2_lr_models/drama/")

In [27]:
pred_test_drama = drama_model.transform(idf_test_data)
final_pred_test_drama = pred_test_drama.select(col('movie_id').alias('m_id'),col('prediction').alias('P drama'))
test_df = (cv_test_data.join(final_pred_test_drama,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama")
test_df_1.show(2)

+--------+-------+
|movie_id|P drama|
+--------+-------+
|12378414|    0.0|
|13005847|    1.0|
+--------+-------+
only showing top 2 rows



In [28]:
comedy_model = LogisticRegressionModel.load("./part2_lr_models/comedy/")

pred_test_comedy = comedy_model.transform(idf_test_data)
final_pred_test_comedy = pred_test_comedy.select(col('movie_id').alias('m_id'),col('prediction').alias('P comedy'))
test_df = (test_df_1.join(final_pred_test_comedy,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P comedy")
test_df_1.show(2)

+--------+-------+--------+
|movie_id|P drama|P comedy|
+--------+-------+--------+
|12378414|    0.0|     1.0|
|13005847|    1.0|     0.0|
+--------+-------+--------+
only showing top 2 rows



In [29]:
romance_film_model = LogisticRegressionModel.load("./part2_lr_models/romance/")

pred_test_romance = romance_film_model.transform(idf_test_data)
final_pred_test_romance = pred_test_romance.select(col('movie_id').alias('m_id'),col('prediction').alias('P RF'))
test_df = (test_df_1.join(final_pred_test_romance,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF")

thriller_model = LogisticRegressionModel.load("./part2_lr_models/thriller/")

pred_test_thriller = thriller_model.transform(idf_test_data)
final_pred_test_thriller = pred_test_thriller.select(col('movie_id').alias('m_id'),col('prediction').alias('P Thriller'))
test_df = (test_df_1.join(final_pred_test_thriller,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller")

action_model = LogisticRegressionModel.load("./part2_lr_models/action/")

pred_test_action = action_model.transform(idf_test_data)
final_pred_test_action = pred_test_action.select(col('movie_id').alias('m_id'),col('prediction').alias('P Action'))
test_df = (test_df_1.join(final_pred_test_action,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action")

wc_model = LogisticRegressionModel.load("./part2_lr_models/world_cinema/")

pred_test_wc = wc_model.transform(idf_test_data)
final_pred_test_wc = pred_test_wc.select(col('movie_id').alias('m_id'),col('prediction').alias('P WC'))
test_df = (test_df_1.join(final_pred_test_wc,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC")

cf_model = LogisticRegressionModel.load("./part2_lr_models/crime_fiction/")


pred_test_cf = cf_model.transform(idf_test_data)
final_pred_test_cf = pred_test_cf.select(col('movie_id').alias('m_id'),col('prediction').alias('P CF'))
test_df = (test_df_1.join(final_pred_test_cf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF")

horror_model = LogisticRegressionModel.load("./part2_lr_models/horror/")


pred_test_horror = horror_model.transform(idf_test_data)
final_pred_test_horror = pred_test_horror.select(col('movie_id').alias('m_id'),col('prediction').alias('P Horror'))
test_df = (test_df_1.join(final_pred_test_horror,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror")

bw_model = LogisticRegressionModel.load("./part2_lr_models/black_white/")


pred_test_bw = bw_model.transform(idf_test_data)
final_pred_test_bw = pred_test_bw.select(col('movie_id').alias('m_id'),col('prediction').alias('P BW'))
test_df = (test_df_1.join(final_pred_test_bw,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW")

indie_model = LogisticRegressionModel.load("./part2_lr_models/indie/")


pred_test_indie = indie_model.transform(idf_test_data)
final_pred_test_indie = pred_test_indie.select(col('movie_id').alias('m_id'),col('prediction').alias('P Indie'))
test_df = (test_df_1.join(final_pred_test_indie,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie")

aa_model = LogisticRegressionModel.load("./part2_lr_models/action_adventure/")


pred_test_aa = aa_model.transform(idf_test_data)
final_pred_test_aa = pred_test_aa.select(col('movie_id').alias('m_id'),col('prediction').alias('P AA'))
test_df = (test_df_1.join(final_pred_test_aa,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA")

a_model = LogisticRegressionModel.load("./part2_lr_models/adventure/")


pred_test_adv = a_model.transform(idf_test_data)
final_pred_test_adv = pred_test_adv.select(col('movie_id').alias('m_id'),col('prediction').alias('P Adv'))
test_df = (test_df_1.join(final_pred_test_adv,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv")

ff_model = LogisticRegressionModel.load("./part2_lr_models/family/")


pred_test_ff = ff_model.transform(idf_test_data)
final_pred_test_ff = pred_test_ff.select(col('movie_id').alias('m_id'),col('prediction').alias('P FF'))
test_df = (test_df_1.join(final_pred_test_ff,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF")

sf_model = LogisticRegressionModel.load("./part2_lr_models/short_film/")


pred_test_sf = sf_model.transform(idf_test_data)
final_pred_test_sf = pred_test_sf.select(col('movie_id').alias('m_id'),col('prediction').alias('P SF'))
test_df = (test_df_1.join(final_pred_test_sf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF")


rd_model = LogisticRegressionModel.load("./part2_lr_models/rom_drama/")

pred_test_rd = rd_model.transform(idf_test_data)
final_pred_test_rd = pred_test_rd.select(col('movie_id').alias('m_id'),col('prediction').alias('P RD'))
test_df = (test_df_1.join(final_pred_test_rd,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD")


ani_model = LogisticRegressionModel.load("./part2_lr_models/animation/")

pred_test_ani = ani_model.transform(idf_test_data)
final_pred_test_ani = pred_test_ani.select(col('movie_id').alias('m_id'),col('prediction').alias('P Ani'))
test_df = (test_df_1.join(final_pred_test_ani,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani")


mus_model = LogisticRegressionModel.load("./part2_lr_models/musical/")

pred_test_mus = mus_model.transform(idf_test_data)
final_pred_test_mus = pred_test_mus.select(col('movie_id').alias('m_id'),col('prediction').alias('P Mus'))
test_df = (test_df_1.join(final_pred_test_mus,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus")


scf_model = LogisticRegressionModel.load("./part2_lr_models/science_fiction/")

pred_test_scf = scf_model.transform(idf_test_data)
final_pred_test_scf = pred_test_scf.select(col('movie_id').alias('m_id'),col('prediction').alias('P SciFi'))
test_df = (test_df_1.join(final_pred_test_scf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi")


m_model = LogisticRegressionModel.load("./part2_lr_models/mystery/")

pred_test_mys = m_model.transform(idf_test_data)
final_pred_test_mys = pred_test_mys.select(col('movie_id').alias('m_id'),col('prediction').alias('P Mys'))
test_df = (test_df_1.join(final_pred_test_mys,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi", "P Mys")


rc_model = LogisticRegressionModel.load("./part2_lr_models/rom_com/")

pred_test_rc = rc_model.transform(idf_test_data)
final_pred_test_rc = pred_test_rc.select(col('movie_id').alias('m_id'),col('prediction').alias('P RC'))
test_df = (test_df_1.join(final_pred_test_rc,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi", "P Mys", "P RC")

In [30]:
test_df_1.show(5)

+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|movie_id|P drama|P Comedy|P RF|P Thriller|P Action|P WC|P CF|P Horror|P BW|P Indie|P AA|P Adv|P FF|P SF|P RD|P Ani|P Mus|P SciFi|P Mys|P RC|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|12378414|    0.0|     1.0| 0.0|       0.0|     0.0| 0.0| 0.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 1.0|
|13005847|    1.0|     0.0| 0.0|       1.0|     0.0| 0.0| 1.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13873910|    1.0|     0.0| 1.0|       0.0|     0.0| 0.0| 0.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13897336|    0.0|     0.0| 0.0|       1.0|     0.0| 0.0| 0.0|     0.0| 0.0|    0.0| 1.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|14355

In [36]:
test_res_df = test_df_1.select([col(c).cast(IntegerType()).cast("string") for c in test_df_1.columns])
test_res_df.show(2)

+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|movie_id|P drama|P Comedy|P RF|P Thriller|P Action|P WC|P CF|P Horror|P BW|P Indie|P AA|P Adv|P FF|P SF|P RD|P Ani|P Mus|P SciFi|P Mys|P RC|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|12378414|      0|       1|   0|         0|       0|   0|   0|       0|   0|      0|   0|    0|   0|   0|   0|    0|    0|      0|    0|   1|
|13005847|      1|       0|   0|         1|       0|   0|   1|       0|   0|      0|   0|    0|   0|   0|   0|    0|    0|      0|    0|   0|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
only showing top 2 rows



In [37]:
test_2 = test_res_df.select("movie_id",concat_ws(' ', "P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF","P RD","P Ani","P Mus","P SciFi","P Mys","P RC").alias('predictions'))

In [38]:
test_df_try = test_2.select(col("movie_id").cast(IntegerType()),col("predictions"))
test_df_try.show(2)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
|12378414|0 1 0 0 0 0 0 0 0...|
|13005847|1 0 0 1 0 0 1 0 0...|
+--------+--------------------+
only showing top 2 rows



In [None]:
test_df_try.repartition(1).write.csv("./submission/part2.csv",header=True)

# Part 3: Custom Feature Engineering

In [31]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=50000)
idf = IDF(inputCol="rawFeatures", outputCol="idf_features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline_idf = Pipeline(stages=[hashingTF, idf])
pipelineFit_idf = pipeline_idf.fit(dataset)
idf_dataset = pipelineFit_idf.transform(dataset)

In [33]:
from pyspark.ml.feature import Word2Vec, VectorAssembler
word2vec = Word2Vec(vectorSize=50, seed=42, minCount=5, maxSentenceLength=100, \
                    maxIter = 1, numPartitions = 10, windowSize = 5, \
                    inputCol="filtered", outputCol="w2v_features")
vecAssembler = VectorAssembler(inputCols=["idf_features", "w2v_features"], outputCol="features")
word2vec_pipeline = Pipeline(stages=[word2vec,vecAssembler])

In [34]:
word2vec_model = word2vec_pipeline.fit(idf_dataset)
word2vec_df = word2vec_model.transform(idf_dataset)

In [35]:
word2vec_df.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+-----------+---------+----------+--------------+---------+-------+---------------+---------------+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|                  c1|                  c2|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Family Film|Adventure|Short Film|Romantic drama|Animation|Musical|Science Fiction|Romantic comedy|Mystery|               words|            filtered|         rawFeatures|        idf_features|        w2v_features|            features|
+--------+------------------+--------------------+--------------------+-------

In [36]:
pipelineFit_idf_test = pipeline_idf.fit(test_dataset)
idf_test_dataset = pipelineFit_idf_test.transform(test_dataset)
w2v_test_model = word2vec_pipeline.fit(idf_test_dataset)
w2v_test_csv_dataset = w2v_test_model.transform(idf_test_dataset)

In [37]:
test_csv_dataset = w2v_test_csv_dataset.select("movie_id","filtered","features")
test_csv_dataset.show(5)

+--------+--------------------+--------------------+
|movie_id|            filtered|            features|
+--------+--------------------+--------------------+
| 1335380|[film, based, eve...|(50050,[604,918,1...|
|29062594|[group, teenagers...|(50050,[1982,2833...|
| 9252321|[story, zulu, fam...|(50050,[177,230,4...|
|13455076|[stooges, play, t...|(50050,[324,1429,...|
|24165951|[soldier, fortune...|(50050,[3510,5159...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [45]:
drama_model = LogisticRegressionModel.load("./part3_lr_models/drama/")

In [46]:
pred_test_drama = drama_model.transform(test_csv_dataset)
final_pred_test_drama = pred_test_drama.select(col('movie_id').alias('m_id'),col('prediction').alias('P drama'))
test_df = (test_csv_dataset.join(final_pred_test_drama,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama")
test_df_1.show(2)

+--------+-------+
|movie_id|P drama|
+--------+-------+
|12378414|    0.0|
|13005847|    0.0|
+--------+-------+
only showing top 2 rows



In [47]:
comedy_model = LogisticRegressionModel.load("./part3_lr_models/comedy/")

pred_test_comedy = comedy_model.transform(test_csv_dataset)
final_pred_test_comedy = pred_test_comedy.select(col('movie_id').alias('m_id'),col('prediction').alias('P comedy'))
test_df = (test_df_1.join(final_pred_test_comedy,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P comedy")
test_df_1.show(2)

+--------+-------+--------+
|movie_id|P drama|P comedy|
+--------+-------+--------+
|12378414|    0.0|     1.0|
|13005847|    0.0|     0.0|
+--------+-------+--------+
only showing top 2 rows



In [48]:
romance_film_model = LogisticRegressionModel.load("./part3_lr_models/romance/")

pred_test_romance = romance_film_model.transform(test_csv_dataset)
final_pred_test_romance = pred_test_romance.select(col('movie_id').alias('m_id'),col('prediction').alias('P RF'))
test_df = (test_df_1.join(final_pred_test_romance,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF")

thriller_model = LogisticRegressionModel.load("./part3_lr_models/thriller/")

pred_test_thriller = thriller_model.transform(test_csv_dataset)
final_pred_test_thriller = pred_test_thriller.select(col('movie_id').alias('m_id'),col('prediction').alias('P Thriller'))
test_df = (test_df_1.join(final_pred_test_thriller,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller")

action_model = LogisticRegressionModel.load("./part3_lr_models/action/")

pred_test_action = action_model.transform(test_csv_dataset)
final_pred_test_action = pred_test_action.select(col('movie_id').alias('m_id'),col('prediction').alias('P Action'))
test_df = (test_df_1.join(final_pred_test_action,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action")

wc_model = LogisticRegressionModel.load("./part3_lr_models/world_cinema/")

pred_test_wc = wc_model.transform(test_csv_dataset)
final_pred_test_wc = pred_test_wc.select(col('movie_id').alias('m_id'),col('prediction').alias('P WC'))
test_df = (test_df_1.join(final_pred_test_wc,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC")

cf_model = LogisticRegressionModel.load("./part3_lr_models/crime_fiction/")


pred_test_cf = cf_model.transform(test_csv_dataset)
final_pred_test_cf = pred_test_cf.select(col('movie_id').alias('m_id'),col('prediction').alias('P CF'))
test_df = (test_df_1.join(final_pred_test_cf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF")

horror_model = LogisticRegressionModel.load("./part3_lr_models/horror/")


pred_test_horror = horror_model.transform(test_csv_dataset)
final_pred_test_horror = pred_test_horror.select(col('movie_id').alias('m_id'),col('prediction').alias('P Horror'))
test_df = (test_df_1.join(final_pred_test_horror,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror")

bw_model = LogisticRegressionModel.load("./part3_lr_models/black_white/")


pred_test_bw = bw_model.transform(test_csv_dataset)
final_pred_test_bw = pred_test_bw.select(col('movie_id').alias('m_id'),col('prediction').alias('P BW'))
test_df = (test_df_1.join(final_pred_test_bw,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW")

indie_model = LogisticRegressionModel.load("./part3_lr_models/indie/")


pred_test_indie = indie_model.transform(test_csv_dataset)
final_pred_test_indie = pred_test_indie.select(col('movie_id').alias('m_id'),col('prediction').alias('P Indie'))
test_df = (test_df_1.join(final_pred_test_indie,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie")

aa_model = LogisticRegressionModel.load("./part3_lr_models/action_adventure/")


pred_test_aa = aa_model.transform(test_csv_dataset)
final_pred_test_aa = pred_test_aa.select(col('movie_id').alias('m_id'),col('prediction').alias('P AA'))
test_df = (test_df_1.join(final_pred_test_aa,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA")

a_model = LogisticRegressionModel.load("./part3_lr_models/adventure/")


pred_test_adv = a_model.transform(test_csv_dataset)
final_pred_test_adv = pred_test_adv.select(col('movie_id').alias('m_id'),col('prediction').alias('P Adv'))
test_df = (test_df_1.join(final_pred_test_adv,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv")

ff_model = LogisticRegressionModel.load("./part3_lr_models/family/")


pred_test_ff = ff_model.transform(test_csv_dataset)
final_pred_test_ff = pred_test_ff.select(col('movie_id').alias('m_id'),col('prediction').alias('P FF'))
test_df = (test_df_1.join(final_pred_test_ff,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF")

sf_model = LogisticRegressionModel.load("./part3_lr_models/short_film/")


pred_test_sf = sf_model.transform(test_csv_dataset)
final_pred_test_sf = pred_test_sf.select(col('movie_id').alias('m_id'),col('prediction').alias('P SF'))
test_df = (test_df_1.join(final_pred_test_sf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF")


rd_model = LogisticRegressionModel.load("./part3_lr_models/rom_drama/")

pred_test_rd = rd_model.transform(test_csv_dataset)
final_pred_test_rd = pred_test_rd.select(col('movie_id').alias('m_id'),col('prediction').alias('P RD'))
test_df = (test_df_1.join(final_pred_test_rd,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD")


ani_model = LogisticRegressionModel.load("./part3_lr_models/animation/")

pred_test_ani = ani_model.transform(test_csv_dataset)
final_pred_test_ani = pred_test_ani.select(col('movie_id').alias('m_id'),col('prediction').alias('P Ani'))
test_df = (test_df_1.join(final_pred_test_ani,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani")


mus_model = LogisticRegressionModel.load("./part3_lr_models/musical/")

pred_test_mus = mus_model.transform(test_csv_dataset)
final_pred_test_mus = pred_test_mus.select(col('movie_id').alias('m_id'),col('prediction').alias('P Mus'))
test_df = (test_df_1.join(final_pred_test_mus,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus")


scf_model = LogisticRegressionModel.load("./part3_lr_models/science_fiction/")

pred_test_scf = scf_model.transform(test_csv_dataset)
final_pred_test_scf = pred_test_scf.select(col('movie_id').alias('m_id'),col('prediction').alias('P SciFi'))
test_df = (test_df_1.join(final_pred_test_scf,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi")


m_model = LogisticRegressionModel.load("./part3_lr_models/mystery/")

pred_test_mys = m_model.transform(test_csv_dataset)
final_pred_test_mys = pred_test_mys.select(col('movie_id').alias('m_id'),col('prediction').alias('P Mys'))
test_df = (test_df_1.join(final_pred_test_mys,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi", "P Mys")


rc_model = LogisticRegressionModel.load("./part3_lr_models/rom_com/")

pred_test_rc = rc_model.transform(test_csv_dataset)
final_pred_test_rc = pred_test_rc.select(col('movie_id').alias('m_id'),col('prediction').alias('P RC'))
test_df = (test_df_1.join(final_pred_test_rc,col("movie_id") == col("m_id"),"leftouter"))

test_df_1 = test_df.select("movie_id","P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF", "P RD", "P Ani", "P Mus", "P SciFi", "P Mys", "P RC")

In [49]:
test_df_1.show(5)

+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|movie_id|P drama|P Comedy|P RF|P Thriller|P Action|P WC|P CF|P Horror|P BW|P Indie|P AA|P Adv|P FF|P SF|P RD|P Ani|P Mus|P SciFi|P Mys|P RC|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|12378414|    0.0|     1.0| 1.0|       0.0|     0.0| 0.0| 0.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 1.0|
|13005847|    0.0|     0.0| 1.0|       1.0|     0.0| 0.0| 1.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13873910|    0.0|     0.0| 1.0|       0.0|     1.0| 0.0| 0.0|     0.0| 0.0|    0.0| 0.0|  0.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|13897336|    1.0|     0.0| 0.0|       1.0|     0.0| 1.0| 0.0|     0.0| 0.0|    0.0| 1.0|  1.0| 0.0| 0.0| 0.0|  0.0|  0.0|    0.0|  0.0| 0.0|
|14355

In [50]:
test_res_df = test_df_1.select([col(c).cast(IntegerType()).cast("string") for c in test_df_1.columns])
test_res_df.show(2)

+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|movie_id|P drama|P Comedy|P RF|P Thriller|P Action|P WC|P CF|P Horror|P BW|P Indie|P AA|P Adv|P FF|P SF|P RD|P Ani|P Mus|P SciFi|P Mys|P RC|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
|12378414|      0|       1|   1|         0|       0|   0|   0|       0|   0|      0|   0|    0|   0|   0|   0|    0|    0|      0|    0|   1|
|13005847|      0|       0|   1|         1|       0|   0|   1|       0|   0|      0|   0|    0|   0|   0|   0|    0|    0|      0|    0|   0|
+--------+-------+--------+----+----------+--------+----+----+--------+----+-------+----+-----+----+----+----+-----+-----+-------+-----+----+
only showing top 2 rows



In [51]:
test_2 = test_res_df.select("movie_id",concat_ws(' ', "P drama","P Comedy","P RF","P Thriller","P Action","P WC","P CF","P Horror","P BW","P Indie","P AA","P Adv","P FF","P SF","P RD","P Ani","P Mus","P SciFi","P Mys","P RC").alias('predictions'))

In [52]:
test_df_try = test_2.select(col("movie_id").cast(IntegerType()),col("predictions"))
test_df_try.show(2)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
|12378414|0 1 1 0 0 0 0 0 0...|
|13005847|0 0 1 1 0 0 1 0 0...|
+--------+--------------------+
only showing top 2 rows



In [None]:
test_df_try.repartition(1).write.csv("./submission/part3.csv",header=True)