In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = '5g'
spark = SparkSession.builder.appName('taxi-fare-prediction')\
                    .config('spark.executor.memory', MAX_MEMORY)\
                    .config('spark.driver.memory', MAX_MEMORY)\
                    .getOrCreate()

22/07/19 20:30:27 WARN Utils: Your hostname, dongwoo.local resolves to a loopback address: 127.0.0.1; using 192.168.55.122 instead (on interface en0)
22/07/19 20:30:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/19 20:30:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 데이터프레임 생성

### 데이터 저장
- 추후 데이터를 가져오기 시간 단축을 위함

In [3]:
data_dir = "/Users/dongwoo/new_york/data"

In [4]:
# 데이터 불러오기
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [5]:
# 하이퍼 파라미터 튜닝을 위해 10%의 데이터만 가져옵니다.
toy_df = train_df.sample(False, 0.1, seed=1)

In [6]:
toy_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



# Data Preprocessing

### One Hot Encoding & String Indexer
> Categorical Data를 벡터화 합니다. 

- String Indexer는 `inputCol`컬럼을 인덱싱하여 `OutputCol` 컬럼을 새로 만드는 것이다.
- 레이블 빈도에 따라 정렬되므로 가장 빈번한 레이블은 0번째 인덱스를 갖는다.
- `.setHandleInvalid("keep")`은 변환 중 해당하지 않은 값이 들어올 떄 문제를 대처하는 역할을 한다. "keep", "error" or "skip"
- [참고자료](https://knight76.tistory.com/entry/spark-머신-러닝-StringIndexer-예)


`pickup_location_id` >> `pickup_location_id_index` >> **`pickup_location_id_onehot`**

In [7]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

stages = []

for c in cat_feats:
    cat_indexer = StringIndexer(inputCol = c, outputCol = c + "_idex").setHandleInvalid("keep") 
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    
    stages += [cat_indexer, onehot_encoder] 

In [8]:
stages

[StringIndexer_44df6697e345,
 OneHotEncoder_c31928683ae0,
 StringIndexer_92938840dea6,
 OneHotEncoder_ca9a2360f294,
 StringIndexer_8ff6adf0c1d3,
 OneHotEncoder_762182f56127]

### Vector Assembler & StandardScaler
> Numerical Data를 벡터화 합니다.


`passenger_count` >> `passenger_count_vector` >> **`passenger_count_scaled`**

In [9]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    
    stages += [num_assembler, num_scaler]

In [10]:
stages

[StringIndexer_44df6697e345,
 OneHotEncoder_c31928683ae0,
 StringIndexer_92938840dea6,
 OneHotEncoder_ca9a2360f294,
 StringIndexer_8ff6adf0c1d3,
 OneHotEncoder_762182f56127,
 VectorAssembler_4a49fbfbbd81,
 StandardScaler_0dd48c80dec5,
 VectorAssembler_453d86af5849,
 StandardScaler_2394e557a8f6,
 VectorAssembler_714b219fe874,
 StandardScaler_333d1ca380af]

### 두 벡터화 된 컬럼을 다시 벡터화로 합친다.

`passenger_count_scaled` + `passenger_count_scaled` >> **`feature_vector`**

In [11]:
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

In [12]:
stages

[StringIndexer_44df6697e345,
 OneHotEncoder_c31928683ae0,
 StringIndexer_92938840dea6,
 OneHotEncoder_ca9a2360f294,
 StringIndexer_8ff6adf0c1d3,
 OneHotEncoder_762182f56127,
 VectorAssembler_4a49fbfbbd81,
 StandardScaler_0dd48c80dec5,
 VectorAssembler_453d86af5849,
 StandardScaler_2394e557a8f6,
 VectorAssembler_714b219fe874,
 StandardScaler_333d1ca380af,
 VectorAssembler_07b46ea70076]

## Hyper-Parameter Tuning

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder # CV, 그리드 형식 테스트
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(
    maxIter=30,
    solver='normal',
    labelCol='total_amount',
    featuresCol='feature_vector'
)

cv_stages = stages + [lr] 

# 파이프라인에 기존 stages + 모델을 넣습니다.
cv_pipe = Pipeline(stages=cv_stages)

In [14]:
param_grid = ParamGridBuilder()\
            .addGrid(lr.elasticNetParam, [0.1, 0.2, 0.3, 0.4, 0.5])\
            .addGrid(lr.regParam, [0.01, 0.02, 0.03, 0.04, 0.05])\
            .build()

# 여러가지의 값들을 테스트 한다.

In [15]:
cross_val = CrossValidator(estimator=cv_pipe,
                          estimatorParamMaps=param_grid,
                          evaluator=RegressionEvaluator(labelCol="total_amount"),
                          numFolds=5 )
# numFolds : 데이터를 5개로 나눔

In [16]:
cv_model = cross_val.fit(toy_df)

22/07/19 20:30:59 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/07/19 20:30:59 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/07/19 20:31:01 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/07/19 20:31:01 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [17]:
alpha = cv_model.bestModel.stages[-1]._java_obj.getElasticNetParam() # 베스트 모델의 마지막 요소 추출

In [18]:
reg_param = cv_model.bestModel.stages[-1]._java_obj.getRegParam() # 베스트 모델의 마지막 요소 추출

In [20]:
import pandas as pd
hyperparam = {
    'alpha' : [alpha],
    'reg_param' : [reg_param]
}
hyper_df = pd.DataFrame(hyperparam).to_csv(f"{data_dir}hyperparameter.csv")
print(hyper_df)

None


## Training

In [36]:
from pyspark.ml import Pipeline

tf_stages = stages # Processing을 완료한 stages
pipe = Pipeline(stages=tf_stages) # 파이프라인을 구축

tf_fit = pipe.fit(train_df) # fit

                                                                                

In [37]:
v_train_df = tf_fit.transform(train_df) # transform

In [38]:
v_train_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idex: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idex: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idex: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (

# Regression Modeling

In [39]:
from pyspark.ml.regression import LinearRegression

In [40]:
lr = LinearRegression(
    maxIter=5, # 반복수
    solver="normal",
    labelCol="total_amount", # Label(Traget)
    featuresCol='feature_vector', # 전처리 후 > feature -> feature_vector
    # 추가 #
    elasticNetParam = alpha,
    regParam = reg_param,
    )

In [41]:
model = lr.fit(v_train_df)

                                                                                

In [42]:
vtest_df = tf_fit.transform(test_df) # test 데이터는 fit을 하지 않습니다. 

In [43]:
prediction = model.transform(vtest_df)

In [44]:
prediction.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+-----------------------+-------------------------+------------------------+--------------------------+----------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idex|pickup_location_id_onehot|dropoff_location_id_idex|dropoff_location_id_onehot|day_of_week_idex|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+-----------------------+-------------------------+---------------

In [45]:
prediction.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idex: double, pickup_location_id_onehot: vector, dropoff_location_id_idex: double, dropoff_location_id_onehot: vector, day_of_week_idex: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [46]:
prediction.select(["trip_distance", "day_of_week","total_amount","prediction"]).show()

[Stage 3069:>                                                       (0 + 1) / 1]

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.5|  Wednesday|         7.8|11.641190505682586|
|          3.6|     Sunday|        19.3|20.540566626969913|
|          3.7|   Saturday|        18.8|20.131319716807788|
|          4.1|  Wednesday|        24.3| 21.42231124221469|
|          2.2|  Wednesday|        16.8|16.676868064645753|
|          2.6|     Monday|        17.8| 16.41476544059132|
|          6.3|     Sunday|       29.15| 25.01156542009558|
|          0.6|   Saturday|        9.95| 12.29694066590555|
|          0.8|  Wednesday|         9.3|13.180225723279165|
|          0.8|     Friday|        0.31|13.265223243052434|
|          1.0|  Wednesday|       11.75| 12.85238397446231|
|          0.6|     Monday|        9.95|11.859467517835556|
|          1.1|    Tuesday|       12.95|14.025801980241683|
|          1.1|  Wednesday|        11.3|

                                                                                

## Evaluation

In [47]:
model.summary.rootMeanSquaredError 

6.19539005933196

In [48]:
model.summary.r2

0.8093767003552972

- R2 : 80%가 나온것으로 봤을때, 지난 성능보다 향상됬다고 볼 수 있다.

# Model Save

In [49]:
model_dir = f"{data_dir}/model"
model.save(model_dir)

In [51]:
from pyspark.ml.regression import LinearRegressionModel

lr_model = LinearRegressionModel().load(model_dir)
prediction = lr_model.transform(vtest_df)
prediction.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+-----------------------+-------------------------+------------------------+--------------------------+----------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idex|pickup_location_id_onehot|dropoff_location_id_idex|dropoff_location_id_onehot|day_of_week_idex|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|      feature_vector|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+-----------------------+-------------------------+---------------