In [1]:
from pyspark.sql import SparkSession

# Memory setting passed to both the executor and the driver configuration.
MAX_MEMORY = "5g"

# Build (or reuse) the Spark session; a parenthesized chain replaces the
# backslash line continuations.
spark = (
    SparkSession.builder
    .appName("taxi-fare-prediciton")
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

In [2]:
# Local data directory and the glob (relative to it) that matches the trip CSVs.
# Every backslash is escaped: the original "\data" relied on the invalid escape
# sequence "\d", which newer Python versions warn about. The runtime value is
# unchanged.
# NOTE(review): this is a machine-specific absolute path — consider making it
# configurable.
directory = "C:\\Users\\daesi\\Downloads\\빅데이터 소스코드\\소스코드\\study_spark\\data"
trip_files = "\\trips\\*"

In [3]:
# URI with a glob that covers every file under the trips folder.
trips_uri = f"file:///{directory}\\{trip_files}"

# Read the CSV files using the header row and inferring the column types.
trips_df = spark.read.csv(trips_uri, header=True, inferSchema=True)
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [4]:
# Register the trips DataFrame as the temp view "trips" so it can be queried with SQL.
trips_df.createOrReplaceTempView("trips")

운행 거리(`trip_distance`)에 따른 요금(`total_amount`)을 예측하는 회귀 모델을 생성

In [5]:
# Select (trip_distance, total_amount) pairs from the "trips" view, keeping rows
# with 0 < total_amount < 5000, 0 < trip_distance < 500, passenger_count < 4
# and a pickup date from 2021-01-01 (inclusive) to 2021-08-01 (exclusive).
query = """
SELECT
    trip_distance,
    total_amount
FROM trips

WHERE total_amount < 5000
  AND total_amount > 0
  AND trip_distance > 0
  AND trip_distance < 500
  AND passenger_count < 4
  AND TO_DATE(tpep_pickup_datetime) >= "2021-01-01"
  AND TO_DATE(tpep_pickup_datetime) < "2021-08-01"
"""

# Run the query and register the result as the temp view "data".
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [6]:
# Preview the first 5 rows of the filtered data.
data_df.show(5)

+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|         16.5|       70.07|
|         1.13|       11.16|
|         2.68|       18.59|
|         12.4|        43.8|
|          9.7|        32.3|
+-------------+------------+
only showing top 5 rows



trip_distance는 X, total_amount는 Y

머신 러닝 모델 y=f(x)를 이용해서 예측 > 회귀분석
1. train/test로 데이터 셋 split
2. x값을 벡터화

In [7]:
# Random split with weights 0.8 / 0.2 (train / test), using seed=1.
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)

In [8]:
# Row counts of the train and test splits.
train_df.count(), test_df.count()

(10500253, 2625787)

1. 하나 이상의 독립 변수를 하나의 벡터로 묶는 것 > Feature Vectorization
    - x Feature의 형식을 1차원 배열로 (벡터)로 바꿔야한다.
    - vectorAssembler 사용
    - 이것을 묶어서 2차원 형태로 만든다 > 벡터 어셈블러

`feature`는 `vector` 형태로 존재해야 하기 때문에 1차원 배열로 만들어 주는 `VectorAssembler` 사용

In [9]:
from pyspark.ml.feature import VectorAssembler


# Bundle the columns listed in inputCols into a single vector and
# add it as a new column named by outputCol.


vassembler = VectorAssembler(inputCols=["trip_distance"], outputCol="features")
vtrain_df  = vassembler.transform(train_df)

# show() with no argument prints the first 20 rows.
vtrain_df.show()

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|        3.05|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 20 rows



# 모델 생성 및 훈련

In [10]:
from pyspark.ml.regression import LinearRegression

In [14]:
# Linear regression estimator for the single-feature model.
lr = LinearRegression(
    maxIter=50,  # maximum number of training iterations
    labelCol = "total_amount", # label column (dependent variable)
    featuresCol = "features"  # feature vector column (independent variable)
)

In [13]:
# Train the linear regression model
# on the vector-assembled training data.

model = lr.fit(vtrain_df)

In [15]:
# Convert the test data into the vector-assembled form as well.
# Do NOT create a new transformer for the test set — always reuse the
# transformer that was used on the training set.
vtest_df = vassembler.transform(test_df)
vtest_df.show(5)

+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 5 rows



In [19]:
# Predict on the test data (transform appends a "prediction" column).

test_predictions = model.transform(vtest_df)
test_predictions.show()

+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745

prediction 과 total_amount 값을 비교 > 차이가 많이 난다.

--실제로 오차를 확인하자--

In [21]:
# RMSE
# NOTE(review): per the PySpark docs, `model.summary` is the summary computed on
# the training set, so this is not the error on the test predictions above.
# For held-out error, consider `model.evaluate(vtest_df).rootMeanSquaredError`.
model.summary.rootMeanSquaredError

6.30781413196623

