In [1]:
import findspark
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
import pyspark
sc = pyspark.SparkContext()
from pyspark.sql import *
spark = SparkSession \
    .builder \
    .appName("Movie review") \
    .config("spark.some.config.option","some-value") \
    .getOrCreate()
sqlContext = SQLContext(sc)

In [2]:
df = spark.read.load("/home/cse587/train.csv", format='csv', inferSchema="true",header="true", escape = '"', seperator = ",")
label_df = spark.read.load("/home/cse587/mapping.csv", format='csv', inferSchema="true",header="true", escape = '"', seperator = ",")
df_test = spark.read.load("/home/cse587/test.csv", format='csv', inferSchema="true",header="true", escape = '"', seperator = ",")

In [0]:
#df = df.limit(10)
#df_test = df_test.limit(10)`

In [3]:
labels_list = []
for row in label_df.collect():
    labels_list.append(row[1])

In [4]:
# Relate movie_id and movie_name
movie_name = {}
df = df.dropna(subset=['plot', 'genre'])
for row in df.collect():
    if row['movie_id'] != None:
        movie_name[row['movie_id']] = row['movie_name']
        
# Relate movie_id and movie_name
movie_name_test = {}
df_test = df_test.dropna(subset=['plot'])
for row in df_test.collect():
    if row['movie_id'] != None:
        movie_name_test[row['movie_id']] = row['movie_name']

In [5]:
from collections import defaultdict
from pyspark.sql.functions import col, lower, regexp_replace
from pyspark.ml.feature import RegexTokenizer,Tokenizer, StopWordsRemover
from pyspark.ml import Pipeline

df = df.select('movie_id', 'movie_name', (lower(regexp_replace('plot',"[^a-zA-Z\\s]","")).alias('plot')), 'genre')
df_test = df_test.select('movie_id', 'movie_name', (lower(regexp_replace('plot',"[^a-zA-Z\\s]","")).alias('plot')))
tokenizer = Tokenizer(inputCol='plot', outputCol='plot_token')
df = tokenizer.transform(df)
df_test = tokenizer.transform(df_test)                      
remover = StopWordsRemover(inputCol='plot_token', outputCol='plot_stop')
df=remover.transform(df)
df_test=remover.transform(df_test)

In [8]:
from pyspark.ml.feature import CountVectorizer
countVectors = CountVectorizer(inputCol="plot_stop", outputCol="features")
model = countVectors.fit(df)
X = model.transform(df)

