In [0]:
# About Dataset
# Feature Engineering
# Correlation Analysis
# Visualizing Distribution Of Data
# Filling 0's In Windspeed Using Random Forest
# Regularized Linear Regression Model
# Ensemble Models

In [0]:
# Overview
# Bike sharing systems are a means of renting bicycles where the process of obtaining membership, rental, and bike return is automated via a network of kiosk locations throughout a city. Using these systems, people are able rent a bike from a one location and return it to a different place on an as-needed basis. Currently, there are over 500 bike-sharing programs around the world.

# datetime - hourly date + timestamp

# season - 1 = spring, 2 = summer, 3 = fall, 4 = winter

# holiday - whether the day is considered a holiday

# workingday - whether the day is neither a weekend nor holiday

# weather -
# 1: Clear, Few clouds, Partly cloudy, Partly cloudy
# 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
# 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
# 4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog

# temp - temperature in Celsius

# atemp - "feels like" temperature in Celsius

# humidity - relative humidity

# windspeed - wind speed

# casual - number of non-registered user rentals initiated

# registered - number of registered user rentals initiated

# count - number of total rentals (Dependent Variable)

In [0]:
from pyspark import SparkContext, SparkConf
sc

In [0]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler,StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.regression import RandomForestRegressor,GeneralizedLinearRegression
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark = SparkSession.builder.appName('BIKESHARING').getOrCreate()
spark = SparkSession.builder.master("local").appName("BikeSharingDemand").getOrCreate()

In [0]:
df_train=spark.read.csv('/FileStore/tables/train.csv',inferSchema=True,header=True)
df_train_backup=spark.read.csv('/FileStore/tables/train.csv',inferSchema=True,header=True)
df_test=spark.read.csv('/FileStore/tables/test.csv',inferSchema=True,header=True)

In [0]:
df_train.printSchema()

In [0]:
# Derive columns date, hour, weekday
df_train = df_train.withColumn('date',to_date(df_train.datetime)).\
                      withColumn('hour',hour(df_train.datetime)).\
                      withColumn('dayofweek',date_format(df_train.datetime, 'E')).\
                      withColumn('month',date_format(df_train.datetime, 'L')).\
                      withColumn('month',col('month').cast("int")).\
                      withColumn('year', date_format(df_train.datetime, "Y"))


df_train = df_train.withColumn('weather',when(col('weather') == '1','Clear + Few clouds + Partly cloudy')
                                          .when(col('weather') == '2','Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist')
                                          .when(col('weather') == '3','Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds')
                                          .otherwise('Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog'))

df_train = df_train. withColumn('season_text',when(col('season') == '1','Spring')
                                         .when(col('season') == '2','Summer')
                                         .when(col('season') == '3','Fall')
                                         .otherwise('Winter'))

In [0]:
columns = ["temp","atemp","casual","registered","humidity","windspeed","count"]
 
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=["temp","atemp","casual","registered","humidity","windspeed","count"], 
                            outputCol=vector_col)
myGraph_vector = assembler.transform(df_train).select(vector_col)
matrix = Correlation.corr(myGraph_vector, vector_col)
matrix = Correlation.corr(myGraph_vector, vector_col).collect()[0][0]
corrmatrix = matrix.toArray().tolist()
df = spark.createDataFrame(corrmatrix,columns)
df.show()

# temp and humidity features has got positive and negative correlation with count respectively.Although the correlation between them are not very prominent still the count variable has got little dependency on "temp" and "humidity".

# windspeed is not gonna be really useful numerical feature and it is visible from it correlation value with "count"

# "atemp" is variable is not taken into since "atemp" and "temp" has got strong correlation with each other. During model building any one of the variable has to be dropped since they will exhibit multicollinearity in the data.

# "Casual" and "Registered" are also not taken into account since they are leakage variables in nature and need to dropped during model building.

In [0]:
df_train.select('count').display()

count
16
40
32
13
1
1
2
3
8
14


In [0]:
display(df_train.groupBy('month').agg(sum('casual').alias('casual_count'),sum('registered').alias('registered_count')).sort('month'))
# From the below plot we can see that for all the 12 months, number of registered bike rentals are very high compared to casual bike rentals. We have the least bike rentals in the month of January "79884" and maximum in the month of June "220733". Overall, bike rentals increased gradually until June and then there is a slight decrease in the further months

month,casual_count,registered_count
1,7252,72632
2,9297,89816
3,25056,108445
4,39813,127589
5,41285,158862
6,48574,172159
7,50947,163670
8,45870,167646
9,45901,166628
10,38087,169347


