In [1]:
import pyspark
from pyspark.ml.feature import Word2Vec
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.ml import Pipeline, PipelineModel
import os
from pyspark.sql.functions import col

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor,GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

from pyspark.ml.tuning import CrossValidator,TrainValidationSplit,ParamGridBuilder


In [2]:
os.environ['PYSPARK_PYTHON'] = '/home/hduser/anaconda3/bin/python'
# The value is the python command of the version required to start the master and worker in the Linux system
os.environ['PYSPARK_DRIVER_PYTHON'] = r"C:\Users\billy\anaconda3\python.exe"
# The value is the spark directory in the local windows system
os.environ['SPARK_HOME'] = 'C:/spark'
# The value is the local IP, and the IP required to establish a connection, to prevent connection failure when multiple network cards
#os.environ['SPARK_LOCAL_IP'] = '192.168.56.1'
os.environ['HADOOP_HOME'] = "D:/hadoop-3.3.0"
os.environ['HADOOP_CONF_DIR'] = "/usr/local/hadoop/etc/hadoop"

In [3]:
conf  = pyspark.SparkConf().setAppName('BikeSharing1').setMaster('spark://192.168.133.4:7077').set(
    "spark.submit.deployMode","client").set('spark.driver.memory','6g').set(
        'spark.executor.memory', '2g').set('spark.executor.cores', 1).set(
        'spark.network.timeout', 600).set('spark.executor.heartbeatInterval', 120).set(
    'spark.cores.max', 4).set("spark.driver.host","192.168.133.1").set("spark.driver.port","9999").set("spark.sql.execution.arrow.pyspark.enabled", "true")#.set('spark.python.profile','true')
spark = SparkSession.builder.getOrCreate()

In [4]:
path = "hdfs://192.168.133.4:9000/user/hduser/"

In [9]:
class bike_training():

    def __init__(self,path):
        
        # read csv file by sparksql
        self.df = spark.read.format('csv')\
            .option('header','true').load(path+"data/bikes/hour.csv")

        # define the evaluation of the model 
        self.evaluator = RegressionEvaluator(labelCol = 'cnt',
                               predictionCol ='prediction',
                               metricName = 'rmse')
        
        
    def preprocessing(self,split_ratio):
        self.hour_df = self.df.drop("instant").drop("dteday").drop('yr').drop('casual').drop('registered')
        self.hour_df = self.hour_df.select([col(column).cast("double").alias(column)
                     for column in self.hour_df.columns])
        print(self.hour_df.printSchema())
        self.train_df, self.test_df =  self.hour_df.randomSplit(split_ratio)
        
        # to make read faster cache the df in the memory
        self.train_df.cache()
        self.test_df.cache()
        
    
    def pipelines(self):
        
        ## pipeline common steps
        featuresCols = self.hour_df.columns[:-1]
        vectorAssembler = VectorAssembler(inputCols = featuresCols, outputCol='aFeatures')
        vecotrIndexer = VectorIndexer(inputCol='aFeatures',outputCol='features',maxCategories=24)
        
        # Decision Tree models
        dt = DecisionTreeRegressor(labelCol="cnt",featuresCol="features")
        self.dt_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,dt])
        
        # Gradient Boost models
        gbt = GBTRegressor(labelCol='cnt',featuresCol='features')
        self.gbt_cv_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,gbt])
        
        
        # ModelTuning parameters
        self.paramGrid = ParamGridBuilder()\
            .addGrid(dt.maxDepth,[5,10,15,25])\
            .addGrid(dt.maxBins,[25,35,45,50])\
            .build()

        # Decision Tree Cross-Validation        
        cv = CrossValidator(estimator=dt,evaluator=self.evaluator,
                estimatorParamMaps=self.paramGrid)
        self.cv_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,cv])
        
        # Decision Tree Train-Validation
        tvs = TrainValidationSplit(estimator=dt,evaluator=self.evaluator,
                          estimatorParamMaps=self.paramGrid)
        self.tvs_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,tvs])
        
        # Gradient Boost Cross-Validation
        gbt_cv = CrossValidator(estimator=dt,evaluator=self.evaluator,
        estimatorParamMaps=self.paramGrid)
        self.gbt_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,gbt_cv])
        
    def dt_pipeline_ModelTraining(self,pipeline):
        
        # Training the pipeline
        dt_pipeline_Model  = pipeline.fit(self.train_df)
        
        # make a prediction by the model
        predicted_df = dt_pipeline_Model.transform(self.test_df)
        
        # Root mean square of the model
        rmse = self.evaluator.evaluate(predicted_df)
        print('RMSE: {}'.format(rmse))
        return predicted_df.drop('aFeatures').drop('features').show(5),rmse
        
