# 거리를 나타내는 컬럼만 사용하여 선형 회귀 모델을 만들어본다.
## 인스턴스 생성 시 부터 맥스 메모리(Max Memory)를 지정해준다.
## Out of Memory 증상을 미연에 방지한다.

In [1]:
from pyspark.sql import SparkSession

# 인스턴스 생성(Max Memory 지정: Out of Memory 방지)
MAX_MEMORY="5g"
spark = SparkSession.builder.appName("taxi-fare-prediciton").config("spark.executor.memory", MAX_MEMORY).config("spark.driver.memory", MAX_MEMORY).getOrCreate()

24/12/18 17:00:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# 파일 불러오기
trips_df = spark.read.csv('/home/lab01/src/data/trip/*', inferSchema=True, header=True)
trips_df.printSchema()



root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



                                                                                

In [3]:
trips_df.createOrReplaceTempView("trips")

In [4]:
# SQL_analysis.ipynb에서 진행했던 이상치 제거를 적용한다.
query = """
SELECT 
    trip_distance,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")
data_df.show()

+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|         16.5|       70.07|
|         1.13|       11.16|
|         2.68|       18.59|
|         12.4|        43.8|
|          9.7|        32.3|
|          9.3|       43.67|
|         9.58|        46.1|
|         16.2|        45.3|
|         3.58|        19.3|
|         0.91|        14.8|
|         2.57|        12.8|
|          0.4|         5.3|
|         3.26|        17.3|
|        13.41|       47.25|
|         18.3|       61.42|
|         1.53|       14.16|
|          2.0|        11.8|
|         16.6|       54.96|
|         15.5|       56.25|
|          1.3|        16.8|
+-------------+------------+
only showing top 20 rows



In [6]:
data_df.describe().show()



+-------+------------------+------------------+
|summary|     trip_distance|      total_amount|
+-------+------------------+------------------+
|  count|          13326131|          13326131|
|   mean|2.8874228408830245|17.990713821025437|
| stddev|3.8336178303381048| 13.01169363051562|
|    min|              0.01|              0.01|
|    max|             475.5|            4973.3|
+-------+------------------+------------------+



                                                                                

In [7]:
# train / test split
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=42)
print(train_df.count())
print(test_df.count())

                                                                                

10660955




2665176


                                                                                

# train : 10659921 / test : 2666210

# VectorAssembler
### - VectorAssembler는 trip_distance 컬럼을 특징 벡터로 변환합니다. 이를 통해 trip_distance 값을 features라는 새로운 컬럼에 벡터 형태로 저장하게 됩니다.
### - features 컬럼은 모델에 입력될 특성 벡터를 나타내며, 이 값은 total_amount(목표 변수)를 예측하는 데 사용됩니다.

In [8]:
from pyspark.ml.feature import VectorAssembler
vassembler = VectorAssembler(inputCols=["trip_distance"], outputCol="features")
vtrain_df = vassembler.transform(train_df)
vtrain_df.show()

[Stage 11:>                                                         (0 + 1) / 1]

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|        3.05|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 20 rows



                                                                                

# LinearRegression
### - 선형 회귀 모델을 사용하여 trip_distance(이동 거리)를 입력으로 total_amount(총 금액)를 예측하는 모델을 생성합니다.
### - prediction 컬럼은 trip_distance에 대한 예측된 total_amount 값을 보여줍니다. 예를 들어, trip_distance가 0.01일 때 모델이 예측한 total_amount는 9.41입니다.

In [9]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    maxIter=50, # 모델 학습할 때 최대 50번의 반복을 허용한다.
    labelCol="total_amount",
    featuresCol="features"
)
# 선형 회귀 모델 학습
model = lr.fit(vtrain_df)
vtest_df = vassembler.transform(test_df)

# 예측
prediction = model.transform(vtest_df)
prediction.show()

24/12/18 17:03:37 WARN Instrumentation: [717df371] regParam is zero, which might cause numerical instability and overfitting.
24/12/18 17:03:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/12/18 17:03:45 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/12/18 17:04:33 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/12/18 17:04:33 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
[Stage 16:>                                                         (0 + 1) / 1]

+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874707077|
|         0.01|         3.3|  [0.01]|9.418361874

                                                                                

In [10]:
# 평가
# RMSE : 오차를 의미하고 값이 작을 수록 예측 성능이 좋다
model.summary.rootMeanSquaredError

6.308435272654453

## 오차가 약 6.307임을 나타낸다.

In [11]:
# R-squared(결정계수) : 0과 1 사이의 값을 가지며 1에 가까울수록 모델이 데이터를 잘 설명한다는 의미를 가진다.
model.summary.r2

0.765849929974943

## 이 모델의 예측 성능은   약 76.57%이다. 그렇게 높게 측정되진 않다.

# 실 서비스 task
실 서비스에서 사용될 모델을 입력 데이터 벡터화 및 예측 결과 출력을 통해 테스트하고 준비하는 과정입니다. trip_distance를 기반으로 total_amount를 예측하는 모델을 실시간으로 적용할 수 있도록 준비하는 작업으로, 실제 서비스에서 유용하게 활용될 수 있습니다.

In [12]:
from pyspark.sql.types import DoubleType
distance_list = [1.1, 5.5, 10.5, 30.0]
distance_df = spark.createDataFrame(distance_list, DoubleType()).toDF("trip_distance")
distance_df.show()

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.5|
|         10.5|
|         30.0|
+-------------+



In [13]:
vdistance_df = vassembler.transform(distance_df)
model.transform(vdistance_df).show()

+-------------+--------+------------------+
|trip_distance|features|        prediction|
+-------------+--------+------------------+
|          1.1|   [1.1]|12.666016967333638|
|          5.5|   [5.5]|25.775817341239016|
|         10.5|  [10.5]| 40.67331776613149|
|         30.0|  [30.0]| 98.77356942321214|
+-------------+--------+------------------+



In [14]:
# 예측에 쓰일 컬럼을 추가하고 전처리하여 성능을 올린다.
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")
data_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [15]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=6)

In [16]:
# 나뉘어진 데이터 프레임을 저장해둔다.
data_dir = '/home/lab01/src/project/taxi/'

In [18]:
# 파퀫 형태로 저장
# 압축률이 좋고 디스크 io가 적다
# 컬럼 기반 포맷이다.
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")


# 파퀫 형태로 불러오기
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

AnalysisException: path file:/home/lab01/src/project/taxi/train already exists.

# 스테이지 파이프 라인
### 프리프로세싱을 위한 파이프 라인을 생성하여 전처리한다.
- 데이터 전처리를 위한 여러 단계를 파이프라인으로 연결하여 훈련 데이터(train_df)에 적용하는 과정입니다. 각 단계는 모델 학습을 위한 데이터 준비를 돕고, 범주형 변수와 수치형 변수를 각각 적절히 변환하여 예측 모델에 사용할 수 있는 형식으로 만듭니다.
- 범주형 데이터: pickup_location_id, dropoff_location_id, day_of_week와 같은 범주형 데이터를 숫자형 데이터로 변환하고, 이를 **원핫 인코딩(One-Hot Encoding)**을 통해 모델에 맞는 형식으로 변환합니다.
- 수치형 데이터: passenger_count, trip_distance, pickup_time과 같은 수치형 데이터를 **스케일링(Scaling)**하여 모델 학습에 적합하게 만듭니다.
- 이 모든 전처리 과정을 하나의 **파이프라인(Pipeline)**으로 연결하여 훈련 데이터에 일괄 적용합니다.

In [19]:
# 스트링 값을 숫자값으로 바꾸어 원핫인코딩을 진행한다.
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# category feature 설정
cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

stages = []

for c in cat_feats:
    # StringIndexer는 범주형 변수(pickup_location_id, dropoff_location_id, day_of_week)를 숫자형 값으로 변환합니다.
    # 예를 들어, pickup_location_id가 "A", "B", "C"와 같은 카테고리 값을 갖는 경우, StringIndexer는 이를 0, 1, 2와 같은 숫자로 변환합니다.
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    # OneHotEncoder는 StringIndexer로 숫자형으로 변환된 범주형 데이터를 원핫 인코딩 방식으로 변환합니다.
    # 예를 들어, pickup_location_id가 0, 1, 2로 변환되었다면, OneHotEncoder는 이를 [1, 0, 0], [0, 1, 0], [0, 0, 1]로 변환합니다.
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]
    
stages

[StringIndexer_688d07d16fc3,
 OneHotEncoder_47763915ec36,
 StringIndexer_4d164e8b3874,
 OneHotEncoder_48d173cf8562,
 StringIndexer_daaae5cad3c8,
 OneHotEncoder_472871429db9]

In [20]:
# 수치형 전처리: VectorAssembler, StandardScaler
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    # VectorAssembler는 단일 수치형 컬럼을 벡터 형식으로 변환
    # trip_distance 값이 10.0이라면 이 값은 [10.0]과 같은 벡터로 변환됩니다.
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vecotr")
    # StandardScaler는 수치형 데이터를 스케일링하여, 평균이 0이고 표준편차가 1인 값으로 변환
    # trip_distance 값이 매우 클 경우, 이를 스케일링하지 않으면 모델이 다른 특성에 비해 지나치게 영향을 받을 수 있다
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

stages

[StringIndexer_688d07d16fc3,
 OneHotEncoder_47763915ec36,
 StringIndexer_4d164e8b3874,
 OneHotEncoder_48d173cf8562,
 StringIndexer_daaae5cad3c8,
 OneHotEncoder_472871429db9,
 VectorAssembler_7973979a1d5f,
 StandardScaler_38efdaadfd72,
 VectorAssembler_1c2eef4b74a5,
 StandardScaler_e6408ff8628f,
 VectorAssembler_a7c09345fc04,
 StandardScaler_f4a06ee1e2f7]

In [21]:
# 두 가지를 프리프로세싱을 했는데 합치는 과정을 진행한다.
# 데이터의 원핫 인코딩된 값과 스케일링된 수치형 데이터를 하나의 특성 벡터(feature_vector)로 결합하는 과정
# assembler_inputs에는 원핫 인코딩된 범주형 특성과 스케일링된 수치형 특성들이 포함
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
# VectorAssembler는 이 모든 데이터를 하나의 특성 벡터(feature_vector)로 결합
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

stages

[StringIndexer_688d07d16fc3,
 OneHotEncoder_47763915ec36,
 StringIndexer_4d164e8b3874,
 OneHotEncoder_48d173cf8562,
 StringIndexer_daaae5cad3c8,
 OneHotEncoder_472871429db9,
 VectorAssembler_7973979a1d5f,
 StandardScaler_38efdaadfd72,
 VectorAssembler_1c2eef4b74a5,
 StandardScaler_e6408ff8628f,
 VectorAssembler_a7c09345fc04,
 StandardScaler_f4a06ee1e2f7,
 VectorAssembler_3faab45ec0f0]

### - Pipeline: 위에서 정의한 전처리 단계를 파이프라인으로 묶습니다. 파이프라인을 사용하면 여러 단계의 변환을 일괄적으로 처리할 수 있습니다.
### - pipeline.fit(train_df)는 훈련 데이터(train_df)에 대해 전처리 과정을 학습하는 부분입니다. 이 과정에서 각 변환 단계들이 실제 데이터에 맞춰 적합(fit)됩니다.
### - fitted_transformer.transform(train_df)는 학습된 전처리 파이프라인을 train_df에 적용하여 변환된 데이터(vtrain_df)를 생성합니다.

In [22]:
# 스테이지로 파이프 라인 생성
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)


# 적용
vtrain_df = fitted_transformer.transform(train_df)

                                                                                

In [23]:
# # 스트링 값을 숫자값으로 바꾸어 원핫인코딩을 진행한다.
# from pyspark.ml.feature import OneHotEncoder, StringIndexer

# # 카테고리 피쳐 설정
# cat_feats = [
#     "pickup_location_id",
#     "dropoff_location_id",
#     "day_of_week"
# ]

# # 스테이지를 담는 배열
# stages = []

# # 범주형 변수에 대해 StringIndexer와 OneHotEncoder 적용
# for c in cat_feats:
#     cat_indexer = StringIndexer(inputCol=c, outputCol=c + "_idx").setHandleInvalid("keep")
#     onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
#     stages += [cat_indexer, onehot_encoder]

# # 수치형 데이터에 대해 VectorAssembler와 StandardScaler 적용
# from pyspark.ml.feature import VectorAssembler, StandardScaler

# num_feats = [
#     "passenger_count",
#     "trip_distance",
#     "pickup_time"
# ]

# # 수치형 변수를 벡터화하고 스케일링
# for n in num_feats:
#     num_assembler = VectorAssembler(inputCols=[n], outputCol=n + "_vector")  # 변경된 출력 컬럼 이름
#     num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n + "_scaled")
#     stages += [num_assembler, num_scaler]

# # 범주형 특성과 수치형 특성을 결합하는 과정에서 출력 컬럼 이름을 변경
# assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]

# # 결합된 특성 벡터를 저장할 새로운 출력 컬럼 이름 지정
# assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="final_feature_vector")  # 변경된 출력 컬럼 이름
# stages += [assembler]

# # 파이프라인 생성
# from pyspark.ml import Pipeline

# pipeline = Pipeline(stages=stages)

# # 파이프라인 학습
# fitted_transformer = pipeline.fit(train_df)

# # 적용
# vtrain_df = fitted_transformer.transform(train_df)

# # 변환된 데이터프레임 출력
# vtrain_df.show()

[Stage 33:>                                                        (0 + 2) / 11]

KeyboardInterrupt: 

# 모델링

In [24]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vecotr: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vecotr: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vecotr: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [25]:
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vecotr: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vecotr: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vecotr: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [26]:
model = lr.fit(vtrain_df)

24/12/18 17:13:19 WARN Instrumentation: [ce22825d] regParam is zero, which might cause numerical instability and overfitting.
24/12/18 17:17:06 WARN Instrumentation: [ce22825d] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

In [27]:
vtest_df = fitted_transformer.transform(test_df)
predictions = model.transform(vtest_df)
# 캐싱을하여 나중에 쓰기 쉽게 만든다.
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vecotr: vector, passenger_count_scaled: vector, trip_distance_vecotr: vector, trip_distance_scaled: vector, pickup_time_vecotr: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [28]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

[Stage 39:>                                                         (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.9|     Monday|         9.8|12.865737194280896|
|          1.0|    Tuesday|       10.55|12.746845770078778|
|          1.5|     Friday|        12.3| 16.22003685330334|
|          5.1|   Thursday|        26.8|21.586481392813468|
|          8.4|     Sunday|        32.8|28.625924538793235|
|          2.2|    Tuesday|        13.3|13.316406587195926|
|          0.7|  Wednesday|         5.8| 9.588440263875905|
|          1.7|     Friday|         8.8|12.976060771403526|
|         16.8|     Friday|       82.37| 71.78059156694991|
|         14.2|  Wednesday|       85.65|  89.8114798292113|
|          0.4|     Friday|         7.3|13.368775176364755|
|          0.7|  Wednesday|        10.3|14.485899290783504|
|          9.0|     Friday|        38.3|34.747976218143855|
|          4.7|   Thursday|        21.3|

                                                                                

In [29]:
model.summary.rootMeanSquaredError

5.8801773141961995

In [30]:
model.summary.r2

0.7966313304411026

### 스테이지 파이프라인 전처리를 활용해서 전보다 성능이 약 3% 올랐다
아직 구현할게 많아서 다른 모델 학습이나, 하이퍼 파라미터 설정은 추후에 다시 진행할 예정이다.

 # 모델 저장 및 로드

In [35]:
# 모델 훈련
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

# 훈련된 모델 학습
model = lr.fit(vtrain_df)

# 모델 저장
model_dir = "/home/lab01/src/project/taxi/model/"

# 이미 존재하는 모델이 있으면 덮어쓰고 저장
model.write().overwrite().save(model_dir)

# 모델 불러오기
from pyspark.ml.regression import LinearRegressionModel
lr_model = LinearRegressionModel.load(model_dir)  # 저장된 모델 불러오기

# 테스트 데이터에 대해 예측
vtest_df = fitted_transformer.transform(test_df)
predictions = lr_model.transform(vtest_df)

# 예측 결과 출력
predictions.show()

24/12/18 17:29:24 WARN Instrumentation: [94eb1f4a] regParam is zero, which might cause numerical instability and overfitting.
24/12/18 17:32:30 WARN Instrumentation: [94eb1f4a] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
[Stage 52:>                                                         (0 + 1) / 1]

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vecotr|passenger_count_scaled|trip_distance_vecotr|trip_distance_scaled|pickup_time_vecotr|  pickup_time_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------

                                                                                

In [36]:
# 모델 평가 (RMSE, R²)
print("RMSE:", model.summary.rootMeanSquaredError)
print("R2:", model.summary.r2)

RMSE: 5.8801773141961995
R2: 0.7966313304411026


In [37]:
spark.stop()