In [0]:
df_train.select('temp','count','holiday').display()
# This plot shows the variation of bike rentals at different temperatures. The rentals are very less at the extreme temperatures (very high and very low). The rentals are high when the temperature is between 25 - 30 degree celsius.

temp,count,holiday
9.84,16,0
9.02,40,0
9.02,32,0
9.84,13,0
9.84,1,0
9.84,1,0
9.02,2,0
8.2,3,0
9.84,8,0
13.12,14,0


In [0]:
df_train.select('humidity','count','holiday').display()
# This plot shows the variation of bike rentals at different humidity conditions.The rental count is very high at an humidity value of '46' and it keeps decreasing with slight fluctuations in between with increase in humidity value.

humidity,count,holiday
81,16,0
80,40,0
80,32,0
75,13,0
75,1,0
75,1,0
80,2,0
86,3,0
75,8,0
76,14,0


In [0]:
df_train.select("hour","dayofweek","count").groupby("hour","dayofweek").agg(avg('count').alias('average rentals per hour')).sort(["hour","dayofweek"],ascending = True).display()
# The below plot shows the average rentals per hour for each day. Almost all the days have high rentals at 8:00 AM in the morning and during 5:00 PM in the evening. The rentals are low for all weekdays during the noon hours excluding the weekends, Saturday and Sunday have rental count average around 400 during noon. Apart from noon hours, saturday and sunday have less rentals during the morning and evening hours.

hour,dayofweek,average rentals per hour
0,Fri,53.234375
0,Mon,35.49230769230769
0,Sat,98.21212121212122
0,Sun,96.22727272727272
0,Thu,37.47692307692308
0,Tue,27.328125
0,Wed,36.246153846153845
1,Fri,24.453125
1,Mon,18.07692307692308
1,Sat,70.01515151515152


In [0]:
df_train.select("hour","season_text","count").groupby("hour","season_text").agg(avg('count').alias('average count per hour')).sort(["hour","season_text"],ascending = True).display()
# The above plot shows the average rentals per hour at different seasons. The Fall season has more rentals followed by summer, winter and spring.

hour,season_text,average count per hour
0,Fall,75.67543859649123
0,Spring,28.292035398230087
0,Summer,58.47368421052632
0,Winter,57.87719298245614
1,Fall,44.83185840707964
1,Spring,18.76106194690265
1,Summer,35.64035087719298
1,Winter,36.16666666666666
2,Fall,31.707964601769916
2,Spring,13.205607476635514


In [0]:
df_train.select('weather','count').groupby("weather").agg(avg('count').alias('average count per hour')).display()
# 31% of the rentals are when the climate is clear and partly cloudy.Least rentals are when there is snow, rain and thunderstroms. 25% rentals are during heavy rain and thunderstorms.

weather,average count per hour
"Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds",118.8463329452852
"Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist",178.95553987297106
"Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog",164.0
Clear + Few clouds + Partly cloudy,205.23679087875416


In [0]:
# Three new columns casual,registered and count are created in the test dataset with zeroes in it.
df_test = df_test.withColumn("casual", lit(None))
df_test = df_test.withColumn("registered", lit(None))
df_test = df_test.withColumn("count", lit(None))

#Combining train and test datasets
spdf = df_train_backup.union(df_test)

# convert datatype of datetime 
spdf = spdf.withColumn('datetime', spdf.datetime.cast('timestamp'))

# Derive columns date, hour, weekday
spdf = spdf.withColumn('date',to_date(spdf.datetime)).\
     withColumn('hour',hour(spdf.datetime)).\
     withColumn('dayofweek',date_format(spdf.datetime, 'E')).\
     withColumn('month',date_format(spdf.datetime, 'L')).\
     withColumn('month',col('month').cast("int")).\
     withColumn('year',year(spdf.datetime))


# convert datatype of datetime 
spdf = spdf.withColumnRenamed('count','count_total') #converting the name of dependent variable to overcome the default count function set by pyspark

#converting attributes to respective datatypes
spdf = spdf.withColumn("datetime", spdf.datetime.cast('timestamp')).\
            withColumn('temp', spdf.temp.cast('float')).\
            withColumn('atemp', spdf.atemp.cast('float')).\
            withColumn('humidity', spdf.humidity.cast('int')).\
            withColumn('windspeed', spdf.windspeed.cast('int')).\
            withColumn('casual', spdf.casual.cast('int')).\
            withColumn('registered', spdf.registered.cast('int')).\
            withColumn('count_total', spdf.count_total.cast('int'))

