In [1]:
#Read dataset in Spark
df = sqlContext.read.load("dbfs:/databricks-datasets/bikeSharing/data-001/day.csv", 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
#This is databrick dataset of bike sharing, have done project with given dataset though

In [2]:
#2. Get summary of data and variable types
df.printSchema()

In [3]:
df.show(5)

In [4]:
#Given Train file from which data frame is generated 
bs_df = spark.sql("select * from bike_sharing_train_csv")
bs_df.show(5)

In [5]:
bs_df.printSchema()

In [6]:
bs_df.describe().show()

In [7]:
bs_df.explain()

In [8]:
#Check for any missing value in dataset and treat it
print(bs_df.count())
df_no_null = bs_df.na.drop()
print(df_no_null.count())

In [9]:
#Check what are the distinct seasons present to explode them
bs_df.select('season').distinct().show()

In [10]:
#user defined function to help creat new columns
def valueToCategory(value, encoding_index):
   if(value == encoding_index):
      return 1
   else:
    return 0

In [11]:
#Explode season column into separate columns such as season_<val> and drop season
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import col
udfValueToCategory = udf(valueToCategory, IntegerType())
bs_df_encoded = (bs_df.withColumn("season_1", udfValueToCategory(col('season'),lit(1)))
                     .withColumn("season_2", udfValueToCategory(col('season'),lit(2)))
                     .withColumn("season_3", udfValueToCategory(col('season'),lit(3)))
                     .withColumn("season_4", udfValueToCategory(col('season'),lit(4))))
bs_df_encoded = bs_df_encoded.drop('season')

In [12]:
bs_df_encoded.show()

In [13]:
#Execute the same for weather as weather_<val> and drop weather
bs_df.select('weather').distinct().show()

In [14]:
bs_df_encoded = (bs_df_encoded.withColumn("weather_1", udfValueToCategory(col('weather'),lit(1)))
                     .withColumn("weather_2", udfValueToCategory(col('weather'),lit(2)))
                     .withColumn("weather_3", udfValueToCategory(col('weather'),lit(3)))
                     .withColumn("weather_4", udfValueToCategory(col('weather'),lit(4))))
bs_df_encoded = bs_df_encoded.drop('weather')

In [15]:
bs_df_encoded.show()

In [16]:
# Split datetime into meaningful columns such as hour,day,month,year,etc
from pyspark.sql.functions import split
from pyspark.sql.functions import *
from pyspark.sql.types import *
bs_df_encoded = bs_df_encoded.withColumn('hour',  split(split(bs_df_encoded['datetime'], ' ')[1], ':')[0].cast('int'))
bs_df_encoded = bs_df_encoded.withColumn('month', split(split(bs_df_encoded['datetime'], ' ')[0], '-')[0].cast('int'))
bs_df_encoded = bs_df_encoded.withColumn('day', split(split(bs_df_encoded['datetime'], ' ')[0], '-')[1].cast('int'))
bs_df_encoded = bs_df_encoded.withColumn('year', split(split(bs_df_encoded['datetime'], ' ')[0], '-')[2].cast('int'))

In [17]:
display(bs_df_encoded)

datetime,holiday,workingday,temp,atemp,humidity,windspeed,casual,registered,count,season_1,season_2,season_3,season_4,weather_1,weather_2,weather_3,weather_4,hour,month,day,year
01-01-2011 00:00,0,0,9.84,14.395,81,0.0,3,13,16,1,0,0,0,1,0,0,0,0,1,1,2011
01-01-2011 01:00,0,0,9.02,13.635,80,0.0,8,32,40,1,0,0,0,1,0,0,0,1,1,1,2011
01-01-2011 02:00,0,0,9.02,13.635,80,0.0,5,27,32,1,0,0,0,1,0,0,0,2,1,1,2011
01-01-2011 03:00,0,0,9.84,14.395,75,0.0,3,10,13,1,0,0,0,1,0,0,0,3,1,1,2011
01-01-2011 04:00,0,0,9.84,14.395,75,0.0,0,1,1,1,0,0,0,1,0,0,0,4,1,1,2011
01-01-2011 05:00,0,0,9.84,12.88,75,6.0032,0,1,1,1,0,0,0,0,1,0,0,5,1,1,2011
01-01-2011 06:00,0,0,9.02,13.635,80,0.0,2,0,2,1,0,0,0,1,0,0,0,6,1,1,2011
01-01-2011 07:00,0,0,8.2,12.88,86,0.0,1,2,3,1,0,0,0,1,0,0,0,7,1,1,2011
01-01-2011 08:00,0,0,9.84,14.395,75,0.0,1,7,8,1,0,0,0,1,0,0,0,8,1,1,2011
01-01-2011 09:00,0,0,13.12,17.425,76,0.0,8,6,14,1,0,0,0,1,0,0,0,9,1,1,2011


In [18]:

bs_df_encoded.printSchema()
bs_df_encoded = bs_df_encoded.drop('datetime')
bs_df_encoded = bs_df_encoded.withColumnRenamed("count", "label")

In [19]:
#Explore how count varies with hour
import seaborn as sns
sns.set(style="ticks")
viz = bs_df_encoded.select('label', 'hour').distinct().limit(50)

display(viz)
#we can see from below which hours have more density

label,hour
61,13
179,8
79,10
154,13
77,23
15,14
155,14
250,20
92,8
376,15


In [20]:
#Explore how count varies with different features such as hour,month,etc#
import seaborn as sns
sns.set(style="ticks")
viz = bs_df_encoded.select('label', 'day').distinct().limit(50)
#sns.pairplot(viz, hue="species")
#plt.show()
display(viz)

label,day
92,2
203,4
71,4
214,7
43,7
163,7
179,8
137,11
83,11
307,2


In [21]:
#Split the dataset into train and train_test
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
train, test = bs_df_encoded.randomSplit([0.9, 0.1], seed=12345)

In [22]:
#The features are assembled to send it to model
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["holiday","workingday","temp","atemp","humidity","windspeed","casual","registered","label","season_1","season_2","season_3","season_4","weather_1","weather_2","weather_3","weather_4", "hour", "month", "day", "year"],
    outputCol="features")

