In [1]:
# Dataframe creation: read the whole `cars` table, mark it for caching, then preview it.
# NOTE(review): `sqlContext` and `display` are not defined anywhere in this notebook -
# presumably they are injected by the hosting (Databricks-style) runtime; the recorded
# NameError suggests the cell was last run without it. Confirm the target environment.
carsDF = sqlContext.sql("SELECT * FROM cars").cache()
display(carsDF)

NameError: name 'sqlContext' is not defined

In [2]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

def parseReg (x):
  """Collapse a registration value into two classes.

  The literal 'Nije registrovan' is passed through unchanged; every other
  value (including None and the empty string) becomes 'Registrovan'.
  """
  return 'Registrovan' if x != 'Nije registrovan' else x
def parseInt(x):
  """Substitute a placeholder for an empty interior-material string.

  An empty string is replaced by u'?tof'; any other value (None included)
  is returned as-is.
  """
  # NOTE(review): u'?tof' looks like a mis-encoded word - confirm the intended
  # spelling against the source data before changing the literal.
  return u'?tof' if x == "" else x
# Wrap the two Python helpers as Spark UDFs that return strings
regUdf = UserDefinedFunction(parseReg, StringType())
intUdf = UserDefinedFunction(parseInt, StringType())

# registration -> reg: append the derived column, then drop the raw one
regCol = regUdf("registration").alias("reg")
carsDF = carsDF.select("*", regCol).drop("registration")

# interiorMaterial -> interior: same append-then-drop pattern
interiorCol = intUdf("interiorMaterial").alias('interior')
carsDF = carsDF.select("*", interiorCol).drop('interiorMaterial')

carsDF.show()

In [3]:
# Feature extraction
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline

# Every categorical column gets the same two-step treatment:
#   <col> --StringIndexer--> <col>Index --OneHotEncoder(dropLast=True)--> <col>Vec
categoricalCols = ["fuelType", "chassis", "reg", "userType", "gearBox", "interior"]

prepStages = []
for colName in categoricalCols:
  indexCol = colName + "Index"
  prepStages.append(StringIndexer(inputCol=colName, outputCol=indexCol))
  prepStages.append(OneHotEncoder(dropLast=True, inputCol=indexCol, outputCol=colName + "Vec"))

# Fit the preprocessing pipeline on the data and apply it to the same data
prepPipeline = Pipeline()
prepPipeline.setStages(prepStages)
prepModel = prepPipeline.fit(carsDF)
carsDF = prepModel.transform(carsDF)

In [4]:
# Sanity check: show the distinct one-hot vectors produced for userType
userTypeVecsDF = carsDF.select("userTypeVec").distinct()
userTypeVecsDF.show()

In [5]:
# from pyspark.sql.types import *
# customSchema = StructType([ \
#     StructField('mileage', IntegerType(), True), \
#     StructField('year', IntegerType(), True), \
#     StructField('fuelType', StringType(), True), \
#     StructField('power', IntegerType(), True), \
#     StructField('price', DoubleType(), True), \
#     StructField('engineVolume', DoubleType(), True)])

# carsDF = sqlContext.createDataFrame(carsDF, customSchema)

In [6]:
# Show the (column name, type) pairs of the prepared DataFrame.
# Uses the function-call form of print, like the other print(...) calls in this
# notebook; with a single argument it behaves the same on Python 2 and Python 3.
print(carsDF.dtypes)
# display(carsDF.describe())

In [7]:
# Drop implausible rows: power must exceed 30, price must be below 99999999 and
# engineVolume must lie strictly between 400 and 3500.
# The chain is parenthesised instead of using backslash continuations: the original
# ended with a dangling backslash that continued into commented-out lines, which only
# parsed by accident and would break as soon as a line was added or re-enabled.
carsDF = (carsDF
          .filter("power > 30")
          .filter("price < 99999999")
          .filter("engineVolume > 400")
          .filter("engineVolume < 3500"))
