In [1]:

from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator


from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer


In [2]:
# Load the movie dataset from the `movieproject` table/view into a Spark DataFrame.
# An earlier hand-written StructType for this data listed these columns (all non-nullable):
#   Budget, domesticgross, globalgross, duration, imdb_score  -> DoubleType
#   language, country, majorgenres                             -> IntegerType
# NOTE(review): that schema was never applied; confirm the real table matches it with df.printSchema().
df = spark.sql('select * from movieproject')

df.show(5)

In [3]:
# Mark df for caching so the repeated actions in later cells reuse the loaded data instead of
# re-running the query against `movieproject`. Caching is lazy: it is filled by the next action.
df.cache()

In [4]:
# Render the DataFrame as a rich table.
# NOTE(review): `display` is not defined in this notebook — assumes a Databricks-style runtime that provides it.
display(df)

In [5]:
# Total number of rows in the loaded dataset. Function-call print works on both Python 2 and 3
# (the statement form is a SyntaxError on Python 3).
print("Our dataset has %d rows." % df.count())

In [6]:
# Show column names and types exactly as loaded, before the cast-to-double step in the next cell.
df.printSchema()

In [7]:
from pyspark.sql.functions import col

# Rebuild df with every column cast to double; alias() keeps each column's original name.
df = df.select(*[
    col(name).cast("double").alias(name)
    for name in df.columns
])
df.printSchema()

In [8]:
# 70/30 train/test split. randomSplit draws a fresh random seed when none is given, which made
# the split (and every downstream metric) change on each Restart & Run All; pin it instead.
SPLIT_SEED = 42
train, test = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

In [9]:
# Show just budget and the label (globalgross) for the training rows, e.g. to eyeball their relationship.
display(train.select(["budget", "globalgross"]))

In [10]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# Every column except the label `globalgross` is used as a feature. Built with a filter rather
# than list.remove() so the cell is idempotent and never mutates a list obtained from df.
assert 'globalgross' in df.columns, "label column 'globalgross' not found in df"
featuresCols = [c for c in df.columns if c != 'globalgross']
# NOTE(review): `domesticgross` remains a feature. If global gross includes domestic gross, this
# leaks part of the label into the inputs and will make the test RMSE look optimistic — confirm intended.

# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them: a feature with at most maxCategories
# distinct values is treated as categorical, anything with more is treated as continuous.
# NOTE(review): integer-coded columns such as language/country/majorgenres with > 4 distinct codes
# will be treated as continuous — confirm the threshold of 4 is intended.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [11]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted-tree regressor for `globalgross`. featuresCol is spelled out (it equals the
# default, "features") to make the link to the VectorIndexer's output column explicit.
gbt = GBTRegressor(featuresCol="features", labelCol="globalgross")


In [12]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Hyperparameter grid for the GBT: 2 tree depths x 2 iteration counts = 4 candidate settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)

# RMSE between the GBT's label and prediction columns; CrossValidator uses it to rank candidates.
evaluator = RegressionEvaluator(
    predictionCol=gbt.getPredictionCol(),
    labelCol=gbt.getLabelCol(),
    metricName="rmse",
)

# Cross-validated model selection over the grid above, scored with the RMSE evaluator.
cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
)

In [13]:
from pyspark.ml import Pipeline

# Stages run in order: assemble raw features -> index categorical features -> cross-validated GBT.
pipeline = Pipeline().setStages([vectorAssembler, vectorIndexer, cv])


In [14]:
# Fit every stage on the training split, including the full cross-validated grid search
# (likely the slowest cell in the notebook).
pipelineModel = pipeline.fit(dataset=train)

In [15]:
# Apply the fitted pipeline (feature stages + best GBT) to the held-out test split.
predictions = pipelineModel.transform(dataset=test)

In [16]:
# Show the true label next to the model's prediction, followed by the input feature columns.
display(predictions.select(["globalgross", "prediction"] + featuresCols))


In [17]:
# Score the test-set predictions with the same RMSE evaluator used during cross-validation.
# Function-call print works on both Python 2 and 3 (the statement form is a SyntaxError on Python 3).
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)