In [0]:
# Read the training and test CSVs; the first row is the header and column
# types are inferred from the data.
train = spark.read.options(header="true", inferSchema="true").csv('/FileStore/tables/train_finally.csv')
test = spark.read.options(header="true", inferSchema="true").csv('/FileStore/tables/test_finally.csv')

In [0]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Render the training DataFrame as a table to eyeball its columns and values.
display(train)

Electricity,ChilledWater,Steam,HotWater,building_id,meter_reading,site_id,primary_use,square_feet,air_temperature,cloud_coverage,dew_temperature,precip_depth_1_hr,sea_level_pressure,wind_direction,wind_speed,hour,day,weekday,month
1,0,0,0,0,0.0,0,0,8.913684824725294,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,1,0.0,0,0,7.908754738783246,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,2,0.0,0,0,8.589885876809678,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,3,0.0,0,0,10.07263943528144,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,4,0.0,0,0,11.666573161179576,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,5,0.0,0,0,8.987321812850125,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,6,0.0,0,4,10.2373492417846,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,7,0.0,0,0,11.70416546727143,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,8,0.0,0,0,11.015509528114157,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1
1,0,0,0,9,0.0,0,6,10.203629181337648,25.0,6.0,20.0,0.0,1019.7,0.0,0.0,0,1,4,1


In [0]:
# Start from the full list of training column names; the label column
# ('meter_reading') is taken out of this list in a later cell.
numerical_cols = train.columns

In [0]:
# Feature columns = every column of `train` except the label.
# The list is rebuilt instead of calling .remove() on the existing one so the
# cell can be re-run safely: a second .remove('meter_reading') on an already
# filtered list would raise ValueError.
numerical_cols = [col for col in train.columns if col != 'meter_reading']
numerical_cols

In [0]:
from pyspark.ml.feature import VectorAssembler

# Pack all predictor columns into one vector column named 'features', then
# keep only that vector and the label for model fitting.
vectorAssembler = VectorAssembler(outputCol='features', inputCols=numerical_cols)
train_df = vectorAssembler.transform(train).select('features', 'meter_reading')

In [0]:
# Print a sample of the assembled (features, meter_reading) rows.
train_df.show()

In [0]:
# 70/30 hold-out split shared by every model below.
# A fixed seed keeps the partition repeatable between runs so that the RMSE
# figures of the different models are compared on the same rows.
train_data, test_data = train_df.randomSplit([0.7, 0.3], seed=42)

# Linear Regression

In [0]:
from pyspark.ml.regression import LinearRegression

# Regularised linear regression on the training split
# (regParam=0.3 with an elastic-net mixing parameter of 0.8, 10 iterations max).
lr = LinearRegression(
    labelCol='meter_reading',
    featuresCol='features',
    elasticNetParam=0.8,
    regParam=0.3,
    maxIter=10,
)
lr_model = lr.fit(train_data)

In [0]:
# Show the fitted weights (one per assembled feature) and the bias term.
print(f"Coefficients: {lr_model.coefficients!s}")
print(f"Intercept: {lr_model.intercept!s}")

In [0]:
# Evaluate the linear model on the held-out split; the returned summary
# exposes the scored rows as `.predictions`.
pred = lr_model.evaluate(test_data)
# Preview the first 5 scored rows.
pred.predictions.show(5)

In [0]:
# `lr_model.summary` is the summary attached to the fitted model, i.e. these
# metrics describe the fit on `train_data`, not on the held-out `test_data`
# (the held-out summary is the `pred` object from lr_model.evaluate).
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [0]:
# Feature columns for the test file: all of its columns except 'row_id'.
# NOTE(review): the models were fitted on `numerical_cols`; this assumes the
# remaining test columns have the same names and order — confirm.
test_col = test.columns
test_col.remove('row_id')