# Candidate filters that are currently disabled:
#   .filter("mileage > 3000")
#   .filter("year < 2015")
display(carsDF.describe())

In [8]:
# Data visualisation: hand the price / engineVolume pair to the notebook's display helper
priceVsVolumeDF = carsDF.select("price", "engineVolume")
display(priceVsVolumeDF)

In [9]:
from pyspark.ml.feature import VectorAssembler
# Model inputs, in the order they are packed into the feature vector
featureCols = ["mileage", "year", "engineVolume", "power",
               "fuelTypeVec", "chassisVec", "regVec", "userTypeVec", "gearBoxVec", "interiorVec"]

# Keep only the model inputs plus the label column
carsDF = carsDF.select(*(featureCols + ["price"]))

# Assembler that concatenates the input columns into a single "features" vector column
vectorizer = VectorAssembler(inputCols=featureCols, outputCol="features")

In [10]:
# Fixed seed so the split is repeatable. Written without the Python-2-only `L`
# suffix (a SyntaxError on Python 3); the numeric value is unchanged.
seed = 1800009193

# Weights 20/80: the first frame is the held-out test set, the second the training set
(split20DF, split80DF) = carsDF.randomSplit([20.0, 80.0], seed)

# Let's cache these datasets for performance
testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()

In [11]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

# Create the linear regression learner and select the L-BFGS solver in one step
lr = LinearRegression().setSolver("l-bfgs")
# explainParams() lists every parameter the learner accepts:
# print(lr.explainParams())

In [12]:
# Configure the learner: label column, output column, iteration cap and regularisation
lr.setPredictionCol("Predicted_price")
lr.setLabelCol("price")
lr.setMaxIter(100)
lr.setRegParam(0.1)

# spark.ml pipeline: assemble the feature vector first, then fit the regression on it.
# (The API will look familiar if you have worked with scikit-learn pipelines.)
lrPipeline = Pipeline(stages=[vectorizer, lr])

# Optional: fit once on the training set before any tuning
# lrModel = lrPipeline.fit(trainingSetDF)

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the "price" label with the "Predicted_price" column
regEval = (RegressionEvaluator()
           .setMetricName("rmse")
           .setLabelCol("price")
           .setPredictionCol("Predicted_price"))

In [14]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Cross-validate the LR pipeline with 3 folds, scoring each candidate with the
# RMSE evaluator `regEval`.
crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3)

# Candidate regularisation strengths (only these two values are tried)
regParam = [0.09, 0.1]

# Build the parameter grid with ParamGridBuilder and attach it to the CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, regParam)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Fit on the training set and keep the best pipeline model
lrModel = crossval.fit(trainingSetDF).bestModel

# Report the regParam of the winning model. print(...) with one argument behaves
# the same on Python 2 and 3 and matches the other print(...) calls in this notebook.
print(lrModel.stages[-1]._java_obj.parent().getRegParam())

In [15]:
# Print the fitted linear model as a human-readable equation.
# Stage 0 of the pipeline model is the VectorAssembler, stage 1 the fitted regression.
lrStage = lrModel.stages[1]

intercept = lrStage.intercept
print(intercept)

# The coefficients (i.e., weights): one value per SLOT of the assembled "features" vector
weights = lrStage.coefficients
print(weights)
weightValues = [float(w) for w in weights.toArray()]

# Resolve a name for every vector slot. The one-hot columns expand to several slots
# each, so zipping the weights with the DataFrame column names (as was done before)
# mislabels every slot after the first encoded column and truncates the list.
# Slot names are read from the ML attribute metadata that the assembler attaches to
# the "features" column; only the schema is inspected, so no Spark job is run.
assembledDF = lrModel.stages[0].transform(trainingSetDF)
featuresField = [f for f in assembledDF.schema.fields if f.name == "features"][0]
slotAttrs = featuresField.metadata.get("ml_attr", {}).get("attrs", {})
slotNames = {}
for attrGroup in slotAttrs.values():
  for attr in attrGroup:
    if "name" in attr:
      slotNames[attr["idx"]] = attr["name"]

