In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, MinMaxScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import time
import datetime

spark = SparkSession.builder.appName("MySparkSession") \
                            .config("spark.executor.memory", "36g") \
                            .config("spark.memory.offHeap.enabled", "true") \
                            .config("spark.memory.offHeap.size", "3g") \
                            .getOrCreate()


df = spark.read.parquet('../time_series/TS6/TS6-2022.parquet')
df.show(10)
df.count()

+----+----+-----+---+----+------------+-------+----------+-----+-----------+------------+-------------------+------------------+-----------------+
|Name|year|month|day|hour|PULocationID|weekday|is_holiday|count|        lat|         lon|           TempTime|            countN|__index_level_0__|
+----+----+-----+---+----+------------+-------+----------+-----+-----------+------------+-------------------+------------------+-----------------+
|lyft|2022|    1|  1|   0|           3|      5|      true|   16|40.86429404|-73.84650986|2022-01-01 00:00:00|16.666666666666668|         20013480|
|lyft|2022|    1|  1|   1|           3|      5|      true|   21|40.86429404|-73.84650986|2022-01-01 01:00:00|              14.0|         20013481|
|lyft|2022|    1|  1|   2|           3|      5|      true|    6|40.86429404|-73.84650986|2022-01-01 02:00:00| 6.666666666666667|         20013482|
|lyft|2022|    1|  1|   3|           3|      5|      true|   11|40.86429404|-73.84650986|2022-01-01 03:00:00| 6.333333

6859080

In [2]:
df = df.drop('is_holiday', 'TempTime', '__index_level_0__')
df.show(10)

+----+----+-----+---+----+------------+-------+-----+-----------+------------+------------------+
|Name|year|month|day|hour|PULocationID|weekday|count|        lat|         lon|            countN|
+----+----+-----+---+----+------------+-------+-----+-----------+------------+------------------+
|lyft|2022|    1|  1|   0|           3|      5|   16|40.86429404|-73.84650986|16.666666666666668|
|lyft|2022|    1|  1|   1|           3|      5|   21|40.86429404|-73.84650986|              14.0|
|lyft|2022|    1|  1|   2|           3|      5|    6|40.86429404|-73.84650986| 6.666666666666667|
|lyft|2022|    1|  1|   3|           3|      5|   11|40.86429404|-73.84650986| 6.333333333333333|
|lyft|2022|    1|  1|   4|           3|      5|   10|40.86429404|-73.84650986| 6.333333333333333|
|lyft|2022|    1|  1|   5|           3|      5|    7|40.86429404|-73.84650986| 5.333333333333333|
|lyft|2022|    1|  1|   6|           3|      5|    9|40.86429404|-73.84650986|               6.0|
|lyft|2022|    1|  1

In [3]:
# df = df.withColumn("is_holiday", col("is_holiday").cast("int"))
# df.show(10)

In [4]:
from pyspark.sql.types import IntegerType
mapping = {'yellow': 1, 
           'lyft': 2, 
           'uber': 3}

for key, value in mapping.items():
    df = df.withColumn("Name", when(df["Name"] == key, value).otherwise(df["Name"]))


for c in ['Name', 'year', 'month', 'day', 'hour', 'PULocationID']:
    df = df.withColumn(c, col(c).cast('integer'))

df.printSchema()

root
 |-- Name: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- countN: double (nullable = true)



In [5]:
print(f'Num of rows: {df.count()}')
print(f'Num of columns: {len(df.columns)}')

Num of rows: 6859080
Num of columns: 11


讀入df1並進行與df相同的處理

In [6]:
df1 = spark.read.parquet('../time_series/TS6/TS6-2023.parquet')
df1 = df1.drop('is_holiday', 'TempTime', '__index_level_0__')

# df1 = df1.withColumn("is_holiday", col("is_holiday").cast("integer"))

mapping = {'yellow': 1, 
           'lyft': 2, 
           'uber': 3}

for key, value in mapping.items():
    df1 = df1.withColumn("Name", when(df1["Name"] == key, value).otherwise(df1["Name"]))


for c in ['Name', 'year', 'month', 'day', 'hour', 'PULocationID']:
    df1 = df1.withColumn(c, col(c).cast('integer'))

df1.printSchema()

root
 |-- Name: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- countN: double (nullable = true)



組合df的features

In [7]:
feature_variables = df.drop('count', 'countN')
inputcols = feature_variables.columns

assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

output = assembler.transform(df)
output.show(10)

