In [48]:
from pyspark.sql import SparkSession

In [49]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi-duration-prediction-2")\
            .config("spark.executor.memory", MAX_MEMORY)\
            .config("spark.driver.memory", MAX_MEMORY).getOrCreate()

In [50]:
trip_files = "/home/jovyan/trips/*"

In [51]:
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema=True, header=True)

In [52]:
trips_df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)



In [53]:
trips_df.createOrReplaceTempView("trips")

In [54]:
query = """
SELECT
    CAST(passenger_count AS double),
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    CAST(trip_distance AS double),
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    CAST(total_amount AS double)
FROM
    (SELECT
        *,
        TO_DATE(t.tpep_pickup_datetime) AS pickup_date
    FROM
        trips t)
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND pickup_date >= '2021-01-01'
    AND pickup_date < '2021-04-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [55]:
data_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [56]:
data_df.select(["passenger_count", "trip_distance", "pickup_time", "day_of_week"]).describe().show()

+-------+------------------+------------------+------------------+-----------+
|summary|   passenger_count|     trip_distance|       pickup_time|day_of_week|
+-------+------------------+------------------+------------------+-----------+
|  count|          13180789|          13180789|          13180789|   13180789|
|   mean|1.2169496074931478|3.5256582189433003|14.207635976875132|       NULL|
| stddev|0.5477227418517697| 4.174284300086649|  5.27160400823842|       NULL|
|    min|               0.0|               1.0|                 0|     Friday|
|    max|               3.0|             475.5|                23|  Wednesday|
+-------+------------------+------------------+------------------+-----------+



### Train, Test 데이터셋 만들기

In [57]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)
print(train_df.count())
print(test_df.count())

10542976
2637813


In [58]:
toy_df = train_df.sample(False, .1, seed=261)

In [59]:
# 컬럼 기반 포멧 parquet로 저장.. 압축률이 좋고 disk io가 적다 컬럼별로 적절한 인코딩이 가능
data_dir = "/home/jovyan/data"
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")
toy_df.write.format("parquet").save(f"{data_dir}/toy/")

In [60]:
# 다시 읽어오기
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")
toy_df = spark.read.parquet(f"{data_dir}/toy/")

In [61]:
train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: string (nullable = true)
 |-- dropoff_location_id: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



### 전처리

In [62]:
# One-Hot Encoding
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# 카테고리 피쳐들
cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week",
    "pickup_time",
]

# 파이프라인 스테이지
stages = []

# 카테고리 피쳐 프리프로세싱
for c in cat_feats:
    # c -> c_idx
    cat_indexer = StringIndexer(inputCol=c, outputCol = c + "_idx").setHandleInvalid("keep")
    # one hot encode 
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [63]:
# Feature Normalization
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Numerical 피쳐
num_feats = [
    "passenger_count",
    "trip_distance"
]

# vector assembler
for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+"_scaled")
    stages += [num_assembler, num_scaler]

In [64]:
stages   # 카테고리 + numerical 피쳐별로 6세트가 생김.

[StringIndexer_961e6163f71c,
 OneHotEncoder_3a600812d9a6,
 StringIndexer_168ae784ed5a,
 OneHotEncoder_f7c7abb4d309,
 StringIndexer_3db75fc0edee,
 OneHotEncoder_300724997c9f,
 StringIndexer_a2bb0d84dec8,
 OneHotEncoder_3bc0588093db,
 VectorAssembler_22f3cc35f25a,
 StandardScaler_113cb516db41,
 VectorAssembler_3966ef57e6dd,
 StandardScaler_832150718971]

In [65]:
# Categorical + Numeric features
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

In [66]:
from pyspark.ml import Pipeline
# add model into the stages
transform_stages = stages

# Construct pipeline using the set of stages defined
pipeline =  Pipeline(stages=transform_stages)

# Fit the transformer
fitted_transformer = pipeline.fit(train_df)

In [67]:
# Transform the train data
transformed_train_df = fitted_transformer.transform(train_df)
# transformed_train_df = transformed_train_df.cache()

In [68]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=100, 
                         solver="normal", 
                         labelCol="total_amount",
                         featuresCol="feature_vector",
                        )

In [69]:
model = lr.fit(transformed_train_df)

In [72]:
from pyspark.ml.evaluation import RegressionEvaluator

In [73]:
transformed_test_df = fitted_transformer.transform(test_df)

In [74]:
predictions = model.transform(transformed_test_df).cache()

In [79]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|         14.9|     Friday|       66.35| 68.64659764171763|
|         16.2|   Thursday|       60.35| 70.75679896277349|
|          2.4|   Thursday|        12.8|18.133372777694163|
|          3.0|   Thursday|       18.75|19.597865666447444|
|          1.0|   Thursday|        11.8|13.789339998675738|
|          1.5|     Friday|       14.08|14.844348293357143|
|          1.6|     Friday|       13.55|14.312911841429377|
|          1.9|    Tuesday|        18.5|14.908396038896326|
|          2.2|     Monday|        15.3|15.328428877262382|
|          2.0|   Thursday|       17.15|  16.3342360650431|
|          2.1|     Monday|        16.3| 17.44294965203698|
|          2.7|   Saturday|        18.3| 18.79501683218274|
|          4.3|     Monday|        20.8| 21.96263443893573|
|          3.4|   Saturday|       22.85|

In [None]:
from pyspark.sql.types import DoubleType
distance_list = [1.1, 5.5, 10.5, 30.0]
distances_df = spark.createDataFrame(distance_list, DoubleType()).toDF("trip_distance")

In [None]:
distances_df.show()

In [None]:
vdistances_df = vassembler.transform(distances_df)

In [None]:
vdistances_df.show()

In [None]:
model.transform(vdistances_df).show()

# 성능 평가

In [80]:
model.summary

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0xffff47808050>

In [81]:
print("RMSE: ", model.summary.rootMeanSquaredError)

RMSE:  6.119912904998628


In [82]:
print("R2: ", model.summary.r2)

R2:  0.8069741997402999


In [83]:
train_df.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+-----------+------------------+
|summary|   passenger_count|pickup_location_id|dropoff_location_id|     trip_distance|       pickup_time|day_of_week|      total_amount|
+-------+------------------+------------------+-------------------+------------------+------------------+-----------+------------------+
|  count|          10542976|          10542976|           10542976|          10542976|          10542976|   10542976|          10542976|
|   mean|1.2169354269610402|164.29842295002854| 161.28775262316825| 3.526571795286053|14.207018018441852|       NULL|20.265381998326152|
| stddev|0.5477710193637441| 65.58302058496625|   71.5275341911363|4.1746303495386075| 5.271293246375481|       NULL|13.929565868586424|
|    min|               0.0|                 1|                  1|               1.0|                 0|     Friday|               1.0|
|    max|               3.0|             