In [1]:
import sys
#sys.path.append('jars/mleap/python')
import mleap.pyspark
from pyspark.ml.linalg import Vectors
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark import SparkContext
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler, RFormula, VectorIndexer
from pyspark.ml.regression import GBTRegressor, GBTRegressionModel
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.types import *

In [2]:
SparkContext.setSystemProperty('spark.driver.memory', '2g')
SparkContext.setSystemProperty('spark.driver.cores', '3')
SparkContext.setSystemProperty('spark.executor.memory', '2g')
SparkContext.setSystemProperty('spark.executor.cores', '3')
SparkContext.setSystemProperty('spark.driver.memoryOverhead', '1g')
SparkContext.setSystemProperty('spark.storage.memoryFraction', '0.9')

In [3]:
data_df = spark.read.csv('abt/data/*',header=True, inferSchema=True)
data_df.printSchema()

root
 |-- idmovie: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- director: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- budgetvalue: long (nullable = true)



## Summary Statistics

#### Evaluating continuous features

In [4]:
data_df.summary().select('summary','budgetvalue', 'runtime','rating').show()

+-------+--------------------+------------------+------------------+
|summary|         budgetvalue|           runtime|            rating|
+-------+--------------------+------------------+------------------+
|  count|               18495|             18495|             18495|
|   mean|6.0121800647904836E7|100.06520681265206|3.1008229251148713|
| stddev| 7.384994399567893E8|34.253022733437874|0.6089458576840447|
|    min|                   1|                 0|               0.5|
|    25%|             3000000|                91|              2.77|
|    50%|            11500000|               101|              3.17|
|    75%|            34000000|               116|              3.52|
|    max|         35000000000|               359|               5.0|
+-------+--------------------+------------------+------------------+



#### Defining Features

In [5]:
#'runtime', 'rating','budgetvalue'
continuous_cols  = ['rating']
categorical_cols = ['director','genres']

In [6]:
#def featuresCreation(df):

categorical_indexers = [
    StringIndexer(inputCol=column, outputCol="{0}_indexed".format(column), handleInvalid='keep')
    for column in categorical_cols
]

categorical_features = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in categorical_indexers
]

continuous_feature = [VectorAssembler(
    inputCols=[column for column in continuous_cols],
    outputCol="continuous_features"
)]


all_features = continuous_feature + categorical_features

universal_assembler = VectorAssembler(
    inputCols=[feature.getOutputCol() for feature in all_features],
    outputCol="features"
)

gbt = GBTRegressor(featuresCol='features',labelCol='budgetvalue', maxDepth=6, maxIter=45)

stages = categorical_indexers + \
         categorical_features + \
         continuous_feature + \
         [universal_assembler] + \
         [gbt]

universal_pipeline = Pipeline(stages=stages)


In [7]:
(train_df, test_df) = data_df.randomSplit([0.80,0.20], 40)

In [8]:
### GRADIENT BOOSTING ##############################################################################################
#gbt                = GBTRegressor(featuresCol='features',labelCol='budgetvalue', maxDepth=6, maxIter=45)
gbt_model          = universal_pipeline.fit(train_df)#gbt.fit(train_df)
print("### MODEL TRAINED")
gbt_pred           = gbt_model.transform(test_df)
print("### MODEL PREDICTED")
gbt_evaluator_rmse = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='rmse')
gbt_rmse           = gbt_evaluator_rmse.evaluate(gbt_pred)
gbt_evaluator_r2   = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='r2')
gbt_r2             = gbt_evaluator_r2.evaluate(gbt_pred)

print('### GRADIENT BOOSTED RMSE:')
print(gbt_rmse)
print('### GRADIENT BOOSTED r2:')
print(gbt_r2)

### MODEL TRAINED
### MODEL PREDICTED
### GRADIENT BOOSTED RMSE:
291029802.54118365
### GRADIENT BOOSTED r2:
0.6847638541277404


In [9]:
print('GRADIENT BOOSTED #########################')
gbt_pred.select('idmovie','director','genres','budgetvalue', 'prediction')\
        .withColumn('pred', gbt_pred['prediction'].cast(DecimalType(38,2)))\
        .drop('prediction')\
        .withColumnRenamed('pred', 'prediction').show(10)

