In [None]:
# Step #6 Simple Regression Example with MLlib
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import FeatureHasher
from pyspark.ml.feature import VectorAssembler
# Restrict the experiment to observations dated before 2021; `data` is the
# input for the train/test split in the next cell.
data = spark_weather_df.where(spark_weather_df["date"] < "2021-01-01")
# Peek at a single row (rendered as a pandas frame) to eyeball the schema.
data.limit(1).toPandas()

In [None]:
target = "avg_temp"
features = ["year", "month"]

# Pack the feature columns into the single vector column Spark ML estimators expect.
vector = VectorAssembler(inputCols=features, outputCol='features')

# Chronological split: rows dated before `splitdate` train the model, rows on or
# after it are held out for testing. This is a time-based cut, not a fixed
# percentage, so the test share depends on the date range present in `data`.
splitdate = "2018-01-01"
train = data.filter(data["date"] < splitdate)
test = data.filter(data["date"] >= splitdate)
# Fail fast with a clear message rather than an opaque error from fit()/evaluate().
assert train.head(1), f"no training rows before {splitdate}"
assert test.head(1), f"no test rows on or after {splitdate}"

# Gradient-boosted trees regressor (10 boosting iterations) predicting `target`.
# NOTE: tree ensembles cannot extrapolate, so every test "year" (all >= the split
# year) falls into the same leaves as the latest training year.
gbt = GBTRegressor(featuresCol="features", labelCol=target, maxIter=10)

# Chain the feature assembler and the GBT regressor in a Pipeline.
pipeline = Pipeline(stages=[vector, gbt])

# Fit the pipeline: the assembler transforms `train`, then the GBT trains on its output.
model = pipeline.fit(train)



In [None]:
# Apply the fitted pipeline (assembler + GBT) to the held-out period.
mdata = model.transform(test)

# Spot-check observed vs predicted values for the August rows of the test period.
mdata.filter(mdata["month"] == 8).select("month", "prediction", target).show(50)

# One evaluator; the metric is overridden per call through a param map, so
# rmse/mae/r2 hold only the resulting float scores.
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction")
rmse = evaluator.evaluate(mdata, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(mdata, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(mdata, {evaluator.metricName: "r2"})

print("RMSE: ", rmse)
print("MAE: ", mae)
print("R-squared: ", r2)

In [None]:
# Collect date, observed and predicted values in ONE job, ordered by date, so the
# three series stay aligned row-for-row (separate collect() calls on an unordered
# DataFrame carry no ordering guarantee).
plot_df = (
    mdata.select("date", target, "prediction")
    .orderBy("date")
    .toPandas()
)

# Parse dates so matplotlib places points on a real time axis.
# Assumes `date` holds ISO-8601 strings or date values — TODO confirm schema.
x_ax = plot_df["date"].astype("datetime64[ns]")
y_orig = plot_df[target]
y_pred = plot_df["prediction"]

fig, ax = plt.subplots(figsize=(28, 8))
ax.plot(x_ax, y_orig, label="original")
ax.plot(x_ax, y_pred, label="predicted")
ax.set_title(f"GBT predictions vs observed {target} (test period from {splitdate})")
ax.set_xlabel("date")
ax.set_ylabel(target)
ax.legend(loc="best", fancybox=True, shadow=True)
ax.grid(True)
plt.show()