> # 목적
> - 운행거리(trip_distance)에 따른 요금(total_amount)를 예측할 것

# ✅모델 1: 단일 선형회귀

## 1️⃣ 데이터 불러오기

In [11]:
from pyspark.sql import SparkSession

# out of memory 방지를 위해 max memory 설정
MAX_MEMORY="5g"

spark = SparkSession.builder.appName("taxi-fare-prediction")\
                    .config("spark.executor.memory", MAX_MEMORY)\
                                    .config("spark.driver.memory", MAX_MEMORY)\
                                    .getOrCreate()

In [12]:
taxi_filepath = '/home/ubuntu/working/taxi_analysis/data/taxi_parquet/*.parquet'
taxi_df = spark.read.parquet(f'file:///{taxi_filepath}')
taxi_df.createOrReplaceTempView('trips')

## 2️⃣ 데이터 전처리

In [13]:
print(f'taxi_df의 shape : {taxi_df.count()}, {len(taxi_df.columns)}')
print()
taxi_df.printSchema()

taxi_df의 shape : 39656098, 19

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



                                                                                

In [14]:
query = """
        SELECT trip_distance, total_amount
        FROM trips
        WHERE (total_amount BETWEEN 0 AND 5000)
        AND (trip_distance BETWEEN 1 AND  500)
        AND passenger_count < 4
        AND TO_DATE(tpep_pickup_datetime) BETWEEN '2022-01-01' AND '2022-12-31'
"""

final_df = spark.sql(query)
final_df.createOrReplaceTempView('final')

In [15]:
final_df.describe().show()



+-------+-----------------+------------------+
|summary|    trip_distance|      total_amount|
+-------+-----------------+------------------+
|  count|         28834138|          28834138|
|   mean|4.183867343629365|23.890795338908845|
| stddev|4.752827513261283|17.557943295123383|
|    min|              1.0|               0.0|
|    max|            470.8|            2567.8|
+-------+-----------------+------------------+



                                                                                

## 3️⃣ 데이터 쪼개기 (train, test)

In [16]:
train_df, test_df = final_df.randomSplit([0.8,0.2], seed=5)

In [17]:
print(f'train_df의 shape: {train_df.count()}, {len(train_df.columns)}')
print(f'test_df의 shape: {test_df.count()}, {len(test_df.columns)}')

                                                                                

train_df의 shape: 23064996, 2




test_df의 shape: 5769142, 2


                                                                                

- cache
  - 중간 결과를 메모리에 저장해두고, 필요할 때마다 메모리에서 바로 사용할 수 있도록 하는 것.
  - 반복적인 작업 수행할 때 성능 향상시키기 위함.


In [18]:
train_df.cache() 


DataFrame[trip_distance: double, total_amount: double]

## 4️⃣ VectorAssembler
- train_df, test_df 모두 변환하기

In [19]:
train_df.columns

['trip_distance', 'total_amount']

In [21]:
from pyspark.ml.feature import VectorAssembler

vec = VectorAssembler(
    inputCols=['trip_distance'],
    outputCol= 'features'
)

vec_train_df = vec.transform(train_df)
vec_test_df = vec.transform(test_df)

In [23]:
vec_train_df.show()

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|          1.0|         0.0|   [1.0]|
|          1.0|         0.0|   [1.0]|
|          1.0|         0.0|   [1.0]|
|          1.0|         0.3|   [1.0]|
|          1.0|         0.3|   [1.0]|
|          1.0|         0.3|   [1.0]|
|          1.0|        0.31|   [1.0]|
|          1.0|        0.31|   [1.0]|
|          1.0|        0.31|   [1.0]|
|          1.0|         1.3|   [1.0]|
|          1.0|        1.55|   [1.0]|
|          1.0|         3.3|   [1.0]|
|          1.0|         3.3|   [1.0]|
|          1.0|         4.3|   [1.0]|
|          1.0|        5.05|   [1.0]|
|          1.0|         5.3|   [1.0]|
|          1.0|         5.3|   [1.0]|
|          1.0|         5.3|   [1.0]|
|          1.0|         5.8|   [1.0]|
|          1.0|         5.8|   [1.0]|
+-------------+------------+--------+
only showing top 20 rows



## 5️⃣ 모델 생성