# Fall back to a positional name for any slot that carries no metadata
featuresNoLabel = [slotNames.get(i, "features[{}]".format(i)) for i in range(len(weightValues))]
print(featuresNoLabel)

# Pair each weight with its slot name (list(...) so this also works where zip is lazy)
coefficents = list(zip(weightValues, featuresNoLabel))
for coef in coefficents:
  print(coef)

# Sort from the greatest absolute weight to the least
coefficents.sort(key=lambda tup: abs(tup[0]), reverse=True)

equation = "y = {intercept}".format(intercept=intercept)
for (weight, name) in coefficents:
  symbol = "+" if weight > 0 else "-"
  equation += " {} ({} * {})".format(symbol, abs(weight), name)

# Finally here is our equation
print("Linear Regression Equation: " + equation)

In [16]:
# Score the held-out test set with the best LR pipeline and keep only the
# actual price and the predicted price
lrScoredDF = lrModel.transform(testSetDF)
predictionsAndLabelsDF = lrScoredDF.select('price', "Predicted_price")

In [17]:
# DecisionTreeRegressor
from pyspark.ml.regression import DecisionTreeRegressor

# Decision-tree learner: same label / prediction columns as the LR model,
# reading the assembled "features" vector, with up to 100 bins
dt = DecisionTreeRegressor(featuresCol="features",
                           labelCol="price",
                           predictionCol="Predicted_price",
                           maxBins=100)

# Pipeline: assemble the feature vector, then fit the tree
dtPipeline = Pipeline(stages=[vectorizer, dt])

# Optional: fit once without tuning
# dtModel = dtPipeline.fit(trainingSetDF)

In [18]:
# Reuse the existing CrossValidator, swapping in the decision-tree pipeline
crossval.setEstimator(dtPipeline)

# Tune dt.maxDepth over the values 2, 3 and 4
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 3, 4])
             .build())

# Add the grid to the CrossValidator
crossval.setEstimatorParamMaps(paramGrid)

# Fit on the training set and keep the best pipeline model
dtModel = crossval.fit(trainingSetDF).bestModel

# Report the maxDepth of the winning tree. print(...) with one argument behaves the
# same on Python 2 and 3 and matches the other print(...) calls in this notebook.
print(dtModel.stages[-1]._java_obj.parent().getMaxDepth())

In [19]:
from pyspark.ml.regression import RandomForestRegressor

# Create a RandomForestRegressor
rf = RandomForestRegressor()

# Same label / prediction / feature columns as the other learners. The seed is
# written without the Python-2-only `L` suffix (a SyntaxError on Python 3); the
# numeric value is unchanged.
rf.setLabelCol("price")\
  .setPredictionCol("Predicted_price")\
  .setFeaturesCol("features")\
  .setSeed(100088121)\
  .setMaxDepth(8)\
  .setNumTrees(20)

# Create a Pipeline
rfPipeline = Pipeline()

# Set the stages of the Pipeline: assemble the feature vector, then fit the forest
rfPipeline.setStages([vectorizer, rf])

# rfModel = rfPipeline.fit(trainingSetDF)

In [20]:
# Reuse the existing CrossValidator, swapping in the random-forest pipeline
crossval.setEstimator(rfPipeline)

# Tune two parameters together: rf.numTrees over 20/30/40 and rf.maxBins over
# 50/70/100, i.e. 9 combinations per fold
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [20, 30, 40])
             .addGrid(rf.maxBins, [50, 70, 100])
             .build())

# Add the grid to the CrossValidator
crossval.setEstimatorParamMaps(paramGrid)

# Fit on the training set and keep the best pipeline model
rfModel = crossval.fit(trainingSetDF).bestModel

# Report the winning numTrees / maxBins. print(...) with one argument behaves the
# same on Python 2 and 3 and matches the other print(...) calls in this notebook.
bestRfParams = rfModel.stages[-1]._java_obj.parent()
print(bestRfParams.getNumTrees())
print(bestRfParams.getMaxBins())

