# TMDB movies - random forest regressor

In [0]:
import numpy as np 
import pandas as pd 
import category_encoders as ce 

from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline


# Load Data

In [0]:
dataPath = "/FileStore/tables/tmdb_movies_data.csv"
movies = sqlContext.read.format("csv").option("header","true").option("inferSchema", "true").load(dataPath)
movies = movies.withColumnRenamed("cast", "actors")
pmovies = movies.toPandas()

pmovies.head()

Unnamed: 0,id,imdb_id,popularity,budget,revenue,original_title,actors,homepage,director,tagline,keywords,overview,runtime,genres,production_companies,release_date,vote_count,vote_average,release_year,budget_adj,revenue_adj
0,135397,tt0369610,32.985763,150000000,1513528810,Jurassic World,Chris Pratt|Bryce Dallas Howard|Irrfan Khan|Vi...,http://www.jurassicworld.com/,Colin Trevorrow,The park is open.,monster|dna|tyrannosaurus rex|velociraptor|island,Twenty-two years after the events of Jurassic ...,124,Action|Adventure|Science Fiction|Thriller,Universal Studios|Amblin Entertainment|Legenda...,6/9/2015,5562,6.5,2015,137999939.3,1392445893.0
1,76341,tt1392190,28.419936,150000000,378436354,Mad Max: Fury Road,Tom Hardy|Charlize Theron|Hugh Keays-Byrne|Nic...,http://www.madmaxmovie.com/,George Miller,What a Lovely Day.,future|chase|post-apocalyptic|dystopia|australia,An apocalyptic story set in the furthest reach...,120,Action|Adventure|Science Fiction|Thriller,Village Roadshow Pictures|Kennedy Miller Produ...,5/13/2015,6185,7.1,2015,137999939.3,348161292.5
2,262500,tt2908446,13.112507,110000000,295238201,Insurgent,Shailene Woodley|Theo James|Kate Winslet|Ansel...,http://www.thedivergentseries.movie/#insurgent,Robert Schwentke,One Choice Can Destroy You,based on novel|revolution|dystopia|sequel|dyst...,Beatrice Prior must confront her inner demons ...,119,Adventure|Science Fiction|Thriller,Summit Entertainment|Mandeville Films|Red Wago...,3/18/2015,2480,6.3,2015,101199955.5,271619025.4
3,140607,tt2488496,11.173104,200000000,2068178225,Star Wars: The Force Awakens,Harrison Ford|Mark Hamill|Carrie Fisher|Adam D...,http://www.starwars.com/films/star-wars-episod...,J.J. Abrams,Every generation has a story.,android|spaceship|jedi|space opera|3d,Thirty years after defeating the Galactic Empi...,136,Action|Adventure|Science Fiction|Fantasy,Lucasfilm|Truenorth Productions|Bad Robot,12/15/2015,5292,7.5,2015,183999919.0,1902723130.0
4,168259,tt2820852,9.335014,190000000,1506249360,Furious 7,Vin Diesel|Paul Walker|Jason Statham|Michelle ...,http://www.furious7.com/,James Wan,Vengeance Hits Home,car race|speed|revenge|suspense|car,Deckard Shaw seeks revenge against Dominic Tor...,137,Action|Crime|Thriller,Universal Pictures|Original Film|Media Rights ...,4/1/2015,2947,7.3,2015,174799923.1,1385748801.0


In [0]:
pmovies.info()

In [0]:
movies.createOrReplaceTempView("movies")

In [0]:
%sql 
drop table if exists movies_cleaned; 

create table movies_cleaned as 
select 
int(id) id, 
imdb_id, 
double(popularity) popularity, 
double(budget) budget, 
double(revenue) revenue, 
int(runtime) runtime, 
int(vote_count) votes, 
int(vote_average) votes_avg, 
actors,
genres,
tagline,      
double(budget_adj) budget_ad, 
double(revenue_adj) revenue_ad
from movies 


In [0]:
sqlContext.cacheTable("movies_cleaned")
pmovies = spark.sql("SELECT * FROM movies_cleaned").toPandas()

#drop rows will null values 
pmovies = pmovies.dropna()


In [0]:

#feature engineering 
#pmovies['actors_count'] = pmovies['actors'].str.findall('|').str.len() + 1
pmovies['actors_count'] = pmovies['actors'][0].count('|') + 1
pmovies['genre_count'] = pmovies['genres'][0].count('|') + 1 
pmovies['has_tagline'] = pmovies['tagline'].apply(lambda x: 1 if x is not None else 0)




pmovies = pmovies[(pmovies['budget_ad'] > 0) | (pmovies['revenue_ad'] > 0) ]