In [27]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
        maxIter = 50,
        labelCol = 'total_amount',
        featuresCol = 'features'    

)

In [28]:
# 모델 학습
lr_model = lr.fit(vec_train_df)

23/11/19 15:25:51 WARN Instrumentation: [2b9b8e81] regParam is zero, which might cause numerical instability and overfitting.
23/11/19 15:25:51 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/19 15:25:51 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/11/19 15:26:06 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

## 6️⃣ 예측

In [29]:
predictions = lr_model.transform(vec_test_df)
predictions.show()

[Stage 19:>                                                         (0 + 1) / 1]

+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|          1.0|        0.31|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         5.8|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093106275|
|          1.0|         6.3|   [1.0]|12.97234093

                                                                                

## 평가

In [30]:
print(f'RMS: {lr_model.summary.rootMeanSquaredError :.4f}')
print(f'R2: {lr_model.summary.r2 :.4f}')

RMS: 6.5345
R2: 0.8615


In [31]:
spark.stop()

# ✅모델2. 다중 선형 회귀
- feature들이 많다.

## 1️⃣ 데이터 불러오기

In [1]:
from pyspark.sql import SparkSession

MAX_MEMORY = '5g'

spark = SparkSession.builder.appName('taxi_regression2')\
                            .config('spakr.executor.memory', MAX_MEMORY)\
                            .config('spark.driver.memory', MAX_MEMORY)\
                            .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/11/20 16:33:55 WARN Utils: Your hostname, JeeYeon resolves to a loopback address: 127.0.1.1; using 172.18.95.12 instead (on interface eth0)
23/11/20 16:33:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/20 16:33:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
taxi_filepath = '/home/ubuntu/working/taxi_analysis/data/taxi_parquet/*.parquet'
taxi_df = spark.read.parquet(f"file:///{taxi_filepath}")
taxi_df.createOrReplaceTempView('trips')

