In [1]:
from pyspark.ml.feature import (
    StringIndexer,        # 범주형 → 수치형 인코딩
    OneHotEncoder,        # 원-핫 인코딩
    VectorAssembler,      # 여러 컬럼 → 하나의 feature 벡터
    StandardScaler,       # 표준 정규화
    MinMaxScaler,         # 최소/최대 스케일링
    Bucketizer,           # 연속형 변수 → 구간화
    QuantileDiscretizer,  # 분위수 기반 구간화
    PCA,                  # 주성분 분석
    PolynomialExpansion,  # 다항 특성 생성
    ChiSqSelector         # 카이제곱 기반 피처 선택
)
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier,
    NaiveBayes,
    MultilayerPerceptronClassifier
)

from pyspark.ml.regression import (
    LinearRegression,
    DecisionTreeRegressor,
    RandomForestRegressor,
    GBTRegressor
)
from pyspark.ml.clustering import (
    KMeans,
    GaussianMixture,
    BisectingKMeans,
    LDA  # Latent Dirichlet Allocation (토픽 모델링)
)
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator,
    ClusteringEvaluator
)
from pyspark.ml import Pipeline  # 전체 파이프라인 구성

from pyspark.ml.tuning import (   # 모델 튜닝
    ParamGridBuilder,
    CrossValidator,
    TrainValidationSplit
)
from pyspark.ml.linalg import Vectors, DenseVector, SparseVector  # 벡터 수동 생성
from pyspark.ml.stat import Correlation, ChiSquareTest            # 통계 테스트

In [2]:
from pyspark.sql import SparkSession
MAX_MEMORY = '8g'
spark = SparkSession.builder.appName('taxi-fare-prediction_2nd')\
    .config('spark.driver.memory',MAX_MEMORY)\
    .config('spark.executer.memory', MAX_MEMORY)\
    .getOrCreate()
spark

In [3]:
import os
cwd =os.getcwd()
trip_data_path = os.path.join(cwd, 'learning_spark_data', 'trips', '*.csv' )
file_path = f"file:///{trip_data_path.replace(os.sep, '/')}"
trip_df = spark.read.csv(file_path, inferSchema=True, header=True)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [4]:
trip_df.createOrReplaceTempView('trips')

In [5]:
query = '''
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
    '''
data_df = spark.sql(query)
data_df.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [6]:
# split 8:2
train_data, test_data = data_df.randomSplit([0.8, 0.2], seed = 1)
train_data.show(3), test_data.show(3) 

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|                 4|                  4|          0.1|         18|   Saturday|         6.3|
|              0|                 4|                 79|          0.7|         12|    Tuesday|         8.8|
|              0|                 4|                 79|          0.7|         23|   Saturday|       12.35|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 3 rows

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+--

(None, None)

# 파이프라인 생성
- 전처리과정을 각 스테이지로 정의해서 쌓는다
- 범주형] onehotencoding : 'pickup_location_id', 'dropoff_location_id',
'day_of_week'
- : 수치형] StandardScaler : 'passenger_count', 'trip_distance', 'pickup_time'

In [7]:
stages = []

In [8]:
cat_features = ['pickup_location_id', 'dropoff_location_id', 'day_of_week']
for cat in cat_features:
    cat_index = StringIndexer(inputCol=cat, outputCol=cat+'_idx')\
    .setHandleInvalid('keep')
    onehot_encode = OneHotEncoder(inputCols = [cat_index.getOutputCol()], #_idx col
                                 outputCols=[cat+'_onehot']) #postfix
    stages += [cat_index, onehot_encode] #collist
stages

[StringIndexer_97e8bb53b00c,
 OneHotEncoder_e63caf8d18e9,
 StringIndexer_74e2881a5d1d,
 OneHotEncoder_1bf7c6b9d494,
 StringIndexer_b7581e6a5c12,
 OneHotEncoder_cf016eed1e1f]

In [9]:
num_features = ['passenger_count', 'trip_distance', 'pickup_time']
for num in num_features:
    num_assembler = VectorAssembler(
        inputCols=[num],        
        outputCol= num+'_vector'
    )
    num_scaler = StandardScaler(
        inputCol=num_assembler.getOutputCol(),        
        outputCol= num+'_scaled'
    )
    stages += [num_assembler, num_scaler]

stages

[StringIndexer_97e8bb53b00c,
 OneHotEncoder_e63caf8d18e9,
 StringIndexer_74e2881a5d1d,
 OneHotEncoder_1bf7c6b9d494,
 StringIndexer_b7581e6a5c12,
 OneHotEncoder_cf016eed1e1f,
 VectorAssembler_584549201d24,
 StandardScaler_667ce90b0973,
 VectorAssembler_47a5d8fa46ce,
 StandardScaler_005814383959,
 VectorAssembler_776ea9bd78f1,
 StandardScaler_19497060eef7]

In [10]:
assembler_input = [cat+'_onehot' for cat in cat_features] + [num+'_scaled' for num in num_features]
assembler_input

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

선형회귀모델 - 값들이 수치형
데이터는 피쳐6개 라벨 1개
범주형은 원핫, 수치형은 스케일처리


In [11]:
assembler = VectorAssembler(inputCols=assembler_input, outputCol='feature_vector')
stages += [assembler]
stages

[StringIndexer_97e8bb53b00c,
 OneHotEncoder_e63caf8d18e9,
 StringIndexer_74e2881a5d1d,
 OneHotEncoder_1bf7c6b9d494,
 StringIndexer_b7581e6a5c12,
 OneHotEncoder_cf016eed1e1f,
 VectorAssembler_584549201d24,
 StandardScaler_667ce90b0973,
 VectorAssembler_47a5d8fa46ce,
 StandardScaler_005814383959,
 VectorAssembler_776ea9bd78f1,
 StandardScaler_19497060eef7,
 VectorAssembler_7a6f9eeec182]

In [12]:
pipeline = Pipeline(stages=stages)
fitted_transform = pipeline.fit(train_data)
vtrain_df = fitted_transform.transform(train_data)
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [13]:
vtrain_df.select('feature_vector').show(2)

+--------------------+
|      feature_vector|
+--------------------+
|(533,[62,311,527,...|
|(533,[62,280,526,...|
+--------------------+
only showing top 2 rows



In [14]:
# linearRegression 생성 maxIter = 50, LabelCol='total_amount', featurescol='features'
lr = LinearRegression(maxIter=50, solver= 'normal', labelCol='total_amount', featuresCol='feature_vector')
# fit
Model = lr.fit(vtrain_df)

In [15]:
# 테스트 데이터도 변환
vtest_df = fitted_transform.transform(test_data)
# 테스트 데이터로 예측
pred = Model.transform(vtest_df)

In [16]:
pred.select('total_amount', 'prediction').show(3)

+------------+------------------+
|total_amount|        prediction|
+------------+------------------+
|       10.55|12.695522792729275|
|        13.3|14.450558014776915|
|        21.3|21.108271361254218|
+------------+------------------+
only showing top 3 rows



In [17]:
Model.summary.r2 ,Model.summary.rootMeanSquaredError

(0.80849012500813, 5.6485201652667625)

In [19]:
spark.stop()