In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('taxi-fare-prediction').getOrCreate()
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/20 13:03:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
directory = "/home/ubuntu/working/datasource"
trip_files = "/trips/*"

trips_df = spark.read.csv(f"file:///{directory}/{trip_files}", inferSchema=True, header=True)
trips_df.printSchema()

# trip_distance: double (nullable = true) => x
# tolls_amount: double (nullable = true) => y



root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)




                                                                                

In [3]:
trips_df.createOrReplaceTempView("trips")

In [4]:
# 데이터 정제
query = """
SELECT
    t.passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    t.trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') as day_of_week,
    
    t.total_amount

FROM trips t

WHERE t.total_amount < 200
  AND t.total_amount > 0
  AND t.passenger_count < 5
  AND TO_DATE(t.tpep_pickup_datetime) >= '2021-01-01'
  AND TO_DATE(t.tpep_pickup_datetime) < '2021-08-01'
  AND t.trip_distance < 10
  AND t.trip_distance > 0
"""

data_df = spark.sql(query)

In [5]:
data_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



Train / Test 나누기

In [6]:
train_sdf, test_sdf = data_df.randomSplit([0.8,0.2], seed=42)

만약에 데이터의 양이 너무 많고, 그 데이터를 오랜 시간을 들여서 전처리를 다 완료 했다고 가정.
- 여러 모델을 만들거나 실험을 할 때에도 위의 전처리 작업을 그대로 매번 수행
- 추후에 다시 이 데이터를 활용한다면 시간이 많이 걸릴듯....
- 처리가 완료된 데이터를 파일이나 데이터베이스에 저장해 놓고 나중에 불러오는게 더 빠르다.

In [8]:
# 파케이 (parquet) 형식으로 저장
save_dir = "/home/ubuntu/working/spark-examples/data/ml-data"

# Spark DataFrame의 write 메소드를 이용해 데이터를 파일 또는 데이터베이스에 저장할 수 있다.
train_sdf.write.format("parquet").save(f"{save_dir}/train/")
test_sdf.write.format("parquet").save(f"{save_dir}/test/")

                                                                                

카테고리 -> 원핫인코딩
실수 -> standardscaling
원핫 + standard => Assemble해줌 -> feature
이 과정이 pipeline임.

# 파이프라인 정의
파이프라인 정의를 위한 stage 정의

In [9]:
# 파이프라인 정의에 넣을 과정(stage)을 모아 놓을 리스트
stages = []

## OneHotEncoding Stage
- `pickup_location_id`
- `dropoff_location_id`
- `day_of_week`

`pickup_location_id`, `dropoff_location_id`는 숫자 형식의 데이터
- 숫자 형식의 데이터는 `OneHotEncoding`이 불가능
- `StringIndexer` Transformer를 활용해 숫자형 데이터를 문자열로 취급하게끔 할 수 있다.

In [12]:
# pick, drop -> int를 문자열로 변환해서 카테고리 형식이 되도록 
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# OneHotEncoding을 수행할 컬럼
cat_features = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week" # day는 이미 str이여서 굳이 안해줘도 되지만 걍 같이 해주겠음
]

for c in cat_features:
    # 1. 데이터를 문자열 형식으로 바꿔준다.
    cat_indexer = StringIndexer(inputCol=c, outputCol=c+"_idx").setHandleInvalid("keep") # 위에서 이미 null값 체크 및 제거, 중복값 처리, EDA를 충분히 하고 진행하는 것임
    
    # 2. OneHotEncoding 수행
    onehot_encoder = OneHotEncoder(
        inputCols = [cat_indexer.getOutputCol()],
        outputCols = [c+"_onehot"] # 얘만 assembling 해주기
    )
    
    stages += [cat_indexer, onehot_encoder]
    
stages

[StringIndexer_5e82c1de92a4,
 OneHotEncoder_79630feee761,
 StringIndexer_373a484bce43,
 OneHotEncoder_ef46306b6083,
 StringIndexer_8a31a14b3d37,
 OneHotEncoder_4ad983ec6e38]

## Standard Scaling Stage
- 숫자형 데이터들에 대한 표준화 수행
- `passenger_count`, `trip_distance`, `pickup_time`

In [13]:
# 각 컬럼의 데이터를 벡터화 시키고, Standard Scaling을 수행
## 반드시 이차원 배열로 되어 있어야함.
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_features:
    # 1. 벡터화 Stage
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"_vector")
    
    # 2. StandardScaler Stage
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+"_scaled") # 얘만 assembling 해주기
    
    stages += [num_assembler, num_scaler]
    
stages

