In [1]:
import datetime
# Wall-clock timestamp marking the start of the run.
start = datetime.datetime.now()
print(format(start, "%Y-%m-%d %H:%M:%S"))

2020-05-16 20:01:46


In [2]:
#Libraries
import findspark
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')

import re
import os
import pandas as pd

import pyspark
from pyspark import SQLContext
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, Word2Vec, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace, when, lit, concat_ws
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

In [3]:
# PYSPARK_SUBMIT_ARGS is read only when the first SparkContext launches the JVM
# gateway, so the driver-memory option must be in the environment *before* the
# context is created. Setting it afterwards (as this cell used to) has no effect.
memory = '10g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

# getOrCreate() reuses an already-running context, so re-running this cell does
# not fail with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

sqlContext = SQLContext(sc)
spark = SparkSession \
    .builder \
    .appName("Pyspark demo") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
#Reading data
# pandas parses each CSV; Spark then wraps the resulting pandas frame.
# NOTE(review): absolute, machine-specific paths -- consider a configurable DATA_DIR.
d1 = pd.read_csv('/home/cse587/dic487-587/train.csv')
d2 = pd.read_csv('/home/cse587/dic487-587/test.csv')
train, test = sqlContext.createDataFrame(d1), sqlContext.createDataFrame(d2)

In [5]:
# Checkpoint timestamp after the CSVs are loaded.
now = datetime.datetime.now()
print(datetime.datetime.strftime(now, "%Y-%m-%d %H:%M:%S"))

2020-05-16 20:02:25


In [6]:
#Preprocessing
# Keep only letters and whitespace in the plot text, then lowercase it.
clean_plot = train.select('movie_id', 'movie_name', (lower(regexp_replace('plot', "[^a-zA-Z\\s]", "")).alias('plot')), 'genre')
clean_plot_test = test.select('movie_id', 'movie_name', (lower(regexp_replace('plot', "[^a-zA-Z\\s]", "")).alias('plot')))

# Split each plot into an array of words.
tokenizer = Tokenizer(inputCol = 'plot', outputCol = 'plot_words')
words_plot = tokenizer.transform(clean_plot).select('movie_id', 'movie_name', 'plot_words', 'genre')
words_plot_test = tokenizer.transform(clean_plot_test).select('movie_id', 'movie_name', 'plot_words')

# Remove stop words (StopWordsRemover's default list).
stop_words_remover = StopWordsRemover(inputCol = 'plot_words', outputCol = 'plot_clean')
clean_train = stop_words_remover.transform(words_plot).select('movie_id', 'movie_name', 'plot_clean', 'genre')
clean_test = stop_words_remover.transform(words_plot_test).select('movie_id', 'movie_name', 'plot_clean')

# Drop tokens of two characters or fewer.
filter_len = udf(lambda row: [x for x in row if len(x) > 2], ArrayType(StringType()))
final_train = clean_train.withColumn('plots', filter_len(col('plot_clean'))).select('movie_id', 'movie_name', 'plots', 'genre')
final_test = clean_test.withColumn('plots', filter_len(col('plot_clean'))).select('movie_id', 'movie_name', 'plots')

# Bag-of-words counts: at most 20000 terms, each appearing in at least 2 documents
# of the set the vectorizer is fitted on.
cv = CountVectorizer(inputCol = "plots", outputCol = "feature_vector", vocabSize = 20000, minDF = 2)
# Fit the vocabulary on the training plots ONLY and reuse that fitted model for the
# test plots. Fitting a second vectorizer on the test set (as before) produced a
# different word->index mapping (and possibly a different vector size), so the
# per-genre classifiers were scoring test rows against unrelated features.
model = cv.fit(final_train)
train_data = model.transform(final_train)
test_data = model.transform(final_test)

train_data = train_data.select('feature_vector', 'genre')
test_data = test_data.select('feature_vector')

# Test movie ids, used as the key column of every prediction file.
movie_id = test.select("movie_id").toPandas()

In [7]:
# Checkpoint timestamp after preprocessing and vectorization.
now = datetime.datetime.now()
print(format(now, "%Y-%m-%d %H:%M:%S"))

2020-05-16 20:03:16


In [8]:
def get_predictions(train, test, key):
    """Fit a one-vs-rest logistic regression for one genre and score `test` with it.

    A training row gets label 1 when its `genre` string contains `key`, else 0.

    Args:
        train: Spark DataFrame with `features` and `genre` columns.
        test: Spark DataFrame with a `features` column.
        key: genre name to detect; also used as the name of the output column.

    Returns:
        pandas DataFrame with a single column named `key` holding the model's
        predicted labels for each row of `test`.
    """
    # NOTE(review): Column.contains is a substring test, so e.g. key "Action" (and
    # "Adventure") also matches any genre string containing "Action/Adventure".
    is_genre = when(train.genre.contains(key), 1).otherwise(0)
    labelled = train.withColumn('label', is_genre).select('features', 'label')
    classifier = LogisticRegression(maxIter = 100).fit(labelled)
    scored = classifier.transform(test)
    return scored.select("prediction").withColumnRenamed("prediction", key).toPandas()

Part 1 - Per-genre logistic regression on CountVectorizer bag-of-words counts

