In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [2]:
spark = SparkSession.builder.config("spark.driver.host", "localhost")
print(spark)

<pyspark.sql.session.SparkSession.Builder object at 0x0000022E78B171C0>


In [3]:
data_spark = SparkSession.builder.appName('DataFrame1').getOrCreate()

In [4]:
Spark_df = data_spark.read.option('header','true').csv('Bike Sharing/day.csv', inferSchema = True)

In [5]:
#Droping Column Instant
New_data = Spark_df.drop('instant','dteday','registered')
New_data.show()

+------+---+----+-------+-------+----------+----------+---------+--------+-------+---------+------+----+
|season| yr|mnth|holiday|weekday|workingday|weathersit|     temp|   atemp|    hum|windspeed|casual| cnt|
+------+---+----+-------+-------+----------+----------+---------+--------+-------+---------+------+----+
|     1|  0|   1|      0|      6|         0|         2|14.110847|18.18125|80.5833|10.749882|   331| 985|
|     1|  0|   1|      0|      0|         0|         2|14.902598|17.68695|69.6087|16.652113|   131| 801|
|     1|  0|   1|      0|      1|         1|         1| 8.050924| 9.47025|43.7273|16.636703|   120|1349|
|     1|  0|   1|      0|      2|         1|         1|      8.2| 10.6061|59.0435|10.739832|   108|1562|
|     1|  0|   1|      0|      3|         1|         1| 9.305237| 11.4635|43.6957|  12.5223|    82|1600|
|     1|  0|   1|      0|      4|         1|         1| 8.378268|11.66045|51.8261|6.0008684|    88|1606|
|     1|  0|   1|      0|      5|         1|         2|

In [7]:
New_data.describe().show()

+-------+------------------+------------------+-----------------+--------------------+------------------+-------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|summary|            season|                yr|             mnth|             holiday|           weekday|         workingday|        weathersit|             temp|            atemp|               hum|         windspeed|           casual|               cnt|
+-------+------------------+------------------+-----------------+--------------------+------------------+-------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+
|  count|               730|               730|              730|                 730|               730|                730|               730|              730|              730|               730|               730|              

In [8]:
categorical_cols = [item[0] for item in New_data.dtypes if item[1].startswith('string')]
print(categorical_cols)

