In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the SparkSession for this notebook with 8 GB for the driver
# and 8 GB per executor.
# The executor key was previously misspelled as "spark.excutor.memory"; Spark
# does not recognise that key, so the executor memory setting was never applied.
spark = (
    SparkSession.builder
    .appName("taxi-fare-prediciton_2nd")
    .config("spark.driver.memory", '8g')
    .config("spark.executor.memory", '8g')
    .getOrCreate()
)

In [3]:
import os
# Glob every trip CSV under ../learning_spark_data/trips (relative to the current
# working directory) and pass it to Spark as a file:// URI with forward slashes.
cwd = os.getcwd()
trip_data_path = os.path.join(cwd, '../learning_spark_data', 'trips', '*.csv')
file_path = "file:///" + trip_data_path.replace(os.sep, '/')
# header=True: first row holds column names; inferSchema=True: let Spark infer column types.
trip_df = spark.read.csv(file_path, header=True, inferSchema=True)


In [4]:
# Register the DataFrame as the temp view `trips` so it can be queried with spark.sql.
trip_df.createOrReplaceTempView('trips')

In [7]:
# Preview the first 5 rows of the raw trip data.
trip_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2021-03-01 00:22:02|  2021-03-01 00:23:22|              1|          0.0|         1|                 N|         264|         264|           2|        3.0|  0.5|    0.5|       0.0|         0.0|                  0.3

In [8]:
# SQL over the `trips` view: selects the model inputs (passenger count, pickup and
# drop-off location ids, trip distance, pickup hour, weekday name) plus
# total_amount, keeping only rows whose amount, distance, passenger count and
# pickup date fall inside the bounds in the WHERE clause.
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

In [11]:
# Run the SQL above; the resulting DataFrame is the input to the modelling cells.
data_df = spark.sql(query)

In [15]:
def learning(machine, data, param, translate=None, Tparam=None):
    """Split ``data`` 70/30, fit ``machine`` on the train part and show test predictions.

    Args:
        machine: Callable (e.g. an estimator class) invoked as ``machine(**param)``;
            the object it returns must provide ``fit(train_data)``.
        data: Object providing ``randomSplit(weights, seed=...)``.
        param: Keyword arguments for ``machine``; ``None`` means no arguments.
        translate: Optional callable invoked as ``translate(**Tparam)``; the
            ``transform`` method of the object it returns is applied to both the
            train and the test split before fitting.
        Tparam: Keyword arguments for ``translate``; ``None`` means no arguments.

    Returns:
        None. Prints ``model.summary.rootMeanSquaredError`` and ``model.summary.r2``
        and calls ``show()`` on the predictions for the test split.
    """
    train_data, test_data = data.randomSplit([0.7, 0.3], seed=12)

    if translate is not None:
        # Previously ``translate(**Vparam)``: ``Vparam`` is not defined anywhere, so
        # every call that supplied ``translate`` raised NameError. ``Tparam or {}``
        # also lets the documented default (None) work with ``**``.
        translator = translate(**(Tparam or {}))
        train_data = translator.transform(train_data)
        test_data = translator.transform(test_data)

    # Named ``estimator`` so the local no longer shadows this function's own name.
    estimator = machine(**(param or {}))
    model = estimator.fit(train_data)

    predictions = model.transform(test_data)

    # NOTE(review): these metrics are read from ``model.summary`` rather than
    # computed from ``predictions`` -- presumably they describe the fit on the
    # training split, not the test split; confirm.
    print(f'rootMeanSquaredError: {model.summary.rootMeanSquaredError}')
    print(f"r2: {model.summary.r2}")
    predictions.show()

In [None]:
# learning() requires machine, data and param; the previous call passed only the
# DataFrame (which landed in the `machine` slot) and raised TypeError.
# Baseline run: linear regression of total_amount on the numeric columns. The
# columns are assembled into a `features` vector here, before the call.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

baseline_df = VectorAssembler(
    inputCols=['passenger_count', 'trip_distance', 'pickup_time'],
    outputCol='features',
).transform(data_df)
learning(LinearRegression, baseline_df, {'labelCol': 'total_amount', 'featuresCol': 'features'})

In [16]:
# 80/20 random train/test split with a fixed seed (1).
train_df, test_df = data_df.randomSplit([0.8,0.2], seed=1)

In [17]:
# Pipeline stages are appended to this list by the following cells.
# Re-run this cell before re-running them, otherwise stages are added twice.
stages = []

In [18]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# Categorical columns: each gets a StringIndexer (-> <col>_idx) followed by a
# OneHotEncoder (-> <col>_onehot); both stages are appended to `stages`.
cat_features = ['pickup_location_id', 'dropoff_location_id', 'day_of_week']

