In [0]:
from pyspark.sql import SparkSession 
# Start (or reuse) a Spark session running locally on all available cores.
# Note: `sc` holds a SparkSession here, not a SparkContext.
sc = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)


In [0]:
# Load the bike-rental CSV: the first row supplies the column names and
# column types are inferred from the data.
data = sc.read.csv(
    "/FileStore/tables/Bike_Rental_UCI_dataset-bb6c6.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the raw DataFrame to sanity-check the load and the inferred columns.
data.show()

In [0]:
from pyspark.ml.classification import (LogisticRegression,DecisionTreeClassifier,RandomForestClassifier)
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

In [0]:
# Map the `dayOfWeek` column to numeric category indices stored in `day_cat`.
indexer = StringIndexer(
    inputCol="dayOfWeek",
    outputCol="day_cat",
)

In [0]:
# Fit the indexer on the full dataset, then append the `day_cat` column.
day_indexer_model = indexer.fit(data)
indexed_data = day_indexer_model.transform(data)
indexed_data.show()

In [0]:
# List the distinct index values produced for `day_cat`, sorted by value.
(
    indexed_data
    .select('day_cat')
    .distinct()
    .orderBy('day_cat')
    .show()
)

In [0]:
from pyspark.ml.feature import VectorAssembler

# Columns combined into the single `features` vector used by every model below.
feature_columns = [
    'season', 'yr', 'mnth', 'hr', 'holiday',
    'workingday', 'weathersit', 'days', 'day_cat',
]
vec = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [0]:
# Append the assembled `features` vector column to the indexed data.
data1 = vec.transform(indexed_data)

In [0]:
# Preview the data with the new `features` column attached.
data1.show()

In [0]:
# Keep only what the models need: the feature vector and the `demand` target.
modelData = data1.select('features','demand')

In [0]:
# Summary statistics for the modelling frame.
summary_stats = modelData.describe()
summary_stats.show()

In [0]:
# Show rows without truncating cell contents so the full feature vectors are visible.
modelData.show(truncate=False)

In [0]:
# Split the data 70/30 into training and test sets.
# A fixed seed makes the partition reproducible: without it each run of the
# notebook trains and evaluates on different rows, so the reported metrics
# cannot be compared between runs.
trainData, testData = modelData.randomSplit([0.7, 0.3], seed=42)

# Spark ML estimators and evaluators look for a column named `label` by
# default, so rename the `demand` target in both splits.
trainData = trainData.withColumnRenamed('demand', 'label')
testData = testData.withColumnRenamed('demand', 'label')
trainData.show()

In [0]:
# Preview the held-out test rows with untruncated feature vectors.
testData.show(truncate=False)

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Linear regression baseline. elasticNetParam=1 selects a pure L1 (lasso)
# penalty, scaled by regParam.
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=1,
)
lr_model = lr.fit(trainData)

# Fitted parameters.
print("Coefficient: " + str(lr_model.coefficients))
print("Intercept: "  + str(lr_model.intercept))

# Fit quality as reported by the model's training summary.
trainingSummary = lr_model.summary
print("RMSE : %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [0]:
# Score the held-out rows with the fitted linear model.
predictions = lr_model.transform(testData)

# Preview a few predictions next to the true labels.
prediction_preview = predictions.select("prediction", "label", "features")
prediction_preview.show(5)

In [0]:
# Test-set error of the linear model: RMSE first, then R2.
evaluator1 = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator1.evaluate(predictions)
print("Root Mean Square Error (RMSE) on test data = %g" % rmse)

evaluator2 = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="r2",
)
r2 = evaluator2.evaluate(predictions)
print("R2 on test data = %g" %r2)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

In [0]:
# R2 evaluator used to score candidates during cross-validation.
# Label and prediction columns are left at the evaluator's defaults
# (expected to be `label` and `prediction` -- confirm against the PySpark docs).
evaluator = RegressionEvaluator(metricName='r2')
# Display the description and current value of the `metricName` parameter.
evaluator.explainParam('metricName')

In [0]:
#%pip install mlflow

In [0]:
# Tune the linear model's regularisation strength with 2-fold
# cross-validation, scoring each candidate with the R2 evaluator.
pipeline = Pipeline(stages=[lr])

# Candidate regParam values. Note that `gridBuilder` holds the built list of
# param maps, not the builder itself.
gridBuilder = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=gridBuilder,
    evaluator=evaluator,
    numFolds=2,
    seed=42,  # fixed so the fold assignment is the same on every run
)
cvm = cv.fit(trainData)

# Score the test set with the best model found and report its R2.
predictions = cvm.transform(testData)
evaluator.evaluate(predictions)


In [0]:
# Average demand per season.
(
    data1
    .groupby('season')
    .mean('demand')
    .show()
)

In [0]:
# Average demand per hour of the day.
(
    data1
    .groupby('hr')
    .mean('demand')
    .show()
)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Train a random forest regressor on the assembled feature vector.
# The label column is left at its default (`label`). A fixed seed keeps the
# forest's random sampling, and therefore the reported metrics, reproducible.
rf = RandomForestRegressor(featuresCol='features', seed=42)

# Single-stage pipeline: it contains only the regressor. No indexer is chained
# here; the features were indexed and assembled in earlier cells.
pipeline = Pipeline(stages=[rf])

# Fit on the training split.
model = pipeline.fit(trainData)

# Score the held-out test split.
predictions = model.transform(testData)

# Preview a few predictions next to the true labels.
predictions.select("prediction", "label", "features").show(5)

# Test-set error: RMSE first, then R2.
evaluator1 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator1.evaluate(predictions)
print("Root Mean Square Error (RMSE) on test data = %g" % rmse)
evaluator2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator2.evaluate(predictions)
print("R2 on test data = %g" % r2)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Train a gradient-boosted trees regressor with 50 boosting iterations.
# The label column is left at its default (`label`). The seed is pinned so
# that any randomised step of training is reproducible, matching the other
# models in this notebook.
gbt = GBTRegressor(featuresCol='features', maxIter=50, seed=42)

# Single-stage pipeline: it contains only the regressor. No indexer is chained
# here; the features were indexed and assembled in earlier cells.
pipeline = Pipeline(stages=[gbt])

# Fit on the training split.
model = pipeline.fit(trainData)

# Score the held-out test split.
predictions = model.transform(testData)

# Preview a few predictions next to the true labels.
predictions.select("prediction", "label", "features").show(5)

# Test-set error: RMSE first, then R2.
evaluator1 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator1.evaluate(predictions)
print("Root Mean Square Error (RMSE) on test data = %g" % rmse)
evaluator2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
r2 = evaluator2.evaluate(predictions)
print("R2 on test data = %g" % r2)