In [3]:
taxi_df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [17]:
query = """
        SELECT 
            passenger_count,
            PULocationID as pickup_location_id,
            DOLocationID as dropoff_location_id,
            trip_distance,
            HOUR(tpep_pickup_datetime) as pickup_time,
            DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
            total_amount
        FROM trips
        WHERE (total_amount BETWEEN 0 AND 5000)
        AND (trip_distance BETWEEN 0 AND 500)
        AND passenger_count < 4
        AND tpep_pickup_datetime BETWEEN '2022-01-01' AND '2022-06-30'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

## 2️⃣ 데이터 쪼개기

In [18]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed = 5)

## 3️⃣ 데이터 parquet 저장하기

In [19]:
data_dir = '/home/ubuntu/working/taxi_analysis/data/train_test'

train_df.write.mode('overwrite').format('parquet').save(f'{data_dir}/train/')
test_df.write.mode('overwrite').format('parquet').save(f'{data_dir}/test/')

                                                                                

## 4️⃣ parquet 불러오기

In [20]:
data_dir = '/home/ubuntu/working/taxi_analysis/data/train_test'
train_df = spark.read.parquet(f'file:///{data_dir}/train/*.parquet')
test_df = spark.read.parquet(f'file:///{data_dir}/test/*.parquet')

In [21]:
print(train_df.printSchema())
print()
print(test_df.printSchema())

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)

None

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)

None


## 5️⃣ 파이프라인 만들기

In [23]:
train_df.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|            0.0|                 1|                  1|          0.0|          0|     Friday|       100.8|
|            0.0|                 1|                  1|          0.0|          0|     Friday|       100.8|
|            0.0|                 1|                  1|          0.0|          3|  Wednesday|      110.46|
|            0.0|                 1|                  1|          0.0|          4|     Monday|        0.31|
|            0.0|                 1|                  1|          0.0|          5|     Sunday|        52.8|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [24]:
stages= []

### ① 원핫 인코딩
- pickup_location_id, dropoff_location_id, day_of_week 을 원핫인코딩

In [25]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

cat_feature = ['pickup_location_id','dropoff_location_id','day_of_week']

for c in cat_feature:
    cat_indexer = StringIndexer(inputCol=c, outputCol=c+'_idx').setHandleInvalid('keep')  # setHandleInvalid('keep'): 변환 중 알 수 없는 값을 만나면 유지하라.
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c+'_onehot'])
    stages += [cat_indexer, onehot_encoder]

stages

[StringIndexer_5efd1fa4696b,
 OneHotEncoder_e26aa21ac1dd,
 StringIndexer_0a43dca07884,
 OneHotEncoder_c691eed150a8,
 StringIndexer_d2da190cc9f0,
 OneHotEncoder_e91dbc8dd86f]

### ② Standard Scaler & Vector Assembler

In [26]:
test_df.columns

['passenger_count',
 'pickup_location_id',
 'dropoff_location_id',
 'trip_distance',
 'pickup_time',
 'day_of_week',
 'total_amount']

In [27]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

num_feature = ['passenger_count','trip_distance','pickup_time']

for n in num_feature:
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+'_vector')
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+'_scaled')

    stages += [num_assembler, num_scaler]

stages

[StringIndexer_5efd1fa4696b,
 OneHotEncoder_e26aa21ac1dd,
 StringIndexer_0a43dca07884,
 OneHotEncoder_c691eed150a8,
 StringIndexer_d2da190cc9f0,
 OneHotEncoder_e91dbc8dd86f,
 VectorAssembler_48e91d90470d,
 StandardScaler_f69778e29741,
 VectorAssembler_b9a9a6122adc,
 StandardScaler_3aa4e2db3b82,
 VectorAssembler_cd0252c375df,
 StandardScaler_1923f71f55f4]

### ③ vector로 변환한 특징들을 다시 벡터화 (한 컬럼으로 모으기)

In [28]:
assembler_inputs = [c+'_onehot' for c in cat_feature] + [n+'_scaled' for n in num_feature]
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [29]:
total_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol ='features')
stages.append(total_assembler)
stages

[StringIndexer_5efd1fa4696b,
 OneHotEncoder_e26aa21ac1dd,
 StringIndexer_0a43dca07884,
 OneHotEncoder_c691eed150a8,
 StringIndexer_d2da190cc9f0,
 OneHotEncoder_e91dbc8dd86f,
 VectorAssembler_48e91d90470d,
 StandardScaler_f69778e29741,
 VectorAssembler_b9a9a6122adc,
 StandardScaler_3aa4e2db3b82,
 VectorAssembler_cd0252c375df,
 StandardScaler_1923f71f55f4,
 VectorAssembler_091c3e8b3a0a]

### ④ pipeline 생성

In [30]:
from pyspark.ml import Pipeline

# 지금까지는 '전처리' 파이프라인
pipeline = Pipeline(stages = stages)
pipeline

Pipeline_f571961c2251

### ⑤ pipeline에 학습

In [31]:
train_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|            0.0|                 1|                  1|          0.0|          0|     Friday|       100.8|
|            0.0|                 1|                  1|          0.0|          0|     Friday|       100.8|
|            0.0|                 1|                  1|          0.0|          3|  Wednesday|      110.46|
|            0.0|                 1|                  1|          0.0|          4|     Monday|        0.31|
|            0.0|                 1|                  1|          0.0|          5|     Sunday|        52.8|
|            0.0|                 1|                  1|          0.0|         14|     Friday|         3.8|
|            0.0|           

In [15]:
fitted_transformer = pipeline.fit(train_df)
fitted_transformer

IllegalArgumentException: requirement failed: The input column pickup_location_id_idx should have at least two distinct values.

In [42]:
train_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|            0.0|                 1|                  1|          0.0|          0|     Sunday|       144.1|
|            0.0|                 1|                  1|          0.0|          1|     Monday|       119.8|
|            0.0|                 1|                  1|          0.0|          3|     Monday|        21.3|
|            0.0|                 1|                  1|          0.0|          3|  Wednesday|         0.3|
|            0.0|                 1|                  1|          0.0|         15|   Saturday|      114.35|
|            0.0|                 1|                  1|          0.0|         15|    Tuesday|       110.3|
|            0.0|           

### ⑥ train_df 변환

In [24]:
vec_train_df = fitted_transformer.transform(train_df)
vec_test_df = fitted_transformer.transform(test_df)
vec_train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: long (nullable = true)
 |-- dropoff_location_id: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- features: vector (nullable = true)

In [25]:
vec_train_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|            features|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------

In [27]:
vec_train_df.select('features').show()

+--------------------+
|            features|
+--------------------+
|(534,[78,324,528]...|
|(534,[78,324,530,...|
|(534,[78,324,530,...|
|(534,[78,324,527,...|
|(534,[78,324,526,...|
|(534,[78,324,529,...|
|(534,[78,324,526,...|
|(534,[78,324,529,...|
|(534,[78,324,530,...|
|(534,[78,324,530,...|
|(534,[78,324,525,...|
|(534,[78,324,525,...|
|(534,[78,324,524,...|
|(534,[78,324,530,...|
|(534,[78,324,530,...|
|(534,[58,314,526,...|
|(534,[58,314,524,...|
|(534,[58,314,524,...|
|(534,[58,314,528,...|
|(534,[58,345,528,...|
+--------------------+
only showing top 20 rows



## 6️⃣ 모델 생성

In [28]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    solver='normal',
    labelCol = 'total_amount',
    featuresCol = 'features'
)

model = lr.fit(vec_train_df)

23/11/20 10:53:47 WARN Instrumentation: [6590050c] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 10:54:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/20 10:54:20 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/11/20 10:54:33 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/11/20 10:54:33 WARN Instrumentation: [6590050c] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/20 10:54:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/20 10:54:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

## 7️⃣ 예측

In [29]:
predictions = model.transform(vec_test_df)
predictions.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|            features|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------

In [30]:
predictions.select('trip_distance','day_of_week','total_amount','prediction').show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.0|     Sunday|        51.8|104.58168466558868|
|          0.0|     Friday|        0.31|105.52036895554467|
|          0.0|   Saturday|      146.74|104.97395984417888|
|          0.0|     Sunday|         3.3|104.29349506583364|
|          1.4|     Sunday|       120.3|108.59964737843553|
|         14.4|   Thursday|         0.3|149.72170232330845|
|         14.4|   Thursday|         0.3|149.68567862333907|
|         17.5|    Tuesday|         0.3|159.59087099322747|
|         17.9|    Tuesday|       67.55| 125.5079047176771|
|         8.17|     Monday|       120.3|118.76274372699892|
|          0.4|   Saturday|         7.3|10.943646127284332|
|          1.6|  Wednesday|        11.8|14.937834835206088|
|          1.6|  Wednesday|       12.35| 15.33485370659359|
|          0.8|   Saturday|       12.95|

## 8️⃣ 평가

In [31]:
print(f'RMSE: {model.summary.rootMeanSquaredError}')
print(f'R2: {model.summary.r2}')

RMSE: 7.438260562770492
R2: 0.8165924586821984


In [32]:
spark.stop()

# ✅ 모델2 + 하이퍼 파라미터 조정

## 1️⃣ train_df, test_df.parquet 가져오기

In [34]:
from pyspark.sql import SparkSession

# out of memory 방지를 위해 max memory 설정
MAX_MEMORY="5g"

spark = SparkSession.builder.appName("taxi-fare-prediction")\
                    .config("spark.executor.memory", MAX_MEMORY)\
                                    .config("spark.driver.memory", MAX_MEMORY)\
                                    .getOrCreate()

In [35]:
# 저장된 train, test dataset 불러오기
data_dir = '/home/ubuntu/working/taxi_analysis/data/train_test'
train_df = spark.read.parquet(f'file:///{data_dir}/train/*.parquet')
test_df = spark.read.parquet(f'file:///{data_dir}/test/*.parquet')

In [36]:
train_df.count()

28848178

## 2️⃣ 파이프라인

In [37]:
train_df.columns

['passenger_count',
 'pickup_location_id',
 'dropoff_location_id',
 'trip_distance',
 'pickup_time',
 'day_of_week',
 'total_amount']

In [38]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
stages = []

# string변환 -> 원핫 인코딩
cat_cols = ['pickup_location_id',
         'dropoff_location_id',
         'day_of_week']

for col in cat_cols:
    string_col = StringIndexer(inputCol=col, outputCol=col+'_str').setHandleInvalid('keep')
    oh_encoder = OneHotEncoder(inputCols=[string_col.getOutputCol()], outputCols=[col+'_oh'])

    stages += [string_col, oh_encoder]

num_cols = ['passenger_count',
         'trip_distance',
         'pickup_time',
           ]

for col in num_cols:
    num_assembler  = VectorAssembler(inputCols= [col], outputCol= col+'_vector')
    num_scaler = StandardScaler(inputCol= num_assembler.getOutputCol(), outputCol=col+'_scaled' )

    stages += [num_assembler, num_scaler]

stages


[StringIndexer_c1a4df188e5f,
 OneHotEncoder_5749c04c6069,
 StringIndexer_e93ecf9b71a4,
 OneHotEncoder_e0e38d6d2a73,
 StringIndexer_0145424826e2,
 OneHotEncoder_ff17d895fcc2,
 VectorAssembler_e6791f431d74,
 StandardScaler_b4873f092bf7,
 VectorAssembler_d48fd817c161,
 StandardScaler_895ffe478e01,
 VectorAssembler_64662387fcfa,
 StandardScaler_c9af2c3e57ba]

In [39]:
assembler_inputs= [col+'_oh' for col in cat_cols] + [num + '_scaled' for num in num_cols]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='feature_vector')

stages += [assembler]
stages

[StringIndexer_c1a4df188e5f,
 OneHotEncoder_5749c04c6069,
 StringIndexer_e93ecf9b71a4,
 OneHotEncoder_e0e38d6d2a73,
 StringIndexer_0145424826e2,
 OneHotEncoder_ff17d895fcc2,
 VectorAssembler_e6791f431d74,
 StandardScaler_b4873f092bf7,
 VectorAssembler_d48fd817c161,
 StandardScaler_895ffe478e01,
 VectorAssembler_64662387fcfa,
 StandardScaler_c9af2c3e57ba,
 VectorAssembler_9856f23a3722]

### 하이퍼 파라미터 튜닝

In [40]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

#### 모델 생성

In [41]:
lr = LinearRegression(
    maxIter= 30,
    solver = 'normal',
    labelCol = 'total_amount',
    featuresCol = 'feature_vector')

cv_stages = stages + [lr]
cv_stages
    

[StringIndexer_c1a4df188e5f,
 OneHotEncoder_5749c04c6069,
 StringIndexer_e93ecf9b71a4,
 OneHotEncoder_e0e38d6d2a73,
 StringIndexer_0145424826e2,
 OneHotEncoder_ff17d895fcc2,
 VectorAssembler_e6791f431d74,
 StandardScaler_b4873f092bf7,
 VectorAssembler_d48fd817c161,
 StandardScaler_895ffe478e01,
 VectorAssembler_64662387fcfa,
 StandardScaler_c9af2c3e57ba,
 VectorAssembler_9856f23a3722,
 LinearRegression_385b2866d12d]

In [42]:
cv_pipeline = Pipeline(stages = cv_stages)

### Gridsearch , Crossvalidation 설정

- GridSearch

In [45]:
param_grid = ParamGridBuilder()\
                .addGrid(lr.elasticNetParam, [0, 0.1, 0.2, 0.3, 0.4, 0.5])\
                .addGrid(lr.regParam, [0, 0.01, 0.02, 0.03, 0.04, 0.05])\
                .build()
# regParam: 정규화 매개변수(값↑, 정규화 강도↑)(값↑, 모델 단순)                
param_grid

[{Param(parent='LinearRegression_385b2866d12d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
  Param(parent='LinearRegression_385b2866d12d', name='regParam', doc='regularization parameter (>= 0).'): 0.0},
 {Param(parent='LinearRegression_385b2866d12d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
  Param(parent='LinearRegression_385b2866d12d', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
 {Param(parent='LinearRegression_385b2866d12d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
  Param(parent='LinearRegression_385b2866d12d', name='regParam', doc='regularization parameter (>= 0).'): 0.02},
 {Param(parent

- cross validation
    - **CrossValidator의 estimator에 pipeline을 넣는 경우, pipeline의 마지막 stage가 model이어야 함.**
    - 그래서 기존 pipeline에 lr모델까지 포함한 cv_pipeline을 따로 만든 것.

In [46]:
cross_val = CrossValidator(
        estimator= cv_pipeline, # pipeline 맨 끝에 model이 있으니까
        estimatorParamMaps= param_grid, # gridsearch 할 parameter들의 목록
                                        # 이거 없으면 gridsearch 없이 cross val만 진행
        evaluator=RegressionEvaluator(labelCol='total_amount'),
        numFolds=5
)

cross_val

CrossValidator_67f7226a05c2

## 3️⃣ 훈련
- 전체 데이터로 gridsearch를 하면 너무 오래걸리기 때문에 샘플링을 하겠다.

In [47]:
toy_df = train_df.sample(False, 0.1, seed=1)

cv_model = cross_val.fit(toy_df)

23/11/20 12:02:53 WARN Instrumentation: [4e011c33] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 12:02:58 WARN Instrumentation: [4e011c33] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/20 12:04:18 WARN Instrumentation: [8972c10f] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 12:04:23 WARN Instrumentation: [8972c10f] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/20 12:05:44 WARN Instrumentation: [2c8b71a2] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 12:05:48 WARN Instrumentation: [2c8b71a2] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/20 12:07:09 WARN Instrumentation: [1f85a7d5] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 12:07:13 WARN Instrumentation: [1f85a7d5] Cholesky solv

## 4️⃣ Best Model 찾기

In [48]:
best_alpha = cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam()
best_reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam()

In [49]:
best_alpha, best_reg_param

(0.1, 0.01)

## 5️⃣ BEST MODEL로 모델 돌리기

In [50]:
pipeline = Pipeline(stages= stages)
fitted_transformer = pipeline.fit(train_df)

                                                                                

In [51]:
vec_train_df = fitted_transformer.transform(train_df)

In [52]:
vec_test_df = fitted_transformer.transform(test_df)

In [53]:
lr = LinearRegression(
    maxIter = 50,
    solver = 'normal',
    labelCol = 'total_amount',
    featuresCol = 'feature_vector',
    elasticNetParam = best_alpha,
    regParam = best_reg_param
)

In [54]:
model = lr.fit(vec_train_df)

                                                                                

In [55]:
# 예측
predictions = model.transform(vec_test_df)

# 평가
print(f'RMSE: {model.summary.rootMeanSquaredError}')
print(f'R2: {model.summary.r2}')

RMSE: 7.438315674597085
R2: 0.8165897408526807


## 6️⃣ 튜닝된 모델 저장하기

In [56]:
# /home/ubuntu/working/taxi_analysis/data/lr_model
model_dir = '/home/ubuntu/working/taxi_analysis/data/lr_model'
# pipeline_dir = '/home/ubuntu/working/spark-example/data/pipeline'

model.write().overwrite().save(model_dir)


In [None]:
pwd

## 7️⃣ 튜닝 모델 불러오기

In [61]:
# 모델 로딩(할 때는 class도 함께 import해라)
from pyspark.ml.regression import LinearRegressionModel
model_dir = '/home/ubuntu/working/taxi_analysis/data/lr_model'
loaded_model = LinearRegressionModel.load(model_dir)

In [62]:
loaded_model.regParam

Param(parent='LinearRegression_9e2abe05ccfd', name='regParam', doc='regularization parameter (>= 0).')

In [63]:
# 모델의 모든 파라미터 가져오기
params = loaded_model.extractParamMap()

# 파라미터 및 해당 값을 출력
for param, value in params.items():
    print(f"{param.name}: {value}")


aggregationDepth: 2
elasticNetParam: 0.1
epsilon: 1.35
featuresCol: feature_vector
fitIntercept: True
labelCol: total_amount
loss: squaredError
maxBlockSizeInMB: 0.0
maxIter: 50
predictionCol: prediction
regParam: 0.01
solver: normal
standardization: True
tol: 1e-06


## 8️⃣ 로드된 모델을 사용하여 예측 또는 평가 수행

In [64]:
predictions = loaded_model.transform(vec_test_df)    
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="rmse")  
rmse = evaluator.evaluate(predictions)  
print("Root Mean Squared Error (RMSE):", rmse)  



Root Mean Squared Error (RMSE): 7.443909114452387


                                                                                

In [65]:
evaluator = RegressionEvaluator(labelCol="total_amount", predictionCol="prediction", metricName="r2")  
r2 = evaluator.evaluate(predictions)  
print("r2):", r2)  



r2): 0.8161330515277185


                                                                                

In [53]:
spark.stop()