In [39]:
%load_ext autoreload
%autoreload 2
!pip install plotly

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
[33mYou are using pip version 9.0.3, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [40]:
####-----------------Common Stuff-------------------####

In [41]:
# Common
# Import necessary libraries and packages

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
import pandas as pd
import matplotlib.mlab as mlab
import plotly.plotly as py
import plotly.figure_factory as ff
import plotly
from plotly.graph_objs import *
import plotly.graph_objs as go


plotly.tools.set_credentials_file(username='pbd', api_key='nrkkjdZy9uNG8DaoOM2l')


In [42]:
## Common
## Perform pre processing on data i.e. StringIndexing -> OneHotEncoding -> VectorAssembler 

categorical_columns= ['directorName', 'writeName', 'genre', 'runtime', 'year']

### sringIndexer

indexers = [
            StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
            for c in categorical_columns
]

### oneHotEncoder

encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
            for indexer in indexers
]

### vectorAssembler

assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

In [43]:
####-----------------Common Stuff-------------------####

In [44]:
####-----------------Start Training-------------------####

In [45]:
##Training
## Load training data with header

data = spark.read.options(header='true', inferschema='true', delimiter=',').csv("2000-2018.csv")  
display(data)
data.cache()
data.show(5)
data.printSchema()




DataFrame[title: string, year: int, avgRating: double, votes: int, directorName: string, writeName: string, runtime: int, genre: string]

+-------------+----+---------+-----+--------------+-----------------+-------+--------------------+
|        title|year|avgRating|votes|  directorName|        writeName|runtime|               genre|
+-------------+----+---------+-----+--------------+-----------------+-------+--------------------+
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|    Roman Coppola|    101|Adventure,Animati...|
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|   Kunichi Nomura|    101|Adventure,Animati...|
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|     Wes Anderson|    101|Adventure,Animati...|
| Isle of Dogs|2018|      8.2|18140|  Wes Anderson|Jason Schwartzman|    101|Adventure,Animati...|
|A Quiet Place|2018|      8.1|68050|John Krasinski|      Bryan Woods|     90| Drama,Horror,Sci-Fi|
+-------------+----+---------+-----+--------------+-----------------+-------+--------------------+
only showing top 5 rows

root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- 

In [46]:
## Training

## Feed the training data to the pipeline i.e. VectorAssembler -> Pipeline

pipeline = Pipeline(stages=indexers + encoders+[assembler])
model=pipeline.fit(data)
transformed = model.transform(data)

transformed.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- votes: integer (nullable = true)
 |-- directorName: string (nullable = true)
 |-- writeName: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- directorName_indexed: double (nullable = false)
 |-- writeName_indexed: double (nullable = false)
 |-- genre_indexed: double (nullable = false)
 |-- runtime_indexed: double (nullable = false)
 |-- year_indexed: double (nullable = false)
 |-- directorName_indexed_encoded: vector (nullable = true)
 |-- writeName_indexed_encoded: vector (nullable = true)
 |-- genre_indexed_encoded: vector (nullable = true)
 |-- runtime_indexed_encoded: vector (nullable = true)
 |-- year_indexed_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



In [47]:
## Training
## Select required columns from the resulting dataframe (training data)

final = transformed.select("features", "avgRating")
final.printSchema()
final.show()

root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)

