In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('taxi-fare-prediction').getOrCreate()

In [2]:
import os

cwd = os.getcwd()
trip_data_path = os.path.join(cwd, 'learning_spark_data', 'trips', '*.csv')
trip_data_path

'/home/jovyan/work/learning_spark_data/trips/*.csv'

In [3]:
file_path = f"file:///{trip_data_path.replace(os.sep,'/')}"
file_path

'file:////home/jovyan/work/learning_spark_data/trips/*.csv'

In [4]:
trip_df = spark.read.csv(file_path, inferSchema = True, header = True)
trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



# 운행거리에 따른 요금 예측

In [5]:
trip_df.createOrReplaceTempView('trips')

In [6]:
query = """
SELECT
    trip_distance,
    total_amount
FROM trips
WHERE total_amount < 5000
  AND total_amount > 0
  AND trip_distance > 0
  AND trip_distance < 500
  AND passenger_count < 4
  AND TO_DATE(tpep_pickup_datetime) >= "2021-01-01"
  AND TO_DATE(tpep_pickup_datetime) < "2021-08-01"
"""

In [7]:
trip_df = spark.sql(query)
trip_df.createOrReplaceTempView('data')

In [8]:
# data table 결과 확인하기
spark.sql('select * from data limit 5').show()

+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|         16.5|       70.07|
|         1.13|       11.16|
|         2.68|       18.59|
|         12.4|        43.8|
|          9.7|        32.3|
+-------------+------------+



In [15]:
train_data, test_data = trip_df.randomSplit([0.8, 0.2], seed = 1)

In [16]:
# train, test split 8:2, seed = 1

from pyspark.ml.feature import VectorAssembler

vassembler = VectorAssembler(inputCols=['trip_distance'], outputCol='features')
vtrain_df = vassembler.transform(train_data)
vtrain_df.show(5)

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|        3.05|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 5 rows



In [17]:
# vectorassembler > features: trip_distance, target: total_amount

In [18]:
# linearRegression 생성 maxIter = 50, LabelCol = 'total_amount', featuresCol = 'features'
# fit
# vassembler.transform(test)
# model.transform()

In [20]:
# linearRegression 생성 maxIter = 50, labelCol = 'total_amount', featuresCol = 'features'
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=50, labelCol='total_amount', featuresCol='features')
lr_model = lr.fit(vtrain_df) # 1000만건 데이터 학습

vtest_df = vassembler.transform(test_data)
pred = lr_model.transform(vtest_df) # 200만건 데이터 학습

In [21]:
pred.show(5)

+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|         0.01|         3.3|  [0.01]|9.225382465769695|
|         0.01|         3.3|  [0.01]|9.225382465769695|
|         0.01|         3.3|  [0.01]|9.225382465769695|
|         0.01|         3.3|  [0.01]|9.225382465769695|
|         0.01|         3.3|  [0.01]|9.225382465769695|
+-------------+------------+--------+-----------------+
only showing top 5 rows



In [23]:
lr_model.summary.rootMeanSquaredError #RMSE

5.932274383538252

In [24]:
lr_model.summary.r2

0.7706517292012338

In [None]:
# 새로운 데이터로 예측하기

In [29]:
from pyspark.sql.types import DoubleType

new_distance_list = [1.1, 5.4, 10.2, 30.0]
distance_df = spark.createDataFrame(new_distance_list, DoubleType()).toDF('trip_distance')
distance_df.show()

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.4|
|         10.2|
|         30.0|
+-------------+



In [30]:
vdistance_df = vassembler.transform(distance_df)
lr_model.transform(vdistance_df).show()

+-------------+--------+------------------+
|trip_distance|features|        prediction|
+-------------+--------+------------------+
|          1.1|   [1.1]| 12.49984231215256|
|          5.4|   [5.4]|25.417436201552853|
|         10.2|  [10.2]|  39.8370758920462|
|         30.0|  [30.0]| 99.31808961533127|
+-------------+--------+------------------+