output = assembler.transform(train)
print("Assembled columns 'hour', 'day' etc  to vector column 'features'")
output.show(truncate=False)
print(output.count())
train_output = output.na.drop()
print(train_output.count())

In [23]:
test_output = assembler.transform(test)
print(test_output.count())
train_output = test_output.na.drop()
print(test_output.count())
print("Assembled columns 'hour', 'day' etc  to vector column 'features'")
test_output.show(truncate=False)#.select("features", "clicked")

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)

# Fit the model
lrModel = lr.fit(train_output)

In [25]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

In [26]:
import pyspark.sql.functions
predictions = lrModel.transform(test_output)\
    .select("features", "label", "prediction")\
    .show(1000)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# testRDD = test.rdd 
# predictionAndLabels = testRDD.map(lambda lp: (float(model.predict(lp.features)), lp.label))
# # Evaluate model
# metrics = BinaryClassificationMetrics(predictionAndLabels)
# f1Score = metrics.fMeasure()
# print(f1Score)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="r2")
# print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))


In [27]:
# Parameter grid search for best parameters to give good predictions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train_output)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test_output)\
    .select("features", "label", "prediction")\
    .show()

In [28]:
# Random Forest Classifier model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
rf = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=100)
# Train model.  This also runs the indexers.
rf_model = rf.fit(train_output)
# rf_model.persist()
# Make predictions.
predictions = rf_model.transform(test_output)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [29]:
# GBT Regressor model 
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol="features", maxIter=10)

gbt_model = gbt.fit(train_output)
# Make predictions.
predictions = gbt_model.transform(test_output)


gbt_model.write().overwrite().save("bike_sharing_gbt.model")
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
#Gave root mean square error 