[StringIndexer_5e82c1de92a4,
 OneHotEncoder_79630feee761,
 StringIndexer_373a484bce43,
 OneHotEncoder_ef46306b6083,
 StringIndexer_8a31a14b3d37,
 OneHotEncoder_4ad983ec6e38,
 VectorAssembler_bc1f54be5a52,
 StandardScaler_a87910c3d334,
 VectorAssembler_860825e843da,
 StandardScaler_a71d1ca121b5,
 VectorAssembler_73b19994c644,
 StandardScaler_ab9ca325122d]

## Feature Assemble Stage
- 컬럼 명 뒤에 `_onehot`이 붙거나 `_scaled`가 붙은 컬럼만 Feature Vector로 만들기

In [15]:
assembler_inputs = [ c + "_onehot" for c in cat_features] + [ n + "_scaled" for n in num_features]
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [16]:
feature_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

stages.append(feature_assembler)
stages

[StringIndexer_5e82c1de92a4,
 OneHotEncoder_79630feee761,
 StringIndexer_373a484bce43,
 OneHotEncoder_ef46306b6083,
 StringIndexer_8a31a14b3d37,
 OneHotEncoder_4ad983ec6e38,
 VectorAssembler_bc1f54be5a52,
 StandardScaler_a87910c3d334,
 VectorAssembler_860825e843da,
 StandardScaler_a71d1ca121b5,
 VectorAssembler_73b19994c644,
 StandardScaler_ab9ca325122d,
 VectorAssembler_a0c530680826]

## Pipeline 구성
순서대로 구성된 Stage를 한꺼번에 수행할 파이프라인 생성

In [17]:
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)

## 데이터를 파이프라인에 통과시키기

In [20]:
# transformer의 fit : 변환을 하기 위한 수 또는 방법을 구하는 과정
fitted_transformer = pipeline.fit(train_sdf)

                                                                                

In [21]:
# transform : 데이터를 변환
vec_train_sdf = fitted_transformer.transform(train_sdf)
vec_train_sdf.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- features: vector (nullable 

In [22]:
vec_train_sdf.select("features", "total_amount").show(5) # (피처의 개수, [원핫인코딩된 번호


[Stage 22:>                                                         (0 + 1) / 1]

+--------------------+------------+
|            features|total_amount|
+--------------------+------------+
|(532,[62,311,526,...|         6.3|
|(532,[62,280,525,...|         8.8|
|(532,[62,280,527,...|         9.8|
|(532,[62,279,525,...|       10.55|
|(532,[62,298,522,...|        10.8|
+--------------------+------------+
only showing top 5 rows




                                                                                

## 모델 생성 및 훈련

In [25]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter = 50,
    solver = 'normal', # 최적화 방식
    labelCol = 'total_amount',
    featuresCol = 'features'
)

In [26]:
lr_model = lr.fit(vec_train_sdf)

23/11/20 14:25:15 WARN Instrumentation: [2ae0ccec] regParam is zero, which might cause numerical instability and overfitting.
23/11/20 14:25:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/20 14:25:45 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/11/20 14:27:36 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/11/20 14:27:36 WARN Instrumentation: [2ae0ccec] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/11/20 14:27:37 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/11/20 14:27:37 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

## 예측

In [27]:
# 파이프라인을 이용해 test_sdf 변환
vec_test_sdf = fitted_transformer.transform(test_sdf)

In [31]:
# vec_test_sdf로 예측
predictions = lr_model.transform(vec_test_sdf)

In [32]:
predictions.show()


[Stage 27:>                                                         (0 + 1) / 1]

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|            features|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------


                                                                                

In [33]:
# 예측한 결과를 따로 확인 할 때는 일반적으로 조회만 일어나기 때문에 캐시 처리를 해 주는 것이 좋다.
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, features: vector, prediction: double]

In [35]:
predictions.select("trip_distance","day_of_week","total_amount","prediction").show()


[Stage 28:>                                                         (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.7|   Saturday|       12.35|10.520957566446452|
|          1.5|     Friday|        11.8| 13.65652098094591|
|          1.0|     Friday|        12.3| 12.76767802677514|
|          1.7|   Saturday|        13.3|  13.8888089892157|
|          1.4|     Friday|         8.3|  9.67076737912389|
|          4.6|     Monday|        26.8|22.927184691166815|
|          0.7|  Wednesday|         5.3| 6.758567784586714|
|          1.7|    Tuesday|        11.8|10.683491846025056|
|          0.1|  Wednesday|        55.3|6.8158905129782985|
|          0.7|  Wednesday|        10.3|  9.48794145133163|
|          7.4|     Friday|        37.4| 33.81738044712699|
|          3.9|    Tuesday|       21.95|21.187066662973017|
|          4.1|     Monday|       22.55|21.755690286476216|
|          4.6|   Saturday|        24.8|


                                                                                

In [38]:
lr_model.summary.rootMeanSquaredError

3.2685055544721795

In [39]:
lr_model.summary.r2

0.794446430313654

In [40]:
spark.stop()