+----+----+-----+---+----+------------+-------+-----+-----------+------------+------------------+--------------------+
|Name|year|month|day|hour|PULocationID|weekday|count|        lat|         lon|            countN|            features|
+----+----+-----+---+----+------------+-------+-----+-----------+------------+------------------+--------------------+
|   2|2022|    1|  1|   0|           3|      5|   16|40.86429404|-73.84650986|16.666666666666668|[2.0,2022.0,1.0,1...|
|   2|2022|    1|  1|   1|           3|      5|   21|40.86429404|-73.84650986|              14.0|[2.0,2022.0,1.0,1...|
|   2|2022|    1|  1|   2|           3|      5|    6|40.86429404|-73.84650986| 6.666666666666667|[2.0,2022.0,1.0,1...|
|   2|2022|    1|  1|   3|           3|      5|   11|40.86429404|-73.84650986| 6.333333333333333|[2.0,2022.0,1.0,1...|
|   2|2022|    1|  1|   4|           3|      5|   10|40.86429404|-73.84650986| 6.333333333333333|[2.0,2022.0,1.0,1...|
|   2|2022|    1|  1|   5|           3|      5| 

In [8]:
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=False)

scaled_data = scaler.fit(output).transform(output)
scaled_data.select('scaled_features', 'countN').show()

+--------------------+------------------+
|     scaled_features|            countN|
+--------------------+------------------+
|[2.44948956422488...|16.666666666666668|
|[2.44948956422488...|              14.0|
|[2.44948956422488...| 6.666666666666667|
|[2.44948956422488...| 6.333333333333333|
|[2.44948956422488...| 6.333333333333333|
|[2.44948956422488...| 5.333333333333333|
|[2.44948956422488...|               6.0|
|[2.44948956422488...| 4.666666666666667|
|[2.44948956422488...| 7.666666666666667|
|[2.44948956422488...|               9.0|
|[2.44948956422488...| 6.333333333333333|
|[2.44948956422488...| 5.666666666666667|
|[2.44948956422488...|10.333333333333334|
|[2.44948956422488...|10.333333333333334|
|[2.44948956422488...|11.666666666666666|
|[2.44948956422488...|14.666666666666666|
|[2.44948956422488...|12.666666666666666|
|[2.44948956422488...|              15.0|
|[2.44948956422488...|14.666666666666666|
|[2.44948956422488...|              10.0|
+--------------------+------------

In [9]:
final_data = scaled_data.select('scaled_features', 'countN')

train, test = final_data.randomSplit([0.7, 0.3])

train.describe().show()
test.describe().show()

+-------+------------------+
|summary|            countN|
+-------+------------------+
|  count|           4801765|
|   mean| 36.57626483872736|
| stddev| 60.77862073991568|
|    min|               0.0|
|    max|1470.3333333333333|
+-------+------------------+

+-------+------------------+
|summary|            countN|
+-------+------------------+
|  count|           2057315|
|   mean|36.584410586937835|
| stddev|60.854773873657706|
|    min|               0.0|
|    max|1476.3333333333333|
+-------+------------------+



In [10]:
print(f'Train: {train.count()}')
print(f'Test : {test.count()}')

Train: 4801765
Test : 2057315


In [11]:
print("start時間:", datetime.datetime.now())
start_time = time.time()

gbtr = GBTRegressor(featuresCol='scaled_features', labelCol='countN', 
                    maxDepth=10, maxIter=100, stepSize=0.1, 
                    cacheNodeIds=True, subsamplingRate=0.8, seed=42, maxMemoryInMB=10240)

gbtr_model = gbtr.fit(train)

y_pred = gbtr_model.transform(test)

y_pred.select('countN', 'prediction').show(10)


end_time = time.time()
execution_time = end_time - start_time
print("執行時間:", execution_time, "秒")

+------------------+--------------------+
|            countN|          prediction|
+------------------+--------------------+
|               0.0|    1.53787650236351|
|               0.0|  0.6970397935382844|
|               0.0|-0.03441223401360405|
|               0.0|  0.5868512963445633|
|               0.0|   0.531348351258214|
|               0.0|  1.2946898460784817|
|2.6666666666666665|  3.4618174252622618|
|               0.0|-0.32433369090728265|
|               0.0| 0.22021831372952572|
|               0.0|    -5.4551571267528|
+------------------+--------------------+
only showing top 10 rows

執行時間: 9388.456265926361 秒


In [12]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='countN')

r2 = evaluator.evaluate(y_pred, {evaluator.metricName: 'r2'})
mae = evaluator.evaluate(y_pred, {evaluator.metricName: 'mae'})
rmse = evaluator.evaluate(y_pred, {evaluator.metricName: 'rmse'})

print(f'R2: {r2}')
print(f'MAE: {mae}')
print(f'RMSE: {rmse}')

R2: 0.9723282548874154
MAE: 5.3419144244341865
RMSE: 10.123083562062549


# 驗證集

In [13]:
feature_variables = df1.drop('count', 'countN')
inputcols = feature_variables.columns

assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

output = assembler.transform(df1)


scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withStd=True, withMean=False)

scaled_data = scaler.fit(output).transform(output)

valid_data = scaled_data.select('scaled_features', 'count')



y_pred_2023 = gbtr_model.transform(valid_data)
y_pred_2023.show(5)

+--------------------+-----+------------------+
|     scaled_features|count|        prediction|
+--------------------+-----+------------------+
|[2.44948938270710...|   12|13.889402692274995|
|[2.44948938270710...|   28|12.044515132291076|
|[2.44948938270710...|   19| 5.626331337055775|
|[2.44948938270710...|   26| 4.701026197754847|
|[2.44948938270710...|   17| 5.441334966489215|
+--------------------+-----+------------------+
only showing top 5 rows



In [14]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='count')

r2 = evaluator.evaluate(y_pred_2023, {evaluator.metricName: 'r2'})
mae = evaluator.evaluate(y_pred_2023, {evaluator.metricName: 'mae'})
rmse = evaluator.evaluate(y_pred_2023, {evaluator.metricName: 'rmse'})

print(f'R2: {r2}')
print(f'MAE: {mae}')
print(f'RMSE: {rmse}')

R2: 0.9240277583483465
MAE: 8.579792656025745
RMSE: 18.357343198180875


In [15]:
spark.stop()