In [21]:
# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
regEval = RegressionEvaluator(predictionCol="Predicted_price", labelCol="price", metricName="rmse")

# Score the test set with the LR model right here rather than reading the shared
# `predictionsAndLabelsDF` variable: that name is reassigned to the decision-tree and
# random-forest predictions further down, so re-running this cell out of order would
# silently report another model's error as the LR error.
lrPredictionsDF = lrModel.transform(testSetDF).select('price', "Predicted_price")

# Root mean squared error of the LR model on the test set
rmse = regEval.evaluate(lrPredictionsDF)

# print("Root Mean Squared Error: %.2f" % rmse)
# Same evaluator with the metric overridden to r2 for this one call
r2 = regEval.evaluate(lrPredictionsDF, {regEval.metricName: "r2"})

# print("r2: {0:.2f}".format(r2))

In [22]:
# Score the held-out test set with the best decision-tree pipeline
predictionsAndLabelsDF = dtModel.transform(testSetDF)

# Per-call override that switches the shared evaluator from RMSE to r2
r2Override = {regEval.metricName:"r2"}

# RMSE (the evaluator's configured metric) and r2 for the decision tree
rmseDT = regEval.evaluate(predictionsAndLabelsDF)
r2DT = regEval.evaluate(predictionsAndLabelsDF, r2Override)

In [23]:
# Score the held-out test set with the best random-forest pipeline
predictionsAndLabelsDF = rfModel.transform(testSetDF)

# RMSE (the evaluator's configured metric) and r2 for the random forest
rmseRF = regEval.evaluate(predictionsAndLabelsDF)
r2RF = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName:"r2"})

# Side-by-side report for the three models: all RMSE lines first, then all r2 lines
metricReport = [
  ("LR Root Mean Squared Error: {0:.2f}", rmse),
  ("DT Root Mean Squared Error: {0:.2f}", rmseDT),
  ("RF Root Mean Squared Error: {0:.2f}", rmseRF),
  ("LR r2: {0:.2f}", r2),
  ("DT r2: {0:.2f}", r2DT),
  ("RF r2: {0:.2f}", r2RF),
]
for template, metricValue in metricReport:
  print(template.format(metricValue))

In [24]:
# Name of the table queried by the SQL cell that follows
evalTable = "Cars_RF_RMSE_Evaluation"

# Clear any previous copy of THIS notebook's evaluation table. The cleanup used to
# target "Power_Plant_RMSE_Evaluation" - a name this notebook never creates - so it
# dropped / deleted unrelated data while leaving the cars table untouched.
sqlContext.sql("DROP TABLE IF EXISTS {}".format(evalTable))
# NOTE(review): the table below is registered as a temp table, so there may be no
# warehouse directory to remove at all - confirm whether this rm is still needed.
dbutils.fs.rm("dbfs:/user/hive/warehouse/{}".format(evalTable), True)

# Next we calculate the residual error and divide it by the RMSE of the random forest
# (predictionsAndLabelsDF holds the random-forest predictions at this point)
predictionsAndLabelsDF.selectExpr("price", "Predicted_price", "price - Predicted_price Residual_Error", "(price - Predicted_price) / {} Within_RSME".format(rmseRF)).registerTempTable(evalTable)

In [25]:
%sql
-- Bucket every row of Cars_RF_RMSE_Evaluation by its Within_RSME value and count
-- the rows per bucket:
--   1 -> value within [-1, 1]
--   2 -> value within [-2, 2] (and not already in bucket 1)
--   3 -> everything else
-- The CASE expression is repeated verbatim in GROUP BY rather than referenced by
-- its alias.
SELECT case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1
            when  Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3
       end RSME_Multiple, COUNT(*) AS count
FROM Cars_RF_RMSE_Evaluation
GROUP BY case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1  when  Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3 end