pmovies.info()

#pmovies[pmovies.has_tagline == 0]

In [0]:
prepped = spark.createDataFrame(pmovies) #.na.fill(0)
display(prepped)

id,imdb_id,popularity,budget,revenue,runtime,votes,votes_avg,actors,genres,tagline,budget_ad,revenue_ad,actors_count,genre_count,has_tagline
135397.0,tt0369610,32.985763,150000000.0,1513528810.0,124.0,5562.0,6.0,Chris Pratt|Bryce Dallas Howard|Irrfan Khan|Vincent D'Onofrio|Nick Robinson,Action|Adventure|Science Fiction|Thriller,The park is open.,137999939.3,1392445893.0,5,4,1
76341.0,tt1392190,28.419936,150000000.0,378436354.0,120.0,6185.0,7.0,Tom Hardy|Charlize Theron|Hugh Keays-Byrne|Nicholas Hoult|Josh Helman,Action|Adventure|Science Fiction|Thriller,What a Lovely Day.,137999939.3,348161292.5,5,4,1
262500.0,tt2908446,13.112507,110000000.0,295238201.0,119.0,2480.0,6.0,Shailene Woodley|Theo James|Kate Winslet|Ansel Elgort|Miles Teller,Adventure|Science Fiction|Thriller,One Choice Can Destroy You,101199955.5,271619025.4,5,4,1
140607.0,tt2488496,11.173104,200000000.0,2068178225.0,136.0,5292.0,7.0,Harrison Ford|Mark Hamill|Carrie Fisher|Adam Driver|Daisy Ridley,Action|Adventure|Science Fiction|Fantasy,Every generation has a story.,183999919.0,1902723130.0,5,4,1
168259.0,tt2820852,9.335014,190000000.0,1506249360.0,137.0,2947.0,7.0,Vin Diesel|Paul Walker|Jason Statham|Michelle Rodriguez|Dwayne Johnson,Action|Crime|Thriller,Vengeance Hits Home,174799923.1,1385748801.0,5,4,1
281957.0,tt1663202,9.1107,135000000.0,532950503.0,156.0,3929.0,7.0,Leonardo DiCaprio|Tom Hardy|Will Poulter|Domhnall Gleeson|Paul Anderson,Western|Drama|Adventure|Thriller,"(n. One who has returned, as if from the dead.)",124199945.4,490314247.0,5,4,1
87101.0,tt1340138,8.654359,155000000.0,440603537.0,125.0,2598.0,5.0,Arnold Schwarzenegger|Jason Clarke|Emilia Clarke|Jai Courtney|J.K. Simmons,Science Fiction|Action|Thriller|Adventure,Reset the future,142599937.3,405355075.7,5,4,1
286217.0,tt3659388,7.6674,108000000.0,595380321.0,141.0,4572.0,7.0,Matt Damon|Jessica Chastain|Kristen Wiig|Jeff Daniels|Michael PeÃ±a,Drama|Adventure|Science Fiction,Bring Him Home,99359956.28,547749654.3,5,4,1
211672.0,tt2293640,7.404165,74000000.0,1156730962.0,91.0,2893.0,6.0,Sandra Bullock|Jon Hamm|Michael Keaton|Allison Janney|Steve Coogan,Family|Animation|Adventure|Comedy,"Before Gru, they had a history of bad bosses",68079970.04,1064192017.0,5,4,1
150540.0,tt2096673,6.326804,175000000.0,853708609.0,94.0,3935.0,8.0,Amy Poehler|Phyllis Smith|Richard Kind|Bill Hader|Lewis Black,Comedy|Animation|Family,Meet the little voices inside your head.,160999929.2,785411574.7,5,4,1


In [0]:
import random
random.seed(10)

# Using Base line Attributes

In [0]:
nonFeatureCols = ["id", "imdb_id", "budget", "revenue", "actors", "genres", "tagline", "actors_count", "genre_count", "has_tagline"]
featureCols = [item for item in prepped.columns if item not in nonFeatureCols]
featureCols

In [0]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(prepped)
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) 
print(test.count())

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("revenue_ad")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [3, 5, 8])
  .addGrid(rfModel.numTrees, [5, 10, 15])
  .build())
  

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("revenue_ad")))

pipelineFitted = cv.fit(training)

print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
holdout = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", "double(revenue_ad)", "(prediction/double(revenue_ad))*100 as perc_diff"))
  
#display(holdout)

from pyspark.mllib.evaluation import RegressionMetrics
rm3 = RegressionMetrics(holdout.select("raw_prediction", "revenue_ad").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm3.meanSquaredError)
print("MAE: ", rm3.meanAbsoluteError)
print("RMSE Squared: ", rm3.rootMeanSquaredError)
print("R Squared: ", rm3.r2)
print("Explained Variance: ", rm3.explainedVariance, "\n")