spdf = spdf.drop('datetime')

In [0]:
spdf.display(10)

season,holiday,workingday,weather,temp,atemp,humidity,windspeed,casual,registered,count_total,date,hour,dayofweek,month,year
1,0,0,1,9.84,14.395,81,0,3,13,16,2011-01-01,0,Sat,1,2011
1,0,0,1,9.02,13.635,80,0,8,32,40,2011-01-01,1,Sat,1,2011
1,0,0,1,9.02,13.635,80,0,5,27,32,2011-01-01,2,Sat,1,2011
1,0,0,1,9.84,14.395,75,0,3,10,13,2011-01-01,3,Sat,1,2011
1,0,0,1,9.84,14.395,75,0,0,1,1,2011-01-01,4,Sat,1,2011
1,0,0,2,9.84,12.88,75,6,0,1,1,2011-01-01,5,Sat,1,2011
1,0,0,1,9.02,13.635,80,0,2,0,2,2011-01-01,6,Sat,1,2011
1,0,0,1,8.2,12.88,86,0,1,2,3,2011-01-01,7,Sat,1,2011
1,0,0,1,9.84,14.395,75,0,1,7,8,2011-01-01,8,Sat,1,2011
1,0,0,1,13.12,17.425,76,0,8,6,14,2011-01-01,9,Sat,1,2011


In [0]:
#Printing the number of NA values in each column
spdf.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in spdf.columns)).show()
#The NA values in the three attributes are for the test dataset which do not have the dependent variable value

In [0]:
spdf = spdf.na.fill({'casual': '99999', 'registered': '99999','count_total':'99999'}) 
#Replacing NA with a radom value to make sure the modeling part does not get effected.

#Breaking the windspeed into train and test where test data contains only ZERO and train data contains value in windspeed attribute
wind_zero = spdf.where(spdf.windspeed ==0) # Taking only the zero values of windspeed
wind_non_zero = spdf.where(spdf.windspeed !=0 ) #taking the non-ero rows of windspeed which acts as training dataset

wind_non_zero_backup = wind_non_zero #backup of the data to make sure no mistakes happen
wind_zero_backup = wind_zero

In [0]:
#A standardizing function which takes the names of columns and returns a pipeline that can be used in various stages of model building    
def standardize_attributes(columns_to_scale):
  assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
  scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
  pipeline_standardize = Pipeline(stages=assemblers + scalers)
  return pipeline_standardize

#Calling the standardizing function which creates pipeline for humidity, temp and atemp features that are numeric ones we use for predicting windspeed
pipeline_windspeed = standardize_attributes(["humidity", "temp", "atemp"])
scaler_windspeed = pipeline_windspeed.fit(wind_zero)

wind_zero = scaler_windspeed.transform(wind_zero) #applying the standardizing fn on train and test dataset
wind_non_zero = scaler_windspeed.transform(wind_non_zero)

In [0]:
#The indexer is used to deal with the categorical type of data
season_indexer = StringIndexer(inputCol='season',outputCol='season_index',handleInvalid='keep')
weather_indexer = StringIndexer(inputCol='weather',outputCol='weather_index',handleInvalid='keep')
month_indexer = StringIndexer(inputCol='month',outputCol='month_index',handleInvalid='keep')
year_indexer = StringIndexer(inputCol='year',outputCol='year_index',handleInvalid='keep')


holiday_indexer = StringIndexer(inputCol='holiday',outputCol='holiday_index',handleInvalid='keep')
workingday_indexer = StringIndexer(inputCol='workingday',outputCol='workingday_index',handleInvalid='keep')
hour_indexer = StringIndexer(inputCol='hour',outputCol='hour_index',handleInvalid='keep')

In [0]:
#These are the set of attributes we used to predict the windspeed attributes
assembler = VectorAssembler(inputCols=['humidity_scaled','temp_scaled','atemp_scaled',
                                       'season_index','weather_index','month_index','year_index'],
                            outputCol="features")

# create a RandomForest model to learn the rules for predicting windspeed
rf = RandomForestRegressor(labelCol="windspeed", featuresCol="features", numTrees=10)

#Make a pipeline to convert categorical, assembler and train random forest model
pipe = Pipeline(stages=[season_indexer,weather_indexer,month_indexer,year_indexer,assembler, rf])

#Fitting a pipeline model
windspeed_model = pipe.fit(wind_non_zero)

#Make predictions.
predictions = windspeed_model.transform(wind_zero)