In [22]:
# R^2
# NOTE(review): per the PySpark docs, `model.summary` is the training-set summary;
# for the held-out score, consider `model.evaluate(vtest_df).r2`.
model.summary.r2

0.7648633777017714

In [23]:
# Build a small hand-made DataFrame of distances to predict on.
from pyspark.sql.types import DoubleType
distance_list = [1.1, 5.5, 10.5, 3.0]
# Create a single double column, then name it "trip_distance" so it matches
# the input column the assembler was configured with.
distance_df   = spark.createDataFrame(distance_list, DoubleType()).toDF("trip_distance")

distance_df.show(5)

+-------------+
|trip_distance|
+-------------+
|          1.1|
|          5.5|
|         10.5|
|          3.0|
+-------------+



In [24]:
# Reuse the same assembler to add the "features" vector column.
vdistance_df = vassembler.transform(distance_df)
vdistance_df.show()

+-------------+--------+
|trip_distance|features|
+-------------+--------+
|          1.1|   [1.1]|
|          5.5|   [5.5]|
|         10.5|  [10.5]|
|          3.0|   [3.0]|
+-------------+--------+



In [25]:
# Predict the fare for each hand-made distance.
model.transform(vdistance_df).show()

+-------------+--------+------------------+
|trip_distance|features|        prediction|
+-------------+--------+------------------+
|          1.1|   [1.1]|12.672809485363317|
|          5.5|   [5.5]|25.761270454374163|
|         10.5|  [10.5]| 40.63452155552285|
|          3.0|   [3.0]|18.324644903799822|
+-------------+--------+------------------+



# 두 번째 모델

feature를 늘려서 예측, 데이터 종류 증가
승객 수, 출발지, 도착지, 탑승 시각(시 단위), 요일 추가

In [27]:

# Feature query for the second model: passenger count, pickup/dropoff location
# ids, distance, pickup hour and day-of-week name, plus the total_amount label.
# Row filters are the same ranges used for the first model's query.
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""

# Rebinds data_df and replaces the "data" temp view with the wider feature set.
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [28]:
# Preview the first 5 rows of the multi-feature data.
data_df.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [29]:
# Random split with weights 0.8 / 0.2 (train / test), using seed=1; rebinds train_df/test_df.
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)

# 파이프라인 생성

In [31]:
# Pipeline stages are appended to this list by the following cells.
stages = []

`One Hot Encode` stage
- `pickup_location_id`
- `dropoff_location_id`
- `day_of_week`

위 세 `feature`는 범주형 (category)

In [32]:
# StringIndexer: maps each category value to a numeric index.
# OneHotEncoder: one-hot encodes the index produced by the StringIndexer.

from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Columns to one-hot encode.
cat_features = ["pickup_location_id", "dropoff_location_id", "day_of_week"]

for col_name in cat_features:
    idx_col = col_name + "_idx"
    # handleInvalid="keep" is set on the indexer, as in the original setter call.
    indexer = StringIndexer(inputCol=col_name, outputCol=idx_col, handleInvalid="keep")
    encoder = OneHotEncoder(inputCols=[idx_col], outputCols=[col_name + "_onehot"])
    stages.extend([indexer, encoder])

stages

[StringIndexer_c84341797d30,
 OneHotEncoder_152c7e2d1754,
 StringIndexer_1d3da27504af,
 OneHotEncoder_97c8d500de16,
 StringIndexer_8c9764df0816,
 OneHotEncoder_91674f9f33aa]

`Numerical Data` Preprocessing stage > 수치형 데이터 전처리

대상 컬럼
- `passenger_count`
- `trip_distance`
- `pickup_time`

StandardScaler > 표준화 > feature 간 스케일을 맞춰 준다 (아웃라이어 자체를 보정해 주지는 않으며, 아웃라이어는 별도 필터링이 필요하다)


In [33]:
# Wrap each numeric column in a one-element vector, then scale it with StandardScaler.
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = ["passenger_count", "trip_distance", "pickup_time"]

for col_name in num_features:
    vector_col = col_name + "_vector"
    stages.extend([
        VectorAssembler(inputCols=[col_name], outputCol=vector_col),
        StandardScaler(inputCol=vector_col, outputCol=col_name + "_scaled"),
    ])

stages

[StringIndexer_c84341797d30,
 OneHotEncoder_152c7e2d1754,
 StringIndexer_1d3da27504af,
 OneHotEncoder_97c8d500de16,
 StringIndexer_8c9764df0816,
 OneHotEncoder_91674f9f33aa,
 VectorAssembler_c22685fb1b94,
 StandardScaler_47c63580ab93,
 VectorAssembler_318e938986f8,
 StandardScaler_8a0350a6bdc5,
 VectorAssembler_162c379d2f86,
 StandardScaler_7c3b5203e874]

category, numeric 형식으로 각각 작업된 벡터 결과물들을 하나로 합쳐주기 ( `VectorAssembler` )