df = model.transform(df)
df = df.select("movie_id", "movie_name", "plot_stop","features","genre")
df.show()

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|           plot_stop|            features|               genre|
+--------+--------------------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|[shlykov, hardwor...|(154610,[126,180,...|['World cinema', ...|
|31186339|    The Hunger Games|[nation, panem, c...|(154610,[0,3,5,8,...|['Action/Adventur...|
|20663735|          Narasimham|[poovalli, induch...|(154610,[0,2,6,10...|['Musical', 'Acti...|
| 2231378|  The Lemon Drop Kid|[lemon, drop, kid...|(154610,[0,8,9,13...|          ['Comedy']|
|  595909|   A Cry in the Dark|[seventhday, adve...|(154610,[0,3,9,10...|['Crime Fiction',...|
| 5272176|            End Game|[president, way, ...|(154610,[0,3,4,7,...|['Action/Adventur...|
| 1952976|          Dark Water|[plot, film, open...|(154610,[0,1,2,3,...|['Thriller', 'Dra...|
|24225279|                Sing|[story, begins, h..

In [9]:
df_test = model.transform(df_test)
df_test = df_test.select("movie_id", "movie_name","features")

In [11]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pd_df = df.toPandas()

  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


In [12]:
import ast
label_set = [[0]* len(labels_list) for i in range(len(movie_name))]

In [13]:
i = 0
for row in df.collect():
  if row['genre'] != None:
    elements = ast.literal_eval(row['genre'])
    for genre in elements:
      for label in range(len(labels_list)):
        if genre == labels_list[label]:
          label_set[i][label] = 1
  i = i + 1

label_set

[[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0],
 [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
 [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0],
 [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
 [0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [14]:
import numpy as np
cols = []
Y_train = np.array(label_set)
temp = Y_train.transpose()
l = labels_list
for i in range(len(temp)):
    pd_df[str(i+1)] = temp[i]
    cols.append(str(i+1))

In [15]:
df1 = spark.createDataFrame(pd_df)

  PyArrow >= 0.8.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.


In [16]:
df2 = df1
df3 = df2.select('movie_id','features', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20')
df3.show(truncate=False)

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [17]:
from pyspark.ml.feature import StandardScaler
standardscaler = StandardScaler().setInputCol("features").setOutputCol("scaled_features")
df3 = standardscaler.fit(df3).transform(df3)
df_test = standardscaler.fit(df_test).transform(df_test)
df3= df3.select('movie_id','scaled_features','1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20')

In [18]:
df3.show()
data = df3

+--------+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|movie_id|     scaled_features|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20|
+--------+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|23890098|(154610,[126,180,...|  1|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|31186339|(154610,[0,3,5,8,...|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|20663735|(154610,[0,2,6,10...|  1|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|
| 2231378|(154610,[0,8,9,13...|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|  595909|(154610,[0,3,9,10...|  1|  0|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 5272176|(154610,[0,3,4,7,...|  1|  0|  0|  1|  1|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0| 

In [19]:
import pyspark.sql.functions as f
from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel
from pyspark.sql.functions import when, lit, col
from pyspark.ml.feature import *
from pathlib import Path

In [20]:
for i in range(20):
    file = Path("/home/cse587/part1/"+str(i))
    if file.exists():
      print("Loading Model " , i+1)
      load_path = "part1/" + str(i)
      model = LogisticRegressionModel.load("/home/cse587/part1/"+str(i))
    else:
      print("Training Model ",i+1,"/ 20")
      lr = LogisticRegression(labelCol = str(20-i),featuresCol="scaled_features")
      model = lr.fit(data)
      path = "/home/cse586/part1/"+str(i)
      model.save(path)

    print("Testing Model ",i+1, "/ 20")
    predictions= model.transform(df_test)
    if (i == 0):
        pred1 = predictions.withColumnRenamed("prediction", (str(20-i)))
        pred1 = pred1.select("movie_id", str(20-i))
        df_p = pred1.withColumn(str(20-i), \
              when(pred1[str(20-i)] == 1, 20-i).otherwise(0))
        df_p.show()
    else:
        pred2 = predictions.withColumnRenamed("prediction", (str(20-i)))
        pred2 = pred2.select("movie_id", str(20-i))
        pred2 = pred2.withColumn(str(20-i),when(pred2[str(20-i)] == 1, 20-i).otherwise(0))
        df_p = pred2.join(df_p, "movie_id", "right")
        df_p.show()

Training Model  1 / 20
Testing Model  1 / 20
+--------+---+
|movie_id| 20|
+--------+---+
| 1335380|  0|
|29062594|  0|
| 9252321|  0|
|13455076|  0|
|24165951|  0|
| 1925869|  0|
|10799612|  0|
|28238240|  0|
|17124781|  0|
|28207941|  0|
|19174305|  0|
|18392317|  0|
|34420857|  0|
| 4039635|  0|
| 8034072|  0|
| 4016437|  0|
| 1520023|  0|
|24589422|  0|
|35068740|  0|
|21132951|  0|
+--------+---+
only showing top 20 rows

Training Model  2 / 20
Testing Model  2 / 20
+--------+---+---+
|movie_id| 19| 20|
+--------+---+---+
| 1335380|  0|  0|
|29062594|  0|  0|
| 9252321|  0|  0|
|13455076|  0|  0|
|24165951|  0|  0|
| 1925869|  0|  0|
|10799612|  0|  0|
|28238240|  0|  0|
|17124781|  0|  0|
|28207941|  0|  0|
|19174305|  0|  0|
|18392317| 19|  0|
|34420857|  0|  0|
| 4039635|  0|  0|
| 8034072|  0|  0|
| 4016437|  0|  0|
| 1520023|  0|  0|
|24589422|  0|  0|
|35068740|  0|  0|
|21132951|  0|  0|
+--------+---+---+
only showing top 20 rows

Training Model  3 / 20
Testing Model  3 / 

Testing Model  11 / 20
+--------+---+---+---+---+---+---+---+---+---+---+---+
|movie_id| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20|
+--------+---+---+---+---+---+---+---+---+---+---+---+
| 1335380|  0| 11|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|29062594|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 9252321|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|13455076|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|24165951|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 1925869|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|10799612|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|28238240|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|17124781|  0| 11|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|28207941|  0|  0|  0|  0| 14|  0| 16|  0|  0|  0|  0|
|19174305|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|18392317|  0|  0|  0|  0|  0|  0| 16|  0|  0| 19|  0|
|34420857|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 4039635|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 8034072|  0|  0|  0|  0|  0|  0|  0| 17|

Testing Model  17 / 20
+--------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|movie_id|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20|
+--------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 1335380|  0|  0|  0|  0|  0|  0|  0| 11|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|29062594|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 9252321|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|13455076|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|24165951|  0|  5|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
| 1925869|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|10799612|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|28238240|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|17124781|  0|  0|  0|  0|  0|  0|  0| 11|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|28207941|  0|  0|  0|  0|  0

In [25]:
final = {}
f3 = df_p.drop('movie_id')
to_list = [list(row) for row in f3.collect()]
y_pred = [[int(ele) for ele in sub if ele != 0] for sub in to_list]

In [26]:
import csv
with open('/home/cse587/mapping_part1.csv', mode='w') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(['', '0'])
    labels = labels_list
    for i in range (len(labels)):
        writer.writerow((str(i),labels[i]))

In [27]:
l = list(movie_name_test.keys())
with open('/home/cse587/sample_part1.csv', mode='w') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(['movie_id', 'predictions'])
    for i in range (len(y_pred)):
        a = ['0'] * 20
        for j in range(len(y_pred[i])):
            a[y_pred[i][j] - 1] = '1'
        temp = ' '.join(a)
        writer.writerow((l[i],temp))