GRADIENT BOOSTED #########################
+-------+--------------+---------+-----------+-----------+
|idmovie|      director|   genres|budgetvalue| prediction|
+-------+--------------+---------+-----------+-----------+
|      1| John Lasseter|Adventure|   30000000|40994034.42|
|      1| John Lasseter|Animation|   30000000|30384369.20|
|      1| John Lasseter|   Comedy|   30000000|30384369.20|
|      2|  Joe Johnston|Adventure|   65000000|40994034.42|
|      2|  Joe Johnston|  Fantasy|   65000000|30384369.20|
|     15|  Renny Harlin|   Action|   98000000|38618105.45|
|     20|  Joseph Ruben| Thriller|   68000000|30384369.20|
|     23|Richard Donner|   Action|   50000000|38618105.45|
|     23|Richard Donner|    Crime|   50000000|30384369.20|
|     34|  Chris Noonan| Children|   30000000|30384369.20|
+-------+--------------+---------+-----------+-----------+
only showing top 10 rows



In [10]:
### SAVING THE MODEL
#gbt_model.save('gbt_model/')

In [14]:
### MLEAP SERIALIZATION

#gbt_model.serializeToBundle("jar:file:/Users/mauriciolins/Documents/uaM/2.Projects/MoviesRec/jars/movies_gbt.zip", gbt_model.transform(train_df))
gbt_model.serializeToBundle("jar:file:jars/mleap/xxx.zip",gbt_model.transform(train_df))


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/mauriciolins/Documents/uaM/4.Softs/spark/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/mauriciolins/Documents/uaM/4.Softs/spark/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/Users/mauriciolins/Documents/uaM/4.Softs/spark/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: ml.combust.mleap.spark.SimpleSparkSerializer does not exist in the JVM

In [91]:
### RANDOM FOREST ##################################################################################################
rf = RandomForestRegressor(featuresCol='features', labelCol='budgetvalue', numTrees=15)

rf_model        = rf.fit(train_df)
rf_pred         = rf_model.transform(test_df)
rf_evaluator    = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='rmse')
rf_rmse         = rf_evaluator.evaluate(rf_pred)
rf_evaluator_r2 = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='r2')
rf_r2           = rf_evaluator_r2.evaluate(rf_pred)

print('### RANDOM FOREST RMSE:')
print(rf_rmse)
print('### RANDOM FOREST r2:')
print(rf_r2)
#rf_pred.withColumn('pred', rf_pred['prediction'].cast(DecimalType(30,2))).show(truncate=False)

### RANDOM FOREST RMSE:
351395990.07634676
### RANDOM FOREST r2:
0.5404268132355837


In [92]:
### DECISION TREE ###################################################################################################
dt              = DecisionTreeRegressor(featuresCol='features', labelCol='budgetvalue')
dt_model        = dt.fit(train_df)
dt_pred         = dt_model.transform(test_df)
dt_evaluator    = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='rmse')
dt_rmse         = dt_evaluator.evaluate(dt_pred)
dt_evaluator_r2 = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='r2')
dt_r2           = dt_evaluator_r2.evaluate(dt_pred)

print('### DECISION TREE RMSE:')
print(dt_rmse)
print('### RANDOM FOREST r2:')
print(dt_r2)

### DECISION TREE RMSE:
212444343.14663234
### RANDOM FOREST r2:
0.8320223803781781


In [93]:
from pyspark.sql.types import DecimalType
print('DECISION TREE #########################')
dt_pred.select('idmovie','director','genres','budgetvalue', 'prediction')\
        .withColumn('pred', dt_pred['prediction'].cast(DecimalType(38,2)))\
        .drop('prediction')\
        .withColumnRenamed('pred', 'prediction').show(10)
#################################################################################
print('RANDOM FOREST #########################')
rf_pred.select('idmovie','director','genres','budgetvalue', 'prediction')\
        .withColumn('pred', rf_pred['prediction'].cast(DecimalType(38,2)))\
        .drop('prediction')\
        .withColumnRenamed('pred', 'prediction').show(10)