for col_name in cat_features:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + '_idx')
    indexer.setHandleInvalid('keep')  # setter mutates the indexer in place
    encoder = OneHotEncoder(
        inputCols=[indexer.getOutputCol()],  # the <col>_idx column
        outputCols=[col_name + '_onehot'],
    )
    stages.extend([indexer, encoder])

stages

[StringIndexer_f9ba487d6676,
 OneHotEncoder_99d0c03cfd4b,
 StringIndexer_110297b55d65,
 OneHotEncoder_45282655e031,
 StringIndexer_41651a78f423,
 OneHotEncoder_46632d853287]

In [19]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
# Numeric columns: each is wrapped into a one-column vector (<col>_vector) and
# then passed through a StandardScaler (-> <col>_scaled); both stages are
# appended to `stages`.
num_features = ['passenger_count', 'trip_distance', 'pickup_time']

for col_name in num_features:
    to_vector = VectorAssembler(inputCols=[col_name], outputCol=f'{col_name}_vector')
    scaler = StandardScaler(inputCol=to_vector.getOutputCol(), outputCol=f'{col_name}_scaled')
    stages.extend([to_vector, scaler])

stages

[StringIndexer_f9ba487d6676,
 OneHotEncoder_99d0c03cfd4b,
 StringIndexer_110297b55d65,
 OneHotEncoder_45282655e031,
 StringIndexer_41651a78f423,
 OneHotEncoder_46632d853287,
 VectorAssembler_9ea5b1c33373,
 StandardScaler_da60ca07bfcf,
 VectorAssembler_1597c1226205,
 StandardScaler_23d4f0edc045,
 VectorAssembler_5cd4fe6c20bd,
 StandardScaler_9228aaa8aa72]

In [20]:
# Names of the encoded categorical columns followed by the scaled numeric columns;
# these are the inputs of the final feature-vector assembler.
assembler_input = [f'{cat}_onehot' for cat in cat_features]
assembler_input += [f'{num}_scaled' for num in num_features]
assembler_input

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [21]:
# Last stage: combine every encoded/scaled column into the single
# `feature_vector` column.
assembler = VectorAssembler(outputCol='feature_vector', inputCols=assembler_input)
stages.append(assembler)
stages

[StringIndexer_f9ba487d6676,
 OneHotEncoder_99d0c03cfd4b,
 StringIndexer_110297b55d65,
 OneHotEncoder_45282655e031,
 StringIndexer_41651a78f423,
 OneHotEncoder_46632d853287,
 VectorAssembler_9ea5b1c33373,
 StandardScaler_da60ca07bfcf,
 VectorAssembler_1597c1226205,
 StandardScaler_23d4f0edc045,
 VectorAssembler_5cd4fe6c20bd,
 StandardScaler_9228aaa8aa72,
 VectorAssembler_818a77af4b7b]

In [22]:
from pyspark.ml import Pipeline
# Chain all collected stages into one Pipeline.
pipeline = Pipeline(stages=stages)
# Fit the pipeline on the training split only; the same fitted pipeline is reused
# for the test split later.
fitted_transform = pipeline.fit(train_df)
# Apply the fitted pipeline to the training split and inspect the resulting columns.
vtrain_df = fitted_transform.transform(train_df)
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [23]:
# Show two assembled feature vectors.
vtrain_df.select('feature_vector').show(2)

+--------------------+
|      feature_vector|
+--------------------+
|(533,[62,311,527,...|
|(533,[62,280,526,...|
+--------------------+
only showing top 2 rows



In [24]:
from pyspark.ml.regression import LinearRegression
# Linear regression of total_amount on the assembled feature vector.
lr = LinearRegression(
    featuresCol='feature_vector',
    labelCol='total_amount',
    solver='normal',
    maxIter=50,
)

In [25]:
# Fit the regression on the transformed training split.
model = lr.fit(vtrain_df)

In [26]:
# Transform the test data with the pipeline that was fitted on the training data.
vtest_df = fitted_transform.transform(test_df)
# Predict on the transformed test data.
pred = model.transform(vtest_df)

In [None]:
# Show the label next to the model's prediction for the first 3 test rows.
pred.select('total_amount', 'prediction').show(3)

In [None]:
# model.summary holds the metrics of the training fit only. Also score the
# held-out test split so the reported (r2, RMSE) pairs show how the model
# performs on data it was not fitted on.
test_summary = model.evaluate(vtest_df)
{
    'train': (model.summary.r2, model.summary.rootMeanSquaredError),
    'test': (test_summary.r2, test_summary.rootMeanSquaredError),
}

In [None]:
# Stop the SparkSession created at the top of the notebook.
spark.stop()