bikeModel = bike_training(path)
bikeModel.preprocessing([0.7,0.3])
bikeModel.pipelines()
print("tvs_pipeline")
bikeModel.dt_pipeline_ModelTraining(bikeModel.tvs_pipeline)
print("dt_cv_pipeline")
bikeModel.dt_pipeline_ModelTraining(bikeModel.cv_pipeline)
print("dt_pipeline")
bikeModel.dt_pipeline_ModelTraining(bikeModel.dt_pipeline)
print("gbt_pipeline")
bikeModel.dt_pipeline_ModelTraining(bikeModel.gbt_pipeline)
print("gbt_cv_pipeline")
bikeModel.dt_pipeline_ModelTraining(bikeModel.gbt_cv_pipeline)
    

root
 |-- season: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)

None
tvs_pipeline
RMSE: 82.33040601248787
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|season|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed| cnt|        prediction|
+------+----+---+-------+-------+----------+----------+----+------+----+---------+----+------------------+
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.04|0.0758|0.57|   0.1045|22.0|              13.0|
|   1.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1364|0.4

(None, 78.40438489328209)

In [11]:
hour_df = spark.read.format('csv')\
            .option('header','true').load(path+"data/bikes/hour.csv")
hour_df.count()

17379

In [None]:
hour_df = hour_df.drop("instant").drop("dteday").drop('yr').drop('casual').drop('registered')
print(hour_df.printSchema())

In [None]:
hour_df = hour_df.select([col(column).cast("double").alias(column)
                             for column in hour_df.columns])
hour_df.show(5)

In [None]:
train_df, test_df =  hour_df.randomSplit([0.8,0.2])
train_df.cache()
test_df.cache()

In [None]:
evaluator = RegressionEvaluator(labelCol = 'cnt',
                               predictionCol ='prediction',
                               metricName = 'rmse')

In [None]:
featuresCols = hour_df.columns[:-1]
print(featureCols)
vectorAssembler = VectorAssembler(inputCols = featuresCols, outputCol='aFeatures')
vecotrIndexer = VectorIndexer(inputCol='aFeatures',outputCol='features',maxCategories=24)
dt = DecisionTreeRegressor(labelCol="cnt",featuresCol="features")
dt_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,dt])

In [None]:
dt_pipeline_Model = dt_pipeline.fit(train_df)
print(dt_pipeline_Model.stages[2].toDebugString[:500])
predicted_df = dt_pipeline_Model.transform(test_df)
predicted_df.drop('aFeatures').drop('features').show(10)
predicted_df = dt_pipeline_Model.transform(test_df)
rmse = evaluator.evaluate(predicted_df)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder,TrainValidationSplit
paramGrid = ParamGridBuilder()\
            .addGrid(dt.maxDepth,[5,10,15,25])\
            .addGrid(dt.maxBins,[25,35,45,50])\
            .build()

In [None]:
tvs = TrainValidationSplit(estimator=dt,evaluator=evaluator,
                          estimatorParamMaps=paramGrid)
tvs_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,tvs])
tvs_pipelineModel = tvs_pipeline.fit(train_df)
predicted_df = tvs_pipelineModel.transform(test_df)
rmse = evaluator.evaluate(predicted_df)
rmse

In [None]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt,evaluator=evaluator,
                    estimatorParamMaps=paramGrid)
cv_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,cv])
cv_pipelineModel = cv_pipeline.fit(train_df)
predicted_df = cv_pipelineModel.transform(test_df)
rmse = evaluator.evaluate(predicted_df)
rmse

In [None]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol='cnt',featuresCol='features')
gbt_pipeline = Pipeline(stages=[vectorAssembler, vecotrIndexer,gbt])
gbt_pipelineModel = gbt_pipeline.fit(train_df)
predicted_df = gbt_pipelineModel.transform(test_df)
rmse = evaluator.evaluate(predicted_df)
rmse

In [None]:
predicted_df.drop('aFeatures').drop('features').show(10)