#################################################################################
print('GRADIENT BOOSTED #########################')
gbt_pred.select('idmovie','director','genres','budgetvalue', 'prediction')\
        .withColumn('pred', gbt_pred['prediction'].cast(DecimalType(38,2)))\
        .drop('prediction')\
        .withColumnRenamed('pred', 'prediction').show(10)

DECISION TREE #########################
+-------+--------------+---------+-----------+-----------+
|idmovie|      director|   genres|budgetvalue| prediction|
+-------+--------------+---------+-----------+-----------+
|      1| John Lasseter|Adventure|   30000000|43101494.29|
|      1| John Lasseter|Animation|   30000000|43101494.29|
|      1| John Lasseter|   Comedy|   30000000|43101494.29|
|      2|  Joe Johnston|Adventure|   65000000|43101494.29|
|      2|  Joe Johnston|  Fantasy|   65000000|43101494.29|
|     15|  Renny Harlin|   Action|   98000000|43101494.29|
|     20|  Joseph Ruben| Thriller|   68000000|43101494.29|
|     23|Richard Donner|   Action|   50000000|43101494.29|
|     23|Richard Donner|    Crime|   50000000|43101494.29|
|     34|  Chris Noonan| Children|   30000000|43101494.29|
+-------+--------------+---------+-----------+-----------+
only showing top 10 rows

RANDOM FOREST #########################
+-------+--------------+---------+-----------+-----------+
|idmovie|

In [95]:
print(gbt_model.toDebugString)

GBTRegressionModel (uid=GBTRegressor_e32e0291ceb6) with 45 trees
  Tree 0 (weight 1.0):
    If (feature 1224 in {0.0})
     If (feature 683 in {0.0})
      If (feature 3995 in {0.0})
       If (feature 3761 in {0.0})
        If (feature 108 in {0.0})
         If (feature 2836 in {0.0})
          Predict: 4.174773514731475E7
         Else (feature 2836 not in {0.0})
          Predict: 1.0E10
        Else (feature 108 not in {0.0})
         If (feature 0 <= 3.4850000000000003)
          Predict: 7.6493875E9
         Else (feature 0 > 3.4850000000000003)
          Predict: 9525000.0
       Else (feature 3761 not in {0.0})
        Predict: 1.9E10
      Else (feature 3995 not in {0.0})
       Predict: 2.0E10
     Else (feature 683 not in {0.0})
      If (feature 0 <= 3.085)
       Predict: 5000000.0
      Else (feature 0 > 3.085)
       Predict: 3.0E10
    Else (feature 1224 not in {0.0})
     Predict: 3.5E10
  Tree 1 (weight 0.1):
    If (feature 374 in {0.0})
     If (feature 2083 in {0.0

In [97]:
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

ExtractFeatureImp(gbt_model.featureImportances, train_df, "features").head(50)
#print(rf_model.featureImportances)

Unnamed: 0,idx,name,score
0,0,continuous_features_rating,0.249237
374,374,director_indexed_encoded_Chan-wook Park,0.050678
4424,4424,genres_indexed_encoded_Mystery,0.033493
108,108,director_indexed_encoded_Bong Joon Ho,0.032679
4417,4417,genres_indexed_encoded_Action,0.022335
2083,2083,director_indexed_encoded_Ajay Pannalal,0.021404
3976,3976,director_indexed_encoded_Jung Huh,0.020639
2955,2955,director_indexed_encoded_Lajos Koltai,0.020469
17,17,director_indexed_encoded_Hayao Miyazaki,0.020276
835,835,director_indexed_encoded_Rohit Shetty,0.019701


In [None]:
### PMML EXAMPLE
#formula            = RFormula(formula = "budgetvalue ~ .")
#gbt                = GBTRegressor(featuresCol='features', labelCol='budgetvalue',seed=9)
#join_pipeline_gbt  = [formula, gbt]
#gbt_pipeline       = Pipeline(stages = join_pipeline_gbt)
#gbt_final          = gbt_pipeline.fit(train_df_2) 
#
#
#### EXPORTING THE MODEL
#PMMLBuilder(sc, data_df, gbt_final) \
#    .putOption(gbt, "compact", True) \
#    .putOption(gbt, "keep_predictionCol", True) \
#    .buildFile("gbt_budget.pmml")