In [2]:
# Import libraries
import pandas as pd
import numpy as np
import os
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn import metrics
from flask import Flask, request, jsonify
import joblib
import traceback
import json

from dvc.api import make_checkpoint

# Display full outputs
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import warnings
warnings.filterwarnings("ignore")

In [3]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/project/spark-3.2.1-bin-hadoop3.2"

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySpark App") \
    .config("spark.jars", "postgresql-42.3.2.jar") \
    .getOrCreate()

### Machine Learning

In [48]:
# Build a machine learning to predict success of worldwide box office income
# New data frame for machine learning model
df_ml = df_film

# Keep only needed columns
df_ml = df_ml[['title', 'year', 'distributor', 'genre', 'rating', 'duration', 'worldwide_k']]
make_checkpoint()

In [49]:
# Create dummy varaibles for distributor
# Only identify if the company is one of the top 10 film distributors
# Top 10 are: Warner Bros., Walt Disney Studios Motion Pictures, United Artists, Paramount Pictures, Universal Pictures, Columbia Pictures, Twentieth Century Fox, Miramax, Metro-Goldwyn-Mayer (MGM) and Sony Pictures Classics
df_ml['dum_distributor'] = 0
Top_10 = ['Warner Bros.', 'Walt Disney Studios Motion Pictures', 'United Artists', 'Paramount Pictures', 'Universal Pictures', 'Columbia Pictures', 'Twentieth Century Fox', 'Miramax', 'Metro-Goldwyn-Mayer (MGM)', 'Sony Pictures Classics']
for i in df_ml.index:
    if df_ml.at[i,'distributor'] in Top_10:
        df_ml.at[i,'dum_distributor'] = 1
    else:
        df_ml.at[i,'dum_distributor'] = 0
make_checkpoint()

In [50]:
# Create dummy varaibles for genre
# The least 5 seen genres are identified as other genres
df_ml['dum_drama'] = np.where(df_ml['genre'] == 'Drama', 1, 0)
df_ml['dum_action'] = np.where(df_ml['genre'] == 'Action', 1, 0)
df_ml['dum_adventure'] = np.where(df_ml['genre'] == 'Adventure', 1, 0)
df_ml['dum_crime'] = np.where(df_ml['genre'] == 'Crime', 1, 0)
df_ml['dum_comedy'] = np.where(df_ml['genre'] == 'Comedy', 1, 0)
df_ml['dum_biography'] = np.where(df_ml['genre'] == 'Biography', 1, 0)

list_othergenre = ['Animation', 'Horror', 'Mystery', 'Western', 'Film-Noir']
df_ml['dum_othergenre'] = 0
for i in df_ml.index:
    if df_ml.at[i,'genre'] in list_othergenre:
        df_ml.at[i,'dum_othergenre'] = 1
    else:
        df_ml.at[i,'dum_othergenre'] = 0
make_checkpoint()

In [51]:
# Create the y variable (whether the film succeed in box office income)
# success = 1 if worldwide income > median of all 250 films, = 0 otherwise
median_income = df_ml['worldwide_k'].median()

df_ml['success'] = 0
for i in df_ml.index:
    if df_ml.at[i,'worldwide_k'] > median_income:
        df_ml.at[i,'success'] = 1
    else:
        df_ml.at[i,'success'] = 0
make_checkpoint()

In [None]:
# Drop origianl distributor and genre columns
df_ml = df_ml.drop(['distributor', 'genre', 'worldwide_k'],axis =1)
make_checkpoint()

# Adjust the order of the columns
df_ml = df_ml[['title', 'year', 'rating', 'duration', 'dum_distributor', 'dum_drama', 'dum_action', 'dum_adventure', 'dum_crime', 'dum_comedy', 'dum_biography', 'dum_othergenre', 'success']]
make_checkpoint()

In [52]:
# Final version of machine learning data frame
df_ml

Unnamed: 0,title,year,rating,duration,dum_distributor,dum_drama,dum_action,dum_adventure,dum_crime,dum_comedy,dum_biography,dum_othergenre,success
0,Harry Potter and the Deathly Hallows: Part 2,2011,8.0748,130,1,0,0,1,0,0,0,0,1
1,Mr Smith Goes to Washington,1939,8.0725,129,1,0,0,0,0,1,0,0,0
2,The Grand Budapest Hotel,2014,8.0716,99,0,0,0,1,0,0,0,0,1
3,Sherlock Jr,1924,8.0704,45,0,0,1,0,0,0,0,0,0
4,Le salaire de la peur,1953,8.0702,131,0,0,0,1,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
245,The Lives of Others,2006,8.3869,137,1,1,0,0,0,0,0,0,1
246,Sunset Blvd,1950,8.3838,110,1,1,0,0,0,0,0,0,0
247,Paths of Glory,1957,8.3712,88,1,1,0,0,0,0,0,0,0
248,The Shining,1980,8.3683,146,1,1,0,0,0,0,0,0,0


In [53]:
# Convert the data frame into spark data frame
df_ml_spark = spark.createDataFrame(df_ml)

In [54]:
df_ml_spark.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- duration: long (nullable = true)
 |-- dum_distributor: long (nullable = true)
 |-- dum_drama: long (nullable = true)
 |-- dum_action: long (nullable = true)
 |-- dum_adventure: long (nullable = true)
 |-- dum_crime: long (nullable = true)
 |-- dum_comedy: long (nullable = true)
 |-- dum_biography: long (nullable = true)
 |-- dum_othergenre: long (nullable = true)
 |-- success: long (nullable = true)



In [55]:
# Convert the data frame into parquet format
df_ml_spark.write.parquet("/project/Individual/parquet_files/ml.parquet", mode = 'overwrite')
make_checkpoint()

In [56]:
# Train a logistic regression model for prediction

# Split variables and train/test sets
X = df_ml.drop(['title', 'success'], axis = 1)
y = df_ml['success']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

mlr = LogisticRegression()
mlr.fit(X_train, y_train)

#Predict the response for test dataset
y_pred_train = mlr.predict(X_train)
y_pred_test = mlr.predict(X_test)

# Model Accuracy
acc_train = metrics.accuracy_score(y_train, y_pred_train)
acc_test = metrics.accuracy_score(y_test, y_pred_test)
print(f"Accuracy in train set: {acc_train:.3f}")
print(f"Accuracy in test set: {acc_test:.3f}")

LogisticRegression()

Accuracy in train set: 0.670
Accuracy in test set: 0.660


In [None]:
#Saving the model
joblib.dump(mlr, 'model.pkl')
lr = joblib.load('model.pkl')

model_columns = list(X.columns)
joblib.dump(model_columns, 'model_columns.pkl')

In [None]:
# API definition using Flask
app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    if lr:
        try:
            json_ = request.json
            print(json_)
            query = pd.get_dummies(pd.DataFrame(json_))
            query = query.reindex(columns=model_columns, fill_value=0)

            prediction = list(lr.predict(query))

            return jsonify({'prediction': str(prediction)})

        except:

            return jsonify({'trace': traceback.format_exc()})
    else:
        print ('Train the model first')
        return ('No model here to use')

if __name__ == '__main__':
    try:
        port = int(sys.argv[1]) # This is for a command-line input
    except:
        port = 12345

    lr = joblib.load("model.pkl")
    model_columns = joblib.load("model_columns.pkl")


    app.run(debug=True, use_reloader=False)