In [7]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("ReadFromHDFS") \
    .getOrCreate()

# Specify the HDFS path to your data
hdfs_path = "hdfs://namenode_host:9870/user/root/input/train.csv"

# Read data into a DataFrame (assuming the data is in CSV format)
df = spark.read.csv(hdfs_path, header=True, inferSchema=True)

# Show the DataFrame content
df.show()

df.printSchema()

                                                                                

+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|           datetime|season|holiday|workingday|weather| temp| atemp|humidity|windspeed|casual|registered|count|
+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:00|     1|      0|         0|      1| 9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     0|         1|    1|
|2011-01-01 05:00:00|     1|      0|         0|      2| 9.84| 12.88|      75|   6.0032|     0|         1

In [8]:
display(df.take(5))

                                                                                

[Row(datetime='2011-01-01 00:00:00', season=1, holiday=0, workingday=0, weather=1, temp=9.84, atemp=14.395, humidity=81, windspeed=0.0, casual=3, registered=13, count=16),
 Row(datetime='2011-01-01 01:00:00', season=1, holiday=0, workingday=0, weather=1, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=8, registered=32, count=40),
 Row(datetime='2011-01-01 02:00:00', season=1, holiday=0, workingday=0, weather=1, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=5, registered=27, count=32),
 Row(datetime='2011-01-01 03:00:00', season=1, holiday=0, workingday=0, weather=1, temp=9.84, atemp=14.395, humidity=75, windspeed=0.0, casual=3, registered=10, count=13),
 Row(datetime='2011-01-01 04:00:00', season=1, holiday=0, workingday=0, weather=1, temp=9.84, atemp=14.395, humidity=75, windspeed=0.0, casual=0, registered=1, count=1)]

In [9]:
df.createOrReplaceTempView("bike_train")

In [10]:
bs_df = spark.sql("select * from bike_train")
bs_df.show()


                                                                                

+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|           datetime|season|holiday|workingday|weather| temp| atemp|humidity|windspeed|casual|registered|count|
+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:00|     1|      0|         0|      1| 9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     0|         1|    1|
|2011-01-01 05:00:00|     1|      0|         0|      2| 9.84| 12.88|      75|   6.0032|     0|         1

In [11]:
bs_df.describe().show()

                                                                                

+-------+-------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+
|summary|           datetime|            season|            holiday|        workingday|           weather|              temp|            atemp|          humidity|         windspeed|           casual|        registered|             count|
+-------+-------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+
|  count|              10886|             10886|              10886|             10886|             10886|             10886|            10886|             10886|             10886|            10886|             10886|             10886|
|   mean|               null|2.5066139996325556|

In [12]:
print(bs_df.explain())