+--------------------+---------+
|            features|avgRating|
+--------------------+---------+
|(9275,[140,3317,8...|      8.2|
|(9275,[140,6879,8...|      8.2|
|(9275,[140,2696,8...|      8.2|
|(9275,[140,4528,8...|      8.2|
|(9275,[1478,6743,...|      8.1|
|(9275,[1478,8394,...|      8.1|
|(9275,[1478,4899,...|      8.1|
|(9275,[930,7441,8...|      8.1|
|(9275,[930,8139,8...|      8.1|
|(9275,[930,7093,8...|      8.1|
|(9275,[2074,2588,...|      7.9|
|(9275,[16,4695,87...|      7.8|
|(9275,[16,2678,87...|      7.8|
|(9275,[522,2431,8...|      7.7|
|(9275,[522,2445,8...|      7.7|
|(9275,[522,7165,8...|      7.7|
|(9275,[522,3746,8...|      7.7|
|(9275,[954,3271,8...|      7.4|
|(9275,[1021,3271,...|      7.4|
|(9275,[1182,7189,...|      7.2|
+--------------------+---------+
only showing top 20 rows



In [48]:
##Training
# To train the model
## Randomly split the data set for testing and training

lr_train,lr_test = final.randomSplit([0.8, 0.2])

## Create a Linear Regression Model with required parameters

lr = LinearRegression(labelCol="avgRating",
                      fitIntercept=True, 
                      maxIter=100, 
                      regParam=0.02, 
                      elasticNetParam=0.02)

## fit the Model to the training dataset

lrModel = lr.fit(lr_train)


In [49]:
##Training
### Collect information about the trained model
## Training summary 

trainingSummary = lrModel.summary

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

print("Intercept: %s" % str(lrModel.intercept))
print("numIterations: %d" % trainingSummary.totalIterations)

print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
print("Coefficients: %s" % str(lrModel.coefficients))

trainingSummary.predictions.printSchema()
trainingSummary.predictions.show()
training_residuals = trainingSummary.residuals


RMSE: 0.228065
r2: 0.955794
Intercept: 6.53366464426
numIterations: 101
objectiveHistory: [0.49995629752644277, 0.3551962012002494, 0.06891603086109885, 0.05918192460808857, 0.051862763426827434, 0.04912468095731352, 0.04713208700498115, 0.04564857971728275, 0.04482433724566848, 0.044302973464853075, 0.04397102887302282, 0.04370284320572424, 0.04352021196181697, 0.04335167636876333, 0.04324924803822489, 0.04316372041697888, 0.0430953242468799, 0.04303646535745501, 0.04299988248729416, 0.04297089112323546, 0.04293574373485173, 0.042920849482377024, 0.04289962643192102, 0.042886270318077534, 0.04287124830838509, 0.042859355536989725, 0.042850476632242854, 0.04284092475015401, 0.042835400234910316, 0.04282786631584737, 0.042822777726776624, 0.042818814104909154, 0.04281441853102516, 0.04281168892731439, 0.042810310765960535, 0.04280717118259937, 0.04280565837633607, 0.042804081980874126, 0.04280318678438556, 0.04280169501952854, 0.04280117078298156, 0.0428002405088399, 0.04279983346131715

+--------------------+---------+-----------------+
|            features|avgRating|       prediction|
+--------------------+---------+-----------------+
|(9275,[0,2821,877...|      6.5|6.424315933844177|
|(9275,[0,2821,881...|      7.0| 6.94273031083279|
|(9275,[0,2821,881...|      7.1| 6.94525449912506|
|(9275,[0,2821,881...|      6.6|6.769183881753985|
|(9275,[0,3127,877...|      6.5|6.443814625262282|
|(9275,[0,3127,881...|      7.0|6.962229002250894|
|(9275,[0,3127,881...|      7.1|6.964753190543164|
|(9275,[0,3127,881...|      6.6| 6.78868257317209|
|(9275,[0,3201,877...|      6.5|6.443814625262282|
|(9275,[0,3201,881...|      7.0|6.962229002250894|
|(9275,[0,3201,881...|      7.1|6.964753190543164|
|(9275,[0,3201,881...|      6.6| 6.78868257317209|
|(9275,[0,3323,881...|      7.0|6.962229002250894|
|(9275,[0,3323,881...|      7.1|6.964753190543164|
|(9275,[0,3323,881...|      6.6| 6.78868257317209|
|(9275,[0,3426,881...|      7.0|6.962229002250894|
|(9275,[0,3426,881...|      7.1

In [50]:
##Training
## Model Evaluation

prediction_and_labels = lrModel.evaluate(final)

print("RMSE: %f" % prediction_and_labels.rootMeanSquaredError)
print("r2: %f" % prediction_and_labels.r2)

prediction_and_labels.predictions.show(5000)

RMSE: 0.338726
r2: 0.901353
+--------------------+---------+------------------+
|            features|avgRating|        prediction|
+--------------------+---------+------------------+
|(9275,[140,3317,8...|      8.2| 7.971256840457984|
|(9275,[140,6879,8...|      8.2| 7.664399169749844|
|(9275,[140,2696,8...|      8.2| 7.882019471742838|
|(9275,[140,4528,8...|      8.2| 7.796281812728764|
|(9275,[1478,6743,...|      8.1| 8.074219016935846|
|(9275,[1478,8394,...|      8.1| 8.074219016935846|
|(9275,[1478,4899,...|      8.1| 7.985122875300299|
|(9275,[930,7441,8...|      8.1| 6.450161764837051|
|(9275,[930,8139,8...|      8.1| 6.450161764837051|
|(9275,[930,7093,8...|      8.1| 6.450161764837051|
|(9275,[2074,2588,...|      7.9|7.8293539798435035|
|(9275,[16,4695,87...|      7.8|7.6555475276260445|
|(9275,[16,2678,87...|      7.8| 7.661698877361683|
|(9275,[522,2431,8...|      7.7| 7.701360695162393|
|(9275,[522,2445,8...|      7.7| 7.752342876950669|
|(9275,[522,7165,8...|      7.7| 7.6

In [51]:
##Training
## Training and Testing Residuals

training_residuals = trainingSummary.residuals
training_residuals.toDF('residuals')
#train_resi = training_residuals.write.csv("train.csv")

prediction_residuals = prediction_and_labels.residuals
prediction_residuals.toDF('residuals')
#predict_resi = prediction_residuals.write.csv("predict.csv")

DataFrame[residuals: double]

In [52]:
####-----------------Training Done-------------------####

In [53]:
## Bridge
## Save the trained Model then load it

#lrModel.save("pythonLinearRegression")
Linear_Regression_model = LinearRegressionModel.load("pythonLinearRegression/")

In [54]:
####-----------------Start Prediction-------------------####

In [55]:
## Prediction

## Load new data to be predicted by our model

new_data = spark.read.options(header='true', inferschema='true', delimiter=',').csv("2000-2018.csv")  
display(new_data)
new_data.cache()
new_data.show(500)
new_data.printSchema()

DataFrame[title: string, year: int, avgRating: double, votes: int, directorName: string, writeName: string, runtime: int, genre: string]

+--------------------+----+---------+------+--------------------+--------------------+-------+--------------------+
|               title|year|avgRating| votes|        directorName|           writeName|runtime|               genre|
+--------------------+----+---------+------+--------------------+--------------------+-------+--------------------+
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|       Roman Coppola|    101|Adventure,Animati...|
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|      Kunichi Nomura|    101|Adventure,Animati...|
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|        Wes Anderson|    101|Adventure,Animati...|
|        Isle of Dogs|2018|      8.2| 18140|        Wes Anderson|   Jason Schwartzman|    101|Adventure,Animati...|
|       A Quiet Place|2018|      8.1| 68050|      John Krasinski|         Bryan Woods|     90| Drama,Horror,Sci-Fi|
|       A Quiet Place|2018|      8.1| 68050|      John Krasinski|       

In [56]:
## Prediction

## Feed the New data to the pipeline i.e. VectorAssembler -> Pipeline
pipeline = Pipeline(stages=indexers + encoders+[assembler])
model_new=pipeline.fit(new_data)
transformed_new = model_new.transform(new_data)

transformed_new.printSchema()


root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- votes: integer (nullable = true)
 |-- directorName: string (nullable = true)
 |-- writeName: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- directorName_indexed: double (nullable = false)
 |-- writeName_indexed: double (nullable = false)
 |-- genre_indexed: double (nullable = false)
 |-- runtime_indexed: double (nullable = false)
 |-- year_indexed: double (nullable = false)
 |-- directorName_indexed_encoded: vector (nullable = true)
 |-- writeName_indexed_encoded: vector (nullable = true)
 |-- genre_indexed_encoded: vector (nullable = true)
 |-- runtime_indexed_encoded: vector (nullable = true)
 |-- year_indexed_encoded: vector (nullable = true)
 |-- features: vector (nullable = true)



In [57]:
## Prediction
## Select required columns from the resulting dataframe (New data)

final_new = transformed_new.select("features", "avgRating")
final_new.printSchema()
final_new.show()

root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)

+--------------------+---------+
|            features|avgRating|
+--------------------+---------+
|(9275,[140,3317,8...|      8.2|
|(9275,[140,6879,8...|      8.2|
|(9275,[140,2696,8...|      8.2|
|(9275,[140,4528,8...|      8.2|
|(9275,[1478,6743,...|      8.1|
|(9275,[1478,8394,...|      8.1|
|(9275,[1478,4899,...|      8.1|
|(9275,[930,7441,8...|      8.1|
|(9275,[930,8139,8...|      8.1|
|(9275,[930,7093,8...|      8.1|
|(9275,[2074,2588,...|      7.9|
|(9275,[16,4695,87...|      7.8|
|(9275,[16,2678,87...|      7.8|
|(9275,[522,2431,8...|      7.7|
|(9275,[522,2445,8...|      7.7|
|(9275,[522,7165,8...|      7.7|
|(9275,[522,3746,8...|      7.7|
|(9275,[954,3271,8...|      7.4|
|(9275,[1021,3271,...|      7.4|
|(9275,[1182,7189,...|      7.2|
+--------------------+---------+
only showing top 20 rows



In [58]:
## Prediction
## Use the loaded model to predict new data

new_data_predict = Linear_Regression_model.transform(lr_test)
new_data_predict.printSchema()

new_data_predict.show()

root
 |-- features: vector (nullable = true)
 |-- avgRating: double (nullable = true)
 |-- prediction: double (nullable = false)

+--------------------+---------+------------------+
|            features|avgRating|        prediction|
+--------------------+---------+------------------+
|(9275,[0,3437,881...|      6.6| 6.646305135468727|
|(9275,[0,3451,881...|      7.1| 6.954271161769046|
|(9275,[0,3451,881...|      6.6| 6.836034729314842|
|(9275,[0,3736,881...|      7.1| 6.954271161769046|
|(9275,[0,3793,881...|      7.1| 6.954271161769046|
|(9275,[0,3793,881...|      6.6| 6.836034729314842|
|(9275,[0,6445,881...|      6.6| 6.836034729314842|
|(9275,[0,6516,881...|      7.1| 6.954271161769046|
|(9275,[0,6519,881...|      6.6| 6.646305135468727|
|(9275,[0,6549,881...|      7.0| 6.897006355880599|
|(9275,[0,8104,881...|      7.1|7.0552687106643255|
|(9275,[1,2423,896...|      7.6|  7.35015334421343|
|(9275,[1,2424,878...|      8.1| 7.720989645955537|
|(9275,[1,2424,887...|      7.5|  7.15

In [59]:
####-----------------Prediction Done-------------------####

In [60]:
## Extract relevant information for visualization

actual_and_predicted = new_data_predict.select("avgRating", "prediction")


In [61]:
## Store the predicted values

#actual_and_predicted.write.csv("out1.csv")

In [62]:
## Load the stored csv file for visulization

out = spark.read.csv("out1.csv").withColumnRenamed('_c0','label').withColumnRenamed('_c1','prediction')
#out.orderBy('_c2').show(3)
out.show()
out.printSchema()

+-----+------------------+
|label|        prediction|
+-----+------------------+
|  6.6| 6.847654477942388|
|  6.6|6.8530966680556755|
|  7.1| 6.935759534406529|
|  6.6| 6.840027830502715|
|  7.1| 6.977588911570789|
|  6.6|6.7886910493067845|
|  6.6| 6.840027830502715|
|  7.1| 6.935759534406529|
|  7.1| 6.834315675818278|
|  7.0| 6.952174996852843|
|  7.1|6.9727570910887176|
|  7.0| 6.952174996852843|
|  7.1|6.9727570910887176|
|  7.0| 6.952174996852843|
|  7.6|7.5168330727154595|
|  7.8| 7.297435468107926|
|  7.5| 6.964382963616419|
|  8.1| 7.593668035721258|
|  7.5| 6.935281092277366|
|  7.3|  7.31871744693613|
+-----+------------------+
only showing top 20 rows

root
 |-- label: string (nullable = true)
 |-- prediction: string (nullable = true)



In [63]:
df = pd.read_csv("out.csv")
df1 = df.iloc[500:540,:]

In [64]:
table = ff.create_table(df1)
py.iplot(table, filename='table1')

In [65]:
predited_rating = Bar(y=df1.Prediction,
                  name='Predicted Rating',
                  marker=dict(color='#206ec1'))

actual_rating = Bar(y=df1.Label,
                name='Actual Rating',
                marker=dict(color='#8db72c'))

data = [predited_rating, actual_rating]

layout = Layout(title="Actual and Predicted Ratings",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))
fig = Figure(data=data, layout=layout)

py.iplot(fig, filename='styled_bar')

In [66]:
trace_actual = go.Scatter(
    y = df.Label,
    name='Actual Rating',
    mode = 'markers',
    marker=dict(color='#8db72c'))


trace_predicted = go.Scatter(
    y = df.Prediction,
    name='Predicted Rating',
    mode = 'line',
    marker=dict(color='#206ec1'))

layout = Layout(title="Scatter Plot",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))

fig = Figure(data=data, layout=layout)

data = [trace_actual, trace_predicted]
py.iplot(data, filename='basic-scatter')

In [67]:
residual_df = pd.read_csv("train1.csv")

In [68]:
trace_actual = go.Scatter(
    y = residual_df.Residual,
    name='Actual Rating',
    mode = 'line',
    marker=dict(color='#8db72c'))


layout = Layout(title="Scatter Plot",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))

fig = Figure(data=data, layout=layout)

data = [trace_actual]
py.iplot(data, filename='basic-scatter')

In [69]:
residual_df = pd.read_csv("predict1.csv")

In [70]:
trace_actual = go.Scatter(
    y = residual_df.Residual,
    name='Actual Rating',
    mode = 'line',
    marker=dict(color='#206ec1'))


layout = Layout(title="Scatter Plot",
                xaxis=dict(title='Movies'),
                yaxis=dict(title='Ratings (1 - 10)'))

fig = Figure(data=data, layout=layout)

data = [trace_actual]
py.iplot(data, filename='basic-scatter')