#removing the actual windspeed and replacing the prediction column name as windspeed
predictions = predictions.drop('windspeed')
predictions = predictions.withColumnRenamed('prediction','windspeed')


#joining the train and test dataset to make a final dataset that does not contain 0 in windspeed
wind_zero_imputed =predictions.select('season', 'holiday', 'workingday', 'weather', 'temp', 'atemp', 'humidity', 'casual', 'registered', 'count_total', 'date', 'hour', 'year', 'month','windspeed')

wind_non_zero_backup = wind_non_zero_backup.select('season', 'holiday', 'workingday', 'weather', 'temp', 'atemp', 'humidity', 'casual', 'registered', 'count_total', 'date', 'hour', 'year', 'month','windspeed')

#appending the non zero and zero to a single dataframe
data = wind_non_zero_backup.union(wind_zero_imputed)

In [0]:
#replacing the data back to NA to take the NA records as test dataset

data = data.withColumn("casual",when(data["casual"] == 99999, None).otherwise(data["casual"])).\
             withColumn("registered",when(data['registered'] == 99999, None).otherwise(data['registered'])).\
             withColumn("count_total",when(data['count_total'] == 99999, None).otherwise(data['count_total']))
        

#splitting data on the train and test: The train dataset is further broken down to train and validation
#Note the test data does not contain the dependent values. Hence to evaluate the model we have broken into train and valid
test_data = data.where(data.count_total.isNull())
data = data.filter(data.count_total.isNotNull())

(train_data, valid_data) = data.randomSplit([0.7, 0.3])

print("the number of rows for train dataset",train_data.count())
print("the number of rows for valid dataset",valid_data.count())
print("the number of rows for test dataset",test_data.count())

In [0]:
#setting the attributes needed for training the model
pipeline_count = standardize_attributes(["humidity", "temp", "atemp","windspeed"])
scaler_count = pipeline_count.fit(train_data)

#assembling the categorical and numerical attributes needed
assembler = VectorAssembler(inputCols=['season_index','holiday_index','workingday_index',
                                       'weather_index',
                                       'month_index','year_index', 'hour_index',
                                       'humidity_scaled','temp_scaled','atemp_scaled','windspeed_scaled'],
                            outputCol="features")

assembler_linear_reg = VectorAssembler().setInputCols(['count_total',]).setOutputCol('features')

In [0]:
#fitting a generalized linear regression with alpha- regularization parameter set to 0.3
glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3,labelCol='count_total')

#creating a pipeline to fit the data
piped_train_dataset = Pipeline(stages=[holiday_indexer, workingday_indexer,hour_indexer,season_indexer,weather_indexer,month_indexer,year_indexer,assembler])

#applying the standardization and fitting the pipeline with train dataset
train_data_std = scaler_count.transform(train_data)
piped_data = piped_train_dataset.fit(train_data_std)

#transforming the train dataset with pipeline created above
train_data_std=piped_data.transform(train_data_std)

# fitting the generalized linear model on the train dataset
glr_model = glr.fit(train_data_std)

#Applying the transformation on the validation dataset
valid_data_std = scaler_count.transform(valid_data)
valid_data_std=piped_data.transform(valid_data_std)
lr_predictions = glr_model.transform(valid_data_std)

#Since the dependent variable is numeric, RMSE is the best score to understand the performance of the model

evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="count_total",metricName="rmse")
print("The RMSE value on validation dataset is", evaluator.evaluate(lr_predictions))

In [0]:
# Train a RandomForest model.
rf = RandomForestRegressor(labelCol="count_total", featuresCol="features", numTrees=40, maxDepth=5)
count_model = rf.fit(train_data_std)


valid_data_std = scaler_count.transform(valid_data)
valid_data_std=piped_data.transform(valid_data_std)
rf_predictions_train = count_model.transform(train_data_std)
rf_predictions_valid = count_model.transform(valid_data_std)

# evaluator = RegressionEvaluator(labelCol="count_total", predictionCol="prediction", metricName="rmse")
# rmse = evaluator.evaluate(lr_predictions)
print("The RMSE value on train dataset is", evaluator.evaluate(rf_predictions_train))
print("The RMSE value on validation dataset is", evaluator.evaluate(rf_predictions_valid))

In [0]:
# References:
#   https://www.kaggle.com/c/bike-sharing-demand
#   https://spark.apache.org/docs/latest/ml-classification-regression.html
#   https://www.datasciencemadesimple.com/get-month-year-and-quarter-from-date-in-pyspark/