== Physical Plan ==
FileScan csv [datetime#158,season#159,holiday#160,workingday#161,weather#162,temp#163,atemp#164,humidity#165,windspeed#166,casual#167,registered#168,count#169] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/opt/spark/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<datetime:string,season:int,holiday:int,workingday:int,weather:int,temp:double,atemp:double...


None


In [13]:
print(bs_df.count())
df_no_null = bs_df.na.drop()
print(df_no_null.count())

                                                                                

10886
10886


                                                                                

In [14]:
bs_df.select('season').distinct().show()
bs_df.select('weather').distinct().show()

                                                                                

+------+
|season|
+------+
|     1|
|     3|
|     4|
|     2|
+------+



[Stage 34:>                                                         (0 + 1) / 1]

+-------+
|weather|
+-------+
|      1|
|      3|
|      4|
|      2|
+-------+



                                                                                

In [15]:
#columns encoding
def valueToCategory(value, encoding_index):
   if(value == encoding_index):
      return 1
   else:
    return 0
     

In [16]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import col
udfValueToCategory = udf(valueToCategory, IntegerType())
bs_df_encoded = (bs_df.withColumn("season_1", udfValueToCategory(col('season'),lit(1)))
                     .withColumn("season_2", udfValueToCategory(col('season'),lit(2)))
                     .withColumn("season_3", udfValueToCategory(col('season'),lit(3)))
                     .withColumn("season_4", udfValueToCategory(col('season'),lit(4))))
bs_df_encoded = bs_df_encoded.drop('season')
     

In [17]:

bs_df_encoded = (bs_df_encoded.withColumn("weather_1", udfValueToCategory(col('weather'),lit(1)))
                     .withColumn("weather_2", udfValueToCategory(col('weather'),lit(2)))
                     .withColumn("weather_3", udfValueToCategory(col('weather'),lit(3)))
                     .withColumn("weather_4", udfValueToCategory(col('weather'),lit(4))))
bs_df_encoded = bs_df_encoded.drop('weather')

In [18]:
bs_df_encoded.show(5, truncate=False)

[Stage 35:>                                                         (0 + 1) / 1]

+-------------------+-------+----------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+
|datetime           |holiday|workingday|temp|atemp |humidity|windspeed|casual|registered|count|season_1|season_2|season_3|season_4|weather_1|weather_2|weather_3|weather_4|
+-------------------+-------+----------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+
|2011-01-01 00:00:00|0      |0         |9.84|14.395|81      |0.0      |3     |13        |16   |1       |0       |0       |0       |1        |0        |0        |0        |
|2011-01-01 01:00:00|0      |0         |9.02|13.635|80      |0.0      |8     |32        |40   |1       |0       |0       |0       |1        |0        |0        |0        |
|2011-01-01 02:00:00|0      |0         |9.02|13.635|80      |0.0      |5     |27        |32   |1       |0       |0       |0       |1        

                                                                                

In [19]:
from pyspark.sql.functions import split
from pyspark.sql.functions import *
from pyspark.sql.types import *
bs_df_encoded = bs_df_encoded.withColumn('hour',  split(split(bs_df_encoded['datetime'], ' ')[1], ':')[0].cast('int'))
bs_df_encoded = bs_df_encoded.withColumn('month', split(split(bs_df_encoded['datetime'], ' ')[0], '-')[0].cast('int'))
bs_df_encoded = bs_df_encoded.withColumn('day', split(split(bs_df_encoded['datetime'], ' ')[0], '-')[1].cast('int'))
bs_df_encoded = bs_df_encoded.withColumn('year', split(split(bs_df_encoded['datetime'], ' ')[0], '-')[2].cast('int'))

In [20]:
bs_df_encoded.show(5, truncate=False)

[Stage 36:>                                                         (0 + 1) / 1]

+-------------------+-------+----------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+
|datetime           |holiday|workingday|temp|atemp |humidity|windspeed|casual|registered|count|season_1|season_2|season_3|season_4|weather_1|weather_2|weather_3|weather_4|hour|month|day|year|
+-------------------+-------+----------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+
|2011-01-01 00:00:00|0      |0         |9.84|14.395|81      |0.0      |3     |13        |16   |1       |0       |0       |0       |1        |0        |0        |0        |0   |2011 |1  |1   |
|2011-01-01 01:00:00|0      |0         |9.02|13.635|80      |0.0      |8     |32        |40   |1       |0       |0       |0       |1        |0        |0        |0        |1   |2011 |1  |1   |
|2011-01-01 02:00:00|0      |0         |

                                                                                

In [21]:
bs_df_encoded = bs_df_encoded.drop('datetime')
bs_df_encoded = bs_df_encoded.withColumnRenamed("count", "label")

In [22]:
bs_df_encoded.printSchema()

root
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- season_1: integer (nullable = true)
 |-- season_2: integer (nullable = true)
 |-- season_3: integer (nullable = true)
 |-- season_4: integer (nullable = true)
 |-- weather_1: integer (nullable = true)
 |-- weather_2: integer (nullable = true)
 |-- weather_3: integer (nullable = true)
 |-- weather_4: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- year: integer (nullable = true)



In [23]:
#ML LIB TRAIN and TEST split is done as follows, 80% for training and 20% for testing
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
train, test = bs_df_encoded.randomSplit([0.8, 0.2], seed=12345)

In [24]:
#defining input variables and output
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["holiday","workingday","temp","atemp","humidity","windspeed","casual","registered","label","season_1","season_2","season_3","season_4","weather_1","weather_2","weather_3","weather_4", "hour", "month", "day", "year"],
    outputCol="features")

output = assembler.transform(train)
#Assembled columns 'hour', 'day' etc  to vector column 'features'
output.show(5)
print(output.count())
#setting up TRAINing data
train_output = output.na.drop()
print(train_output.count())