# actor count

In [0]:
nonFeatureCols = ["id", "imdb_id", "budget", "revenue", "actors", "genres", "tagline", "genre_count", "has_tagline"]
featureCols = [item for item in prepped.columns if item not in nonFeatureCols]
featureCols

In [0]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(prepped)
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) 
print(test.count())

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("revenue_ad")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [3, 5, 8])
  .addGrid(rfModel.numTrees, [5, 10, 15])
  .build())
  

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("revenue_ad")))

pipelineFitted = cv.fit(training)

print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
holdout = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", "double(revenue_ad)", "(prediction/double(revenue_ad))*100 as perc_diff"))
  
#display(holdout)

from pyspark.mllib.evaluation import RegressionMetrics
rm3 = RegressionMetrics(holdout.select("raw_prediction", "revenue_ad").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm3.meanSquaredError)
print("MAE: ", rm3.meanAbsoluteError)
print("RMSE Squared: ", rm3.rootMeanSquaredError)
print("R Squared: ", rm3.r2)
print("Explained Variance: ", rm3.explainedVariance, "\n")

# Genre Count

In [0]:
nonFeatureCols = ["id", "imdb_id", "budget", "revenue", "actors", "genres", "tagline", "has_tagline"]
featureCols = [item for item in prepped.columns if item not in nonFeatureCols]
featureCols

In [0]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(prepped)
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) 
print(test.count())

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("revenue_ad")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [3, 5, 8])
  .addGrid(rfModel.numTrees, [5, 10, 15])
  .build())
  

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("revenue_ad")))

pipelineFitted = cv.fit(training)

print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
holdout = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", "double(revenue_ad)", "(prediction/double(revenue_ad))*100 as perc_diff"))
  
#display(holdout)

from pyspark.mllib.evaluation import RegressionMetrics
rm3 = RegressionMetrics(holdout.select("raw_prediction", "revenue_ad").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm3.meanSquaredError)
print("MAE: ", rm3.meanAbsoluteError)
print("RMSE Squared: ", rm3.rootMeanSquaredError)
print("R Squared: ", rm3.r2)
print("Explained Variance: ", rm3.explainedVariance, "\n")

#  has a tag line

In [0]:
nonFeatureCols = ["id", "imdb_id", "budget", "revenue", "actors", "genres", "tagline"]
featureCols = [item for item in prepped.columns if item not in nonFeatureCols]
featureCols

In [0]:
# VectorAssembler Assembles all of these columns into one single vector. To do this, set the input columns and output column. Then that assembler will be used to transform the prepped data to the final dataset.
from pyspark.ml.feature import VectorAssembler

assembler = (VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features"))

finalPrep = assembler.transform(prepped)
training, test = finalPrep.randomSplit([0.7, 0.3])

#  Going to cache the data to make sure things stay snappy!
training.cache()
test.cache()

print(training.count()) 
print(test.count())

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml import Pipeline

rfModel = (RandomForestRegressor()
  .setLabelCol("revenue_ad")
  .setFeaturesCol("features"))

paramGrid = (ParamGridBuilder()
  .addGrid(rfModel.maxDepth, [3, 5, 8])
  .addGrid(rfModel.numTrees, [5, 10, 15])
  .build())
  

stages = [rfModel]

pipeline = Pipeline().setStages(stages)

cv = (CrossValidator() # you can feel free to change the number of folds used in cross validation as well
  .setEstimator(pipeline) # the estimator can also just be an individual model rather than a pipeline
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(RegressionEvaluator().setLabelCol("revenue_ad")))

pipelineFitted = cv.fit(training)

print("The Best Parameters:\n--------------------")
print(pipelineFitted.bestModel.stages[0])
pipelineFitted.bestModel.stages[0].extractParamMap()

In [0]:
holdout = (pipelineFitted.bestModel
  .transform(test)
  .selectExpr("prediction as raw_prediction", "double(revenue_ad)", "(prediction/double(revenue_ad))*100 as perc_diff"))
  
#display(holdout)

from pyspark.mllib.evaluation import RegressionMetrics
rm3 = RegressionMetrics(holdout.select("raw_prediction", "revenue_ad").rdd.map(lambda x:  (x[0], x[1])))

print("MSE: ", rm3.meanSquaredError)
print("MAE: ", rm3.meanAbsoluteError)
print("RMSE Squared: ", rm3.rootMeanSquaredError)
print("R Squared: ", rm3.r2)
print("Explained Variance: ", rm3.explainedVariance, "\n")