numerical_cols = [item[0] for item in New_data.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

[]
['season', 'yr', 'mnth', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'casual']


In [9]:
import seaborn as sns

In [10]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
import pandas as pd

In [11]:
numerical_cols = [item[0] for item in New_data.dtypes if item[1].startswith('int') | item[1].startswith('double')][:-1]
print(numerical_cols)

['season', 'yr', 'mnth', 'holiday', 'weekday', 'workingday', 'weathersit', 'temp', 'atemp', 'hum', 'windspeed', 'casual']


In [12]:
stages = []
Vectassembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
stages += [Vectassembler]

In [13]:
cols = New_data.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(New_data)
New_data = pipelineModel.transform(New_data)
selectedCols = ['features']+cols
New_data = New_data.select(selectedCols)

In [20]:
New_data = New_data.select(selectedCols)

In [21]:
#pd.DataFrame(New_data.take(5), columns=New_data.columns)

In [22]:
finalized_data = New_data.select("features","cnt")

finalized_data.show()

+--------------------+----+
|            features| cnt|
+--------------------+----+
|[1.0,0.0,1.0,0.0,...| 985|
|[1.0,0.0,1.0,0.0,...| 801|
|[1.0,0.0,1.0,0.0,...|1349|
|[1.0,0.0,1.0,0.0,...|1562|
|[1.0,0.0,1.0,0.0,...|1600|
|[1.0,0.0,1.0,0.0,...|1606|
|[1.0,0.0,1.0,0.0,...|1510|
|[1.0,0.0,1.0,0.0,...| 959|
|[1.0,0.0,1.0,0.0,...| 822|
|[1.0,0.0,1.0,0.0,...|1321|
|[1.0,0.0,1.0,0.0,...|1263|
|[1.0,0.0,1.0,0.0,...|1162|
|[1.0,0.0,1.0,0.0,...|1406|
|[1.0,0.0,1.0,0.0,...|1421|
|[1.0,0.0,1.0,0.0,...|1248|
|[1.0,0.0,1.0,0.0,...|1204|
|[1.0,0.0,1.0,1.0,...|1000|
|[1.0,0.0,1.0,0.0,...| 683|
|[1.0,0.0,1.0,0.0,...|1650|
|[1.0,0.0,1.0,0.0,...|1927|
+--------------------+----+
only showing top 20 rows



In [23]:
#Split the data into training and test model with 70% obs. going in training and 30% in testing
train_dataset, test_dataset = finalized_data.randomSplit([0.7, 0.3])

In [24]:
#Import packeages for model creation
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor

In [25]:
#Create the Multiple Linear Regression object named MLR having feature column as features and Label column as Profit
Multiple_linear_Reg = LinearRegression(featuresCol="features", labelCol="cnt")

In [26]:
#Train the model on the training using fit() method.
model1 = Multiple_linear_Reg.fit(train_dataset)
#Predict the Profit on Test Dataset using the evulate method
Mult_pred = model1.evaluate(test_dataset)
#Show the predicted Grade values along sid

In [27]:
#Show the predicted Grade values along side actual Grade values
Mult_pred.predictions.show()

+--------------------+----+-------------------+
|            features| cnt|         prediction|
+--------------------+----+-------------------+
|[1.0,0.0,1.0,0.0,...| 822|0.21016579982364192|
|[1.0,0.0,1.0,0.0,...|1204|  906.7998125815807|
|[1.0,0.0,1.0,0.0,...|1416| 1815.9758433187585|
|[1.0,0.0,1.0,0.0,...|1562|  2078.869183642138|
|[1.0,0.0,1.0,0.0,...|1600|  2138.232584634568|
|[1.0,0.0,1.0,0.0,...| 506|  669.3328260679061|
|[1.0,0.0,1.0,0.0,...|1606|  2351.667100972833|
|[1.0,0.0,1.0,0.0,...| 981|  553.6135761441137|
|[1.0,0.0,1.0,0.0,...| 959| 30.776996820588067|
|[1.0,0.0,1.0,0.0,...|1248|  715.6067761540442|
|[1.0,0.0,1.0,0.0,...| 985|  1059.314721160033|
|[1.0,0.0,2.0,0.0,...|1812| 1564.1215901673536|
|[1.0,0.0,2.0,0.0,...|1589| 1279.0162006087014|
|[1.0,0.0,2.0,0.0,...|2402|  1895.535959585269|
|[1.0,0.0,2.0,0.0,...|1712|  2434.925731453196|
|[1.0,0.0,2.0,0.0,...|1360| 1669.3715216105604|
|[1.0,0.0,2.0,0.0,...|2115| 2472.3837151729845|
|[1.0,0.0,2.0,0.0,...|1605| 1360.8844799

In [28]:
#Find out coefficient value
coefficient = model1.coefficients
print ("The coefficients of the model are : %a" %coefficient)

The coefficients of the model are : DenseVector([400.4065, 1528.7118, -16.6899, -237.531, 36.1624, 1369.2868, -464.9519, -50.308, 96.7123, -4.8321, -21.7367, 1.5206])


In [29]:
#Find out intercept Value
intercept = model1.intercept
print ("The Intercept of the model is : %f" %intercept)

The Intercept of the model is : 459.808928


In [30]:
#Evaluate the model using metric like Mean Absolute Error(MAE), Root Mean Square Error(RMSE) and R-Square
from pyspark.ml.evaluation import RegressionEvaluator
evaluation = RegressionEvaluator(labelCol="cnt", predictionCol="prediction")

# r2 - coefficient of determination
r2 = evaluation.evaluate(Mult_pred.predictions, {evaluation.metricName: "r2"})
print("r2: %.3f" %r2)

r2: 0.903


In [31]:
# Summarize the model over the training set and print out some metrics
trainingSummary = model1.summary

In [32]:
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

RMSE: 649.488215


In [34]:
#Create Unlabeled dataset  to contain only feature column
unlabeled_dataset = test_dataset.select('features')

In [35]:
#Predict the model output for fresh & unseen test data using transform() method
new_predictions = model1.transform(unlabeled_dataset)

In [36]:
#Display the new prediction values
#new_predictions.show()

In [37]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [38]:
#Train the model on the training using fit() method.
model1 = Multiple_linear_Reg.fit(train_dataset)
#Predict the Profit on Test Dataset using the evulate method
Mult_pred = model1.evaluate(test_dataset)
#Show the predicted Grade values along sid

In [39]:
# Create initial Decision Tree Model
decision_tree = DecisionTreeRegressor(labelCol="cnt").fit(train_dataset)

In [40]:
decision_tree_prediction = decision_tree.transform(test_dataset)

In [41]:
decision_tree_prediction.show()

+--------------------+----+------------------+
|            features| cnt|        prediction|
+--------------------+----+------------------+
|[1.0,0.0,1.0,0.0,...| 822|            1413.0|
|[1.0,0.0,1.0,0.0,...|1204|2481.1428571428573|
|[1.0,0.0,1.0,0.0,...|1416|2250.2820512820513|
|[1.0,0.0,1.0,0.0,...|1562|2250.2820512820513|
|[1.0,0.0,1.0,0.0,...|1600|2250.2820512820513|
|[1.0,0.0,1.0,0.0,...| 506|             886.0|
|[1.0,0.0,1.0,0.0,...|1606|2250.2820512820513|
|[1.0,0.0,1.0,0.0,...| 981|1066.6666666666667|
|[1.0,0.0,1.0,0.0,...| 959|1066.6666666666667|
|[1.0,0.0,1.0,0.0,...|1248|          1844.375|
|[1.0,0.0,1.0,0.0,...| 985|          1844.375|
|[1.0,0.0,2.0,0.0,...|1812|2481.1428571428573|
|[1.0,0.0,2.0,0.0,...|1589|2481.1428571428573|
|[1.0,0.0,2.0,0.0,...|2402|          1844.375|
|[1.0,0.0,2.0,0.0,...|1712|2250.2820512820513|
|[1.0,0.0,2.0,0.0,...|1360|             886.0|
|[1.0,0.0,2.0,0.0,...|2115|2262.6428571428573|
|[1.0,0.0,2.0,0.0,...|1605|            1413.0|
|[1.0,0.0,2.0

In [42]:
# Select (prediction, true label) and compute test error
decision_tree_evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")
decision_tree_rmse = decision_tree_evaluator.evaluate(decision_tree_prediction)
print("Root Mean Squared Error (RMSE) on test data = %g" % decision_tree_rmse)

Root Mean Squared Error (RMSE) on test data = 891.329


In [43]:
# r2 - coefficient of determination
dec_tree_r2 = evaluation.evaluate(decision_tree_prediction, {evaluation.metricName: "r2"})
print("r2: %.3f" %dec_tree_r2)

r2: 0.797


In [44]:
from numpy import allclose
from pyspark.ml.linalg import Vectors

In [45]:
from pyspark.ml.regression import RandomForestRegressor
random_forest = RandomForestRegressor(labelCol="cnt").fit(train_dataset)

In [46]:
random_forest_prediction = random_forest.transform(test_dataset)

In [47]:
random_forest_prediction.show()

+--------------------+----+------------------+
|            features| cnt|        prediction|
+--------------------+----+------------------+
|[1.0,0.0,1.0,0.0,...| 822|1302.2716658682887|
|[1.0,0.0,1.0,0.0,...|1204|1969.2715843202036|
|[1.0,0.0,1.0,0.0,...|1416|1627.7546990482274|
|[1.0,0.0,1.0,0.0,...|1562|1604.1364326037185|
|[1.0,0.0,1.0,0.0,...|1600|1675.4307307942588|
|[1.0,0.0,1.0,0.0,...| 506|1064.2685723696593|
|[1.0,0.0,1.0,0.0,...|1606|1583.8761642496925|
|[1.0,0.0,1.0,0.0,...| 981|   1588.4701648496|
|[1.0,0.0,1.0,0.0,...| 959|1366.8680108774458|
|[1.0,0.0,1.0,0.0,...|1248|2041.1861142347334|
|[1.0,0.0,1.0,0.0,...| 985|  2149.30192941908|
|[1.0,0.0,2.0,0.0,...|1812| 2187.761227175467|
|[1.0,0.0,2.0,0.0,...|1589|2105.2967512789833|
|[1.0,0.0,2.0,0.0,...|2402|2384.9558074389374|
|[1.0,0.0,2.0,0.0,...|1712| 1850.939187419987|
|[1.0,0.0,2.0,0.0,...|1360|1318.2843132442044|
|[1.0,0.0,2.0,0.0,...|2115| 2097.575911397372|
|[1.0,0.0,2.0,0.0,...|1605|1471.0241330870601|
|[1.0,0.0,2.0

In [48]:
# Select (prediction, true label) and compute test error
random_forest_evaluator = RegressionEvaluator(labelCol="cnt", predictionCol="prediction", metricName="rmse")
random_forest_rmse = random_forest_evaluator.evaluate(random_forest_prediction)
print("Root Mean Squared Error (RMSE) on test data = %g" % random_forest_rmse)

Root Mean Squared Error (RMSE) on test data = 662.393


In [49]:
# r2 - coefficient of determination
rand_forest_r2 = evaluation.evaluate(random_forest_prediction, {evaluation.metricName: "r2"})
print("r2: %.3f" %rand_forest_r2)

r2: 0.888
