In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.ml.evaluation import RegressionEvaluator
import ibmos2spark

def evaluate_model(prediction):
    """Print the RMSE and r2 scores of a prediction DataFrame.

    Args:
        prediction: DataFrame holding the true values in a ``label`` column
            and the model output in a ``prediction`` column.

    Returns:
        None. Both metrics are printed with three decimals.
    """
    scorer = RegressionEvaluator(
        labelCol="label",
        predictionCol="prediction",
        metricName="rmse",
    )

    # First pass uses the metric the evaluator was constructed with (rmse).
    root_mean_sq_err = scorer.evaluate(prediction)
    print("RMSE: %.3f" % root_mean_sq_err)

    # Second pass overrides the metric through a param map instead of
    # building another evaluator.
    r_squared = scorer.evaluate(prediction, {scorer.metricName: "r2"})
    print("r2: %.3f" % r_squared)

def graph_results(test_x, test_y, prediction):
    """Plot the test points together with the model's predicted values.

    The test points are drawn as small red dots and the predicted values as
    a blue line over the same x values.

    Args:
        test_x: x values (feature) of the test rows.
        test_y: y values (label) of the test rows, aligned with ``test_x``.
        prediction: Either a DataFrame exposing a ``prediction`` column (as
            the caller in this file passes) or an already collected sequence
            of predicted values aligned with ``test_x``.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # The predicted values are taken from the `prediction` argument instead
    # of an undeclared global named `pred_y`, so the function no longer
    # depends on notebook state defined elsewhere.
    if hasattr(prediction, "select"):
        pred_y = prediction.select("prediction").collect()
    else:
        pred_y = prediction

    plt.title("Linear Regression in PySpark")
    plt.plot(test_x, test_y, "ro", markersize=3)
    plt.plot(test_x, pred_y, "b-")
    plt.show()


Παρακάτω γίνεται load το dataset που χρησιμοποιούμε, ενώ υπάρχουν και ορισμένα configurations που πραγματοποιεί το IBM Watson Studio, στο οποίο γράφτηκε και δοκιμάστηκε αυτό το Python script με PySpark.

In [None]:
"""
Used IBM Watson Studio, so the hidden cell contains my individual credentials and configurations, so i have substituted them with 'something'

"""
# @hidden_cell
# Placeholder object-storage credentials: every value must be replaced with
# the real one before this cell can run.
credentials = {
    'endpoint': 'something',
    'service_id': 'something',
    'iam_service_endpoint': 'something',
    'api_key': 'something',
}
configuration_name = 'something'

# NOTE(review): `sc` is not defined anywhere in this file; presumably it is
# the SparkContext injected by the notebook runtime - confirm.
cos = ibmos2spark.CloudObjectStorage(
    sc,
    credentials,
    configuration_name,
    'bluemix_cos',
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read day.csv from the object-storage bucket, treating the first line as
# the header row.
df = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .option('header', 'true')
    .load(cos.url('day.csv', 'linearregression-donotdelete-pr-pcl56hqd6jlq8d'))
)

In [None]:
# Use windspeed as the single feature and temp as the regression label.
# Both columns are cast to float explicitly before modelling.
lr_data = df.select(df.windspeed.cast("float"), df.temp.cast("float"))
features = ["windspeed"]
lr_data = lr_data.withColumnRenamed("temp", "label")

# 80/20 split into training and testing datasets.
training, test = lr_data.randomSplit([.8, .2])

# Pipeline stages: assemble the feature vector, scale it, then fit the
# linear regression on the scaled "features" column.
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

lr = LinearRegression(maxIter=10, regParam=.01)

stages = [vectorAssembler, standardScaler, lr]
pipeline = Pipeline(stages=stages)

# Train the model.
model = pipeline.fit(training)

# Make predictions on the testing dataset.
prediction = model.transform(test)
prediction.show()

# Report RMSE and r2 on the test predictions.
evaluate_model(prediction)

# Bring the plotted columns to the driver with a single collect(): one Spark
# job instead of three, and x, y and predicted values are guaranteed to come
# from the same rows in the same order.
rows = prediction.select("windspeed", "label", "prediction").collect()
test_x = [row["windspeed"] for row in rows]
test_y = [row["label"] for row in rows]
# Kept as a notebook-level name so any code that reads the global `pred_y`
# keeps working.
pred_y = [row["prediction"] for row in rows]

# Plot the predicted line with the test points.
graph_results(test_x, test_y, prediction)