<a href="https://colab.research.google.com/github/HaJunYoo/Pyspark-tutorial/blob/main/taxi_fare_prediction_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark==3.3.1 py4j==0.10.9.5 
!pip install -q findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=520b30170fe382a5b7536bf556a1101820acc858fe81d41202917bb0fff2f2eb
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### read data

In [8]:
trip_files = "/content/drive/MyDrive/Spark/01-spark/data/trips/*"

In [4]:
from pyspark.sql import SparkSession

In [5]:
MAX_MEMORY="8g"
spark = SparkSession.builder.appName("taxi-fare-prediciton")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()

In [9]:
trips_df = spark.read.csv(trip_files, inferSchema=True, header=True)

In [10]:
trips_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [11]:
trips_df.createOrReplaceTempView("trips")

In [13]:
# 더 많은 피쳐들을 가지고 와서 prediction 진행
# DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') => 날짜를 통해 요일 추출 
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [14]:
data_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|            1.0|               142|                 43|          2.1|          0|     Friday|        11.8|
|            1.0|               238|                151|          0.2|          0|     Friday|         4.3|
|            1.0|               132|                165|         14.7|          0|     Friday|       51.95|
|            0.0|               138|                132|         10.6|          0|     Friday|       36.35|
|            1.0|                68|                 33|         4.94|          0|     Friday|       24.36|
|            1.0|               224|                 68|          1.6|          0|     Friday|       14.15|
|            1.0|           

In [None]:
data_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



In [15]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)

In [16]:
data_dir = "/content/drive/MyDrive/Spark/01-spark/data"

In [17]:
# 조금 더 빠른 로딩을 위해 디렉토리에 저장
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")

In [18]:
# parquet ==> 압축률이 좋고 disk io가 적은 장점 존재
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

In [19]:
train_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



### preprocessing

- Stage 구성
  - OneHotEncoder, StringIndexer
  - VectorAssembler, StandardScaler
  - Assembler : one hot과 numerical scaled 피쳐를 합침 -> VectorAssembler 사용

In [31]:
## categorical data preprocessing

from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_feats = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

stages = []

# string indexer -> onehot encoder
for c in cat_feats:
    cat_indexer = StringIndexer(inputCol=c, outputCol= c + "_idx").setHandleInvalid("keep")
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c + "_onehot"])
    stages += [cat_indexer, onehot_encoder]

In [32]:
stages

[StringIndexer_22646e6da988,
 OneHotEncoder_4a7a8bc23483,
 StringIndexer_bdb08158c9df,
 OneHotEncoder_d29098fb986c,
 StringIndexer_aeb6af8e8146,
 OneHotEncoder_eafb8ce21829]

In [33]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_feats = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

# vectorAssembler -> Standardscaler
for n in num_feats:
    num_assembler = VectorAssembler(inputCols=[n], outputCol= n + "_vector")
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol= n + "_scaled")
    stages += [num_assembler, num_scaler]

In [34]:
# one hot과 numerical scaled 피쳐를 합침 -> VectorAssembler 사용 -> 피쳐 생성
assembler_inputs = [c + "_onehot" for c in cat_feats] + [n + "_scaled" for n in num_feats]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="feature_vector")
stages += [assembler]

In [35]:
from pyspark.ml import Pipeline

transform_stages = stages
pipeline = Pipeline(stages=transform_stages)
fitted_transformer = pipeline.fit(train_df)

In [36]:
vtrain_df = fitted_transformer.transform(train_df)

In [37]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

In [38]:
vtrain_df.printSchema()

root
 |-- passenger_count: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (null

In [39]:
model = lr.fit(vtrain_df)

In [40]:
vtest_df = fitted_transformer.transform(test_df)

In [41]:
predictions = model.transform(vtest_df)

In [42]:
predictions.cache()

DataFrame[passenger_count: double, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [43]:
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          0.7|   Saturday|       12.35|12.505664801918698|
|          3.1|    Tuesday|        18.0|17.818332946356612|
|          2.1|   Saturday|       15.35|16.768456645488012|
|          1.7|   Saturday|        13.3|14.315974459478376|
|          4.1|     Friday|        21.3|20.954315116906884|
|          1.4|     Friday|         8.3|12.025221364848148|
|          7.3|    Tuesday|        29.3|28.044553383935828|
|          0.7|  Wednesday|         5.8| 9.704306924150398|
|          5.0|  Wednesday|        24.3| 21.17150117069628|
|          6.7|   Saturday|        29.8|37.502803118269156|
|         16.8|     Friday|       82.37| 71.32483145001771|
|         29.3|     Monday|        80.8|103.55970558581402|
|          4.1|     Friday|        20.8|22.258510945068004|
|          0.1|  Wednesday|        55.3|

In [44]:
model.summary.rootMeanSquaredError

5.848484065033885

In [45]:
model.summary.r2

0.7969874815768958