In [34]:
# Only the "*_onehot" and "*_scaled" columns feed the final feature vector.
assembler_inputs = [
    *(name + "_onehot" for name in cat_features),
    *(name + "_scaled" for name in num_features),
]
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [35]:
# Final stage: merge every encoded/scaled column into one "feature_vector" column.
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="feature_vector",
)
stages.append(assembler)
stages

[StringIndexer_c84341797d30,
 OneHotEncoder_152c7e2d1754,
 StringIndexer_1d3da27504af,
 OneHotEncoder_97c8d500de16,
 StringIndexer_8c9764df0816,
 OneHotEncoder_91674f9f33aa,
 VectorAssembler_c22685fb1b94,
 StandardScaler_47c63580ab93,
 VectorAssembler_318e938986f8,
 StandardScaler_8a0350a6bdc5,
 VectorAssembler_162c379d2f86,
 StandardScaler_7c3b5203e874,
 VectorAssembler_e02b03865123]

# 파이프라인 생성

In [39]:
from pyspark.ml import Pipeline

# Chain all preprocessing stages and fit them on the training split only.
pipeline = Pipeline(stages=stages)
fitted_transformer = pipeline.fit(train_df)

In [40]:
# Apply the fitted pipeline to the training data with transform.
vtrain_df = fitted_transformer.transform(train_df)
vtrain_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [38]:
# Inspect the assembled feature vectors (first 20 rows).
vtrain_df.select("feature_vector").show()

+--------------------+
|      feature_vector|
+--------------------+
|(533,[62,311,527,...|
|(533,[62,280,526,...|
|(533,[62,280,527,...|
|(533,[62,280,528,...|
|(533,[62,299,523,...|
|(533,[62,288,523,...|
|(533,[62,264,526,...|
|(533,[62,301,523,...|
|(533,[62,301,523,...|
|(533,[62,282,527,...|
|(533,[62,275,529,...|
|(533,[62,293,527,...|
|(533,[62,284,529,...|
|(533,[63,319,525,...|
|(533,[63,319,524,...|
|(533,[63,319,523,...|
|(533,[63,339,526,...|
|(533,[63,276,524,...|
|(533,[63,355,529,...|
|(533,[63,292,528,...|
+--------------------+
only showing top 20 rows



**모델 생성**
`VectorAssembler`를 이용해 `feature`들이 모여있는 `feature_vector` 컬럼을 지정

In [41]:
from pyspark.ml.regression import LinearRegression

# Linear regression estimator for the multi-feature model; rebinds `lr`.
# NOTE(review): with solver="normal" the maxIter setting may have no effect —
# confirm against the PySpark LinearRegression docs.
lr = LinearRegression(
    maxIter=50,
    solver="normal",
    labelCol="total_amount",
    featuresCol="feature_vector"
)

In [42]:
# Train on the pipeline-transformed training data; rebinds `model`.
model = lr.fit(vtrain_df)

In [43]:
# Transform the test split with the pipeline that was fitted on the training split.
vtest_df = fitted_transformer.transform(test_df)

In [44]:
# Append the model's "prediction" column to the transformed test data.
predictions = model.transform(vtest_df)

In [45]:
# The predictions do not change and are read again below, so mark them for
# caching to avoid recomputing them (caching stores the data; it does not save memory).
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, feature_vector: vector, prediction: double]

In [46]:
# Compare the actual total_amount with the prediction for a sample of test rows.
predictions.select(["trip_distance", "day_of_week", "total_amount", "prediction"]).show()

+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          1.0|    Tuesday|       10.55|12.695522792729275|
|          1.7|   Saturday|        13.3| 14.45055801477692|
|          4.1|     Friday|        21.3|21.108271361254214|
|         11.5|     Sunday|        41.3| 40.87993984204375|
|          1.7|   Saturday|       14.15| 13.90693298261399|
|          0.7|  Wednesday|         5.8|  9.62248222618894|
|          5.0|  Wednesday|        24.3|21.147909957146926|
|          1.5|   Thursday|         8.8| 9.969750900636763|
|         13.4|     Monday|       66.35| 62.65097273030503|
|         15.0|     Monday|       70.67| 66.37330532523579|
|         14.2|  Wednesday|       85.65| 89.80098581271078|
|          0.1|  Wednesday|        55.3|12.483544948638677|
|          3.9|    Tuesday|       21.95|23.136774384461823|
|          4.7|   Thursday|        27.8|

In [47]:
# RMSE of the second model.
# NOTE(review): per the PySpark docs, `model.summary` is the training-set summary;
# for held-out error, consider `model.evaluate(vtest_df).rootMeanSquaredError`.
model.summary.rootMeanSquaredError

5.6485201652667625

In [48]:
# R^2 of the second model.
# NOTE(review): per the PySpark docs, `model.summary` is the training-set summary;
# for the held-out score, consider `model.evaluate(vtest_df).r2`.
model.summary.r2

0.80849012500813

In [49]:
# Shut down the Spark session now that the analysis is finished.
spark.stop()