In [0]:
# Assemble the test-file predictors into a 'features' vector column so the
# fitted models can score them.
# NOTE(review): the feature order here comes from `test_col`; it must match
# the order used for training (`numerical_cols`) — confirm.
vectorAssembler = VectorAssembler(inputCols = test_col, outputCol = 'features')
eva_df = vectorAssembler.transform(test)
# Keep only the feature vector. This must select from `eva_df` (the assembled
# test set); selecting from `train_df` here would silently replace the test
# rows with training rows.
eva_df = eva_df.select(['features'])

In [0]:
# Score `eva_df` with the fitted linear-regression model and preview
# the first 5 rows of the result.
new_predictions = lr_model.transform(eva_df)
new_predictions.show(5)

# Decision Tree

In [0]:
from pyspark.ml.regression import DecisionTreeRegressor

In [0]:
# RMSE evaluator comparing 'prediction' against the 'meter_reading' label;
# the later model cells reuse this same object.
dt_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="meter_reading",
    predictionCol="prediction",
)

# Fit a decision-tree regressor on the training split and score the hold-out split.
dt = DecisionTreeRegressor(labelCol='meter_reading', featuresCol='features')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)

rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [0]:
# Preview the first 5 rows scored by the decision tree.
dt_predictions.show(5)

# Random Forest

In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Fit a random-forest regressor on the training split, score the hold-out
# split, and report RMSE with the evaluator created in the decision-tree cell.
rf = RandomForestRegressor(labelCol='meter_reading', featuresCol='features')
rfModel = rf.fit(train_data)
predictions = rfModel.transform(test_data)

rmse = dt_evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [0]:
# Preview the first 5 rows scored by the random forest.
predictions.show(5)

# Boosted Decision Tree

In [0]:
from pyspark.ml.regression import GBTRegressor

# Fit a gradient-boosted-trees regressor (10 boosting iterations) on the
# training split, score the hold-out split, and report RMSE with the
# evaluator created in the decision-tree cell.
gbt = GBTRegressor(
    labelCol='meter_reading',
    featuresCol='features',
    maxIter=10,
)
gbt_model = gbt.fit(train_data)
gbt_predictions = gbt_model.transform(test_data)

rmse = dt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [0]:
# Preview the first 5 rows scored by the gradient-boosted trees.
gbt_predictions.show(5)

In [0]:
#sample = spark.read.csv('/FileStore/tables/sample_submission.csv', header="true", inferSchema="true")

In [0]:
#sampleLR = sample
#sampleDT = sample
#sampleRF = sample
#sampleGBT = sample

In [0]:
#display(sampleLR)

In [0]:
#sampleLR_pd = sampleLR.toPandas()
#new_predictions_pd = new_predictions.select('prediction').toPandas()
#sampleLR_pd['meter_reading'] = new_predictions_pd['prediction']
#sampleLR_pd.head()

In [0]:
#sampleLR_pd.to_csv('/dbfs/FileStore/tables/sampleLR.csv')

In [0]:
#new_predictions_DT = dt_model.transform(eva_df)
#sampleDT_pd = sampleDT.toPandas()
#new_predictions_DT_pd = new_predictions_DT.select('prediction').toPandas()
#sampleDT_pd['meter_reading'] = new_predictions_DT_pd['prediction']
#sampleDT_pd.to_csv('/dbfs/FileStore/tables/sampleDT.csv')

In [0]:
#new_predictions_RF = rfModel.transform(eva_df)
#sampleRF_pd = sampleRF.toPandas()
#new_predictions_RF_pd = new_predictions_RF.select('prediction').toPandas()
#sampleRF_pd['meter_reading'] = new_predictions_RF_pd['prediction']
#sampleRF_pd.to_csv('/dbfs/FileStore/tables/sampleRF.csv')

In [0]:
#new_predictions_GBT = gbt_model.transform(eva_df)
#sampleGBT_pd = sampleGBT.toPandas()
#new_predictions_GBT_pd = new_predictions_GBT.select('prediction').toPandas()
#sampleGBT_pd['meter_reading'] = new_predictions_GBT_pd['prediction']
#sampleGBT_pd.to_csv('/dbfs/FileStore/tables/sampleGBT.csv')