24/08/09 06:40:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+----------+----+-----+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+--------------------+
|holiday|workingday|temp|atemp|humidity|windspeed|casual|registered|label|season_1|season_2|season_3|season_4|weather_1|weather_2|weather_3|weather_4|hour|month|day|year|            features|
+-------+----------+----+-----+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+--------------------+
|      0|         0|3.28|2.275|      79|  31.0009|     0|        24|   24|       1|       0|       0|       0|        0|        0|        1|        0|   1| 2012|  2|  12|(21,[2,3,4,5,7,8,...|
|      0|         0|3.28| 3.79|      53|  16.9979|     0|        26|   26|       1|       0|       0|       0|        1|        0|        0|        0|   8| 2012|  2|  12|(21,[2,3,4,5,7,8,...|
|      0|         0|3.28|4.545|      53|

In [25]:
output.select('features').show(2,truncate=False)

+---------------------------------------------------------------------------------------------------+
|features                                                                                           |
+---------------------------------------------------------------------------------------------------+
|(21,[2,3,4,5,7,8,9,15,17,18,19,20],[3.28,2.275,79.0,31.0009,24.0,24.0,1.0,1.0,1.0,2012.0,2.0,12.0])|
|(21,[2,3,4,5,7,8,9,13,17,18,19,20],[3.28,3.79,53.0,16.9979,26.0,26.0,1.0,1.0,8.0,2012.0,2.0,12.0]) |
+---------------------------------------------------------------------------------------------------+
only showing top 2 rows



In [26]:
# setting up testing data

test_output = assembler.transform(test)
print(test_output.count())
train_output = test_output.na.drop()
print(test_output.count())

2192
2192


In [27]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)

# Fit the model
lrModel = lr.fit(train_output)

24/08/09 06:41:10 WARN Instrumentation: [9b438f12] regParam is zero, which might cause numerical instability and overfitting.
24/08/09 06:41:10 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/09 06:41:10 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
24/08/09 06:41:11 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/08/09 06:41:11 WARN Instrumentation: [9b438f12] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [28]:
#Coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("\n")
print("Intercept: " + str(lrModel.intercept))

Coefficients: [0.07368046826033069,0.049734509565215995,0.0041106952264321935,-0.004801552301310095,-0.0010787615437215469,-0.005644245537650441,0.5646486605063272,0.5644864723773111,0.43543779135894456,0.19539787908072534,0.16734078392296084,-0.040642909955340714,-0.33123405748358803,-0.11704541232870219,0.08902397276438985,0.14380537145727326,-1.1158662113046425,0.005969780356600364,-0.07566503424084421,0.0503776787076451,0.0015697279247076968]


Intercept: 151.9713713469821


In [29]:
import pyspark.sql.functions
predictions = lrModel.transform(test_output).select("features", "label", "prediction")
predictions.show(10)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# testRDD = test.rdd 
# predictionAndLabels = testRDD.map(lambda lp: (float(model.predict(lp.features)), lp.label))
# # Evaluate model
# metrics = BinaryClassificationMetrics(predictionAndLabels)
# f1Score = metrics.fMeasure()
# print(f1Score)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="r2")
# print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))


+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|(21,[2,3,4,5,7,8,...|   14|13.850845007997435|
|[0.0,0.0,4.92,5.3...|  124|123.82459217736175|
|[0.0,0.0,4.92,5.3...|   91| 90.79935621662776|
|(21,[2,3,4,5,7,8,...|    6| 5.948871769180414|
|(21,[2,3,4,6,7,8,...|   10| 9.911198997576037|
|[0.0,0.0,5.74,6.0...|   15| 14.91484811397288|
|(21,[2,3,4,5,7,8,...|   23| 22.92662106474387|
|[0.0,0.0,6.56,6.0...|   49| 48.77129125145997|
|(21,[2,3,4,5,6,7,...|   44|43.774120844042116|
|[0.0,0.0,6.56,9.0...|   43|42.921706178117645|
+--------------------+-----+------------------+
only showing top 10 rows



In [30]:
# Random Forest Classifier model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
rf = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=100)

rf_model = rf.fit(train_output)

predictions = rf_model.transform(test_output)

display(predictions.select( "features","prediction", "label").show(5))

# Select (prediction, true label) and compute test error
#evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
#rmse = evaluator.evaluate(predictions)
#print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

                                                                                

+--------------------+------------------+-----+
|            features|        prediction|label|
+--------------------+------------------+-----+
|(21,[2,3,4,5,7,8,...|13.348488497640506|   14|
|[0.0,0.0,4.92,5.3...|119.68936724561414|  124|
|[0.0,0.0,4.92,5.3...| 94.73015986386835|   91|
|(21,[2,3,4,5,7,8,...| 29.36962517840549|    6|
|(21,[2,3,4,6,7,8,...|14.168208513603483|   10|
+--------------------+------------------+-----+
only showing top 5 rows



None

In [31]:
rf_model.save("saved_model") 

                                                                                

In [40]:
loaded_model = RandomForestRegressionModel.load("saved_model")

                                                                                

In [39]:
from pyspark.ml.regression import RandomForestRegressionModel

In [43]:
predictions = loaded_model.transform(test_output)

# Show predictions
predictions.select( "features","prediction", "label").show(5)

+--------------------+------------------+-----+
|            features|        prediction|label|
+--------------------+------------------+-----+
|(21,[2,3,4,5,7,8,...|13.348488497640506|   14|
|[0.0,0.0,4.92,5.3...|119.68936724561414|  124|
|[0.0,0.0,4.92,5.3...| 94.73015986386835|   91|
|(21,[2,3,4,5,7,8,...| 29.36962517840549|    6|
|(21,[2,3,4,6,7,8,...|14.168208513603483|   10|
+--------------------+------------------+-----+
only showing top 5 rows



                                                                                