In [9]:
def part1(data, test):    
    c00 = get_predictions(data, test, "Drama")
    c01 = get_predictions(data, test, "Comedy")
    c02 = get_predictions(data, test, "Romance Film")
    c03 = get_predictions(data, test, "Thriller")
    c04 = get_predictions(data, test, "Action")
    c05 = get_predictions(data, test, "World cinema")
    c06 = get_predictions(data, test, "Crime Fiction")
    c07 = get_predictions(data, test, "Horror")
    c08 = get_predictions(data, test, "Black-and-white")
    c09 = get_predictions(data, test, "Indie")
    c10 = get_predictions(data, test, "Action/Adventure")
    c11 = get_predictions(data, test, "Adventure")
    c12 = get_predictions(data, test, "Family Film")
    c13 = get_predictions(data, test, "Short Film")
    c14 = get_predictions(data, test, "Romantic Drama")
    c15 = get_predictions(data, test, "Animation")
    c16 = get_predictions(data, test, "Musical")
    c17 = get_predictions(data, test, "Science Fiction")
    c18 = get_predictions(data, test, "Mystery")
    c19 = get_predictions(data, test, "Romantic comedy")
    
    final_res = pd.concat([c00, c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12, c13, c14, c15, c16, c17, c18, c19], axis = 1)
    final_res.columns = final_res.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('-', '_').str.replace('/', '_')
    final_res = final_res.astype(int)
    
    p = sqlContext.createDataFrame(final_res)
    predictions = p.select(concat_ws(' ',p.drama, p.comedy, p.romance_film, p.thriller, p.action, p.world_cinema, p.crime_fiction, p.horror, p.black_and_white, p.indie, p.action_adventure, p.family_film, p.short_film, p.romantic_drama, p.animation, p.musical, p.science_fiction, p.mystery, p.romantic_comedy).alias("prediction"))
    predictions = predictions.withColumnRenamed('prediction','predictions')
    predictions = predictions.toPandas()
    predictions = pd.concat([movie_id, predictions], axis = 1)
    return predictions

# Part 1 feeds the raw CountVectorizer counts to the classifiers as `features`.
train1 = train_data.select('feature_vector', 'genre').withColumnRenamed('feature_vector', 'features')
test1 = test_data.select('feature_vector').withColumnRenamed('feature_vector', 'features')
final_pred1 = part1(train1, test1)
labels1 = sqlContext.createDataFrame(final_pred1)
# mode='overwrite' keeps the cell re-runnable: the default save mode raises if Part1/ already exists.
labels1.write.csv('Part1', header=True, mode='overwrite')

In [10]:
# Checkpoint timestamp after Part 1.
now = datetime.datetime.now()
print(datetime.datetime.strftime(now, "%Y-%m-%d %H:%M:%S"))

2020-05-16 20:11:28


Part 2 - TF-IDF weighting of the CountVectorizer counts

In [11]:
def _fit_idf(df):
    """Fit an IDF model that maps the `feature_vector` column to a TF-IDF `features` column."""
    idf = IDF(inputCol="feature_vector", outputCol="features")
    return idf.fit(df)


def idf_func(df):
    """Fit IDF on `df` and return `df` with an added TF-IDF `features` column.

    Use this only on training data. Held-out data should be transformed with a model
    fitted on the training set, as part2 does.
    """
    return _fit_idf(df).transform(df)


def part2(data, test):
    """Part 2: TF-IDF-weighted CountVectorizer features fed to the part1 classifiers.

    Args:
        data: training Spark DataFrame with `feature_vector` and `genre` columns.
        test: test Spark DataFrame with a `feature_vector` column.

    Returns:
        Whatever part1 returns for the rescaled frames.
    """
    # Learn document frequencies from the training set only and apply the same
    # weights to the test set. Fitting a separate IDF on the test set (as before)
    # weighted test terms by test-set frequencies the classifiers never saw.
    idf_model = _fit_idf(data)
    rescaled_train = idf_model.transform(data)
    rescaled_test = idf_model.transform(test)
    return part1(rescaled_train, rescaled_test)

final_pred2 = part2(train_data, test_data)
labels2 = sqlContext.createDataFrame(final_pred2)
# mode='overwrite' keeps the cell re-runnable: the default save mode raises if Part2/ already exists.
labels2.write.csv('Part2', header=True, mode='overwrite')

In [12]:
# Checkpoint timestamp after Part 2.
now = datetime.datetime.now()
print(format(now, "%Y-%m-%d %H:%M:%S"))

2020-05-16 20:18:38


Part 3 - HashingTF term frequencies with IDF weighting

In [13]:
def part3(data, test, num_features=10000):
    """Part 3: hashed term frequencies with IDF weighting fed to the part1 classifiers.

    Args:
        data: training Spark DataFrame with `plots` (token array) and `genre` columns.
        test: test Spark DataFrame with a `plots` column.
        num_features: number of hash buckets for HashingTF (default 10000, as before).

    Returns:
        Whatever part1 returns for the rescaled frames.
    """
    # HashingTF is used without fitting, so one transformer featurizes both sets consistently.
    hashingTF = HashingTF(inputCol="plots", outputCol="feature_vector", numFeatures=num_features)
    featurizedData = hashingTF.transform(data)
    featurizedTest = hashingTF.transform(test)
    # Learn IDF weights from the training set only and apply them to both sets.
    # Fitting a second IDF on the test set (as before) weighted test terms by
    # test-set document frequencies the classifiers never saw.
    idfModel = IDF(inputCol="feature_vector", outputCol="features").fit(featurizedData)
    rescaled_train = idfModel.transform(featurizedData)
    rescaled_test = idfModel.transform(featurizedTest)
    return part1(rescaled_train, rescaled_test)

train3 = final_train.select('plots', 'genre')
test3 = final_test.select('plots')
final_pred3 = part3(train3, test3)
labels3 = sqlContext.createDataFrame(final_pred3)
# mode='overwrite' keeps the cell re-runnable: the default save mode raises if Part3/ already exists.
labels3.write.csv('Part3', header=True, mode='overwrite')

In [14]:
# Wall-clock timestamp marking the end of the run.
end = datetime.datetime.now()
print(datetime.datetime.strftime(end, "%Y-%m-%d %H:%M:%S"))

2020-05-16 20:26:52
