In [2]:
from pyspark.sql import SparkSession

MAX_MEMORY = '8g'

spark = SparkSession.builder.appName('taxi-fare-prediction_2nd') \
    .config('spark.driver.memory', MAX_MEMORY) \
    .config('spark.executor.memory', MAX_MEMORY) \
    .getOrCreate()

In [5]:
import os
cwd = os.getcwd()
trip_data_path = os.path.join(cwd, 'learning_spark_data','tips_data','trips','*.csv')
file_path = f"file:///{trip_data_path.replace(os.sep,'/')}"
trip_df = spark.read.csv(file_path, inferSchema = True, header = True)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
trip_df.createOrReplaceTempView('trips')

In [40]:
quit = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

data_df = spark.sql(quit)

In [41]:
# train, test split 8:2, seed = 1
train_data, test_data = data_df.randomSplit([0.8,0.2], seed=1)
train_data.show(5), test_data.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|                 4|                  4|          0.1|         18|   Saturday|         6.3|
|              0|                 4|                 79|          0.7|         12|    Tuesday|         8.8|
|              0|                 4|                 79|          0.7|         23|   Saturday|       12.35|
|              0|                 4|                 79|          0.9|         14|     Monday|         9.8|
|              0|                 4|                114|          0.9|         18|     Friday|        10.8|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows

+--

(None, None)

##  파이프라인 생성e'

- 전처리 과정을 각 스테이지로 정의해서 쌓는다

- 범주형> StringIndexer+onehotencoding : 'pickup_location_id', 'dropoff_location_id', 'day_of_week'

- 수치형> StandardScaler : 'passenger_count', 'trip_distance', 'pickup_time'

In [42]:
stages = []

In [43]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
cat_feature = ['pickup_location_id','dropoff_location_id','day_of_week']
for cat in cat_feature:
    cat_index = StringIndexer(inputCol = cat, outputCol = cat+'-_idx').setHandleInvalid('keep')
    onehot_encode = OneHotEncoder(inputCols = [cat_index.getOutputCol()], # _idx col
                                 outputCols = [cat+'_onehot']) # postfix
    stages += [cat_index, onehot_encode] # colllst
stages

[StringIndexer_239b1deb77f9,
 OneHotEncoder_3cd2c14e0653,
 StringIndexer_b1527da63a22,
 OneHotEncoder_80e39f031968,
 StringIndexer_7453cbcbe345,
 OneHotEncoder_b2b50fac761c]

In [44]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
num_feature = ['passenger_count', 'trip_distance', 'pickup_time']
for num in num_feature:
    num_assembler = VectorAssembler(inputCols=[num], outputCol=num + '_vector')
    num_scaler = StandardScaler(inputCol = num_assembler.getOutputCol(), outputCol= num +'_scaled')
    stages += [num_assembler, num_scaler]
stages

[StringIndexer_239b1deb77f9,
 OneHotEncoder_3cd2c14e0653,
 StringIndexer_b1527da63a22,
 OneHotEncoder_80e39f031968,
 StringIndexer_7453cbcbe345,
 OneHotEncoder_b2b50fac761c,
 VectorAssembler_18858a5b4b34,
 StandardScaler_55b2705a8ebd,
 VectorAssembler_a2e95555f66b,
 StandardScaler_4b68b955eee0,
 VectorAssembler_49950151b313,
 StandardScaler_d82ef85a9987]

In [45]:
assembler_input = [cat + '_onehot' for cat in cat_feature] + [num + '_scaled' for num in num_feature]
assembler_input

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [46]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=assembler_input, outputCol='feature_vector')
stages += [assembler]
stages

[StringIndexer_239b1deb77f9,
 OneHotEncoder_3cd2c14e0653,
 StringIndexer_b1527da63a22,
 OneHotEncoder_80e39f031968,
 StringIndexer_7453cbcbe345,
 OneHotEncoder_b2b50fac761c,
 VectorAssembler_18858a5b4b34,
 StandardScaler_55b2705a8ebd,
 VectorAssembler_a2e95555f66b,
 StandardScaler_4b68b955eee0,
 VectorAssembler_49950151b313,
 StandardScaler_d82ef85a9987,
 VectorAssembler_f9a62bc18286]

In [47]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
fitted_transform = pipeline.fit(train_data)
vtrain_df = fitted_transform.transform(train_data)
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id-_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id-_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week-_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (

In [51]:
vtrain_df.select('feature_vector').show(2)

+--------------------+
|      feature_vector|
+--------------------+
|(533,[62,311,527,...|
|(533,[62,280,526,...|
+--------------------+
only showing top 2 rows



In [52]:
# linearRegression 생성 maxIter = 50, labelCol = 'total_amount',featuresCol = 'features'
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=50, solver='normal'
                      ,labelCol='total_amount', featuresCol='feature_vector')
lr_model = lr.fit(vtrain_df)

In [54]:
# 테스트 데이터 변환
vtest_df = fitted_transform.transform(test_data)

# 테스트 데이터로 예측
pred = lr_model.transform(vtest_df)

In [55]:
pred.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id-_idx: double, pickup_location_id_onehot: vector, dropoff_location_id-_idx: double, dropoff_location_id_onehot: vector, day_of_week-_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [56]:
pred.select('total_amount','prediction').show(3)

+------------+------------------+
|total_amount|        prediction|
+------------+------------------+
|       10.55|12.695522792729275|
|        13.3|14.450558014776915|
|        21.3|21.108271361254218|
+------------+------------------+
only showing top 3 rows



In [58]:
lr_model.summary.r2, lr_model.summary.rootMeanSquaredError

(0.80849012500813, 5.6485201652667625)

In [59]:
spark.stop()