In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

directory="/home/ubuntu/working/spark-examples/data/"
trip_files="trips/*"

trips_df = spark.read.csv(f"file://{directory}/{trip_files}", inferSchema=True, header=True)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/03 20:34:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
trips_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [3]:
trips_df.createOrReplaceTempView("trips")

달라지는 점
- v1에서 만든 모델은 데이터의 종류가 너무 적어서 예측 성능이 좋지 못해 과소적합이 발생
- 데이터의 종류를 늘리는 다항 회귀를 활용
    - $\hat{y} =w1x1+w1x2 +...+b =WX + b$

In [4]:
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [5]:
data_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
|              1|               138|                141|          9.3|          0|     Monday|       43.67|
|              1|           

In [6]:
train_df, test_df = data_df.randomSplit([0.8,0.2], seed=42)

지금처럼 데이터의 양이 너무나 많고, 그 데이터에 대한 전처리를 수행 했음.
- 시간이 매우 많이 걸리는 배치 작업
- 추후에 다시 이 데이터를 활용한다면 다시 처음부터 전처리 하는데 시간이 많이 걸린다.
- 어떻게 전처리가 된 데이터를 파일이나 데이터 베이스에 저장 해놓고, 나중에 다시 불러오는 것이 시간적으로 이득

In [8]:
# 파케이(parquet) 형식으로 데이터 마트를 저장
data_dir = "/home/ubuntu/working/spark-examples/data/ml_data_taxi" # 로컬 디렉토리로 지정. hdfs를 사용 할 수도 있다!
# hdfs_dir = "hdfs://user/ubuntu/spark-taxi-data"

# Spark Dataframe의 write를 이용해서 데이터를 파일 또는 DB에 저장이 가능
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")

                                                                                

In [9]:
# 분산 저장된 데이터를 불러오기
# 불러올 때 파일을 지정하는게 아니고 디렉토리를 지정하면 된다
train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

In [10]:
train_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



# 파이프라인 구성

In [11]:
# Pipeline에 넣을 과정(stage)를 하나씩 넣어 놓을 리스트 선언
stages = []

## 1. OneHotEncoding Stage
- `pickup_location_id`
- `ropoff_location_id`
- `day_of_week`

`pickup_location_id`, `dropoff_location_id`는 숫자 형식의 데이터!
- 숫자 형식의 데이터는 OneHotEncoding이 되지 않는다.
- 따라서 숫자 형식의 카테고리 데이터를 임시로 문자열로 처리하기 위히 `StringIndexer` 트랜스포머 활용

In [14]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# OneHotEncoding을 수행할 컬럼을 지정

cat_features = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

for c in cat_features:
    # 1. 데이터를 문자여열 형식으로 바꿔준다.
    # setHandleInvalid : Null 값 같은 데이터를 어떻게 처리 할지.
    cat_indexer = StringIndexer(inputCol=c, outputCol=c+"_idx").setHandleInvalid("keep")
    # 2. OneHotEncoding
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c+"_onehot"]) # 여러개의 입력을 받을 수 있지만(cols) 이번에는 하나씩 처리
    stages +=[cat_indexer, onehot_encoder]
stages # 뒤에 붙는 값은 메모리 주소값

[StringIndexer_f30a153ac1ea,
 OneHotEncoder_66d2738eb855,
 StringIndexer_906081025d1e,
 OneHotEncoder_dd3ba8cdd6e5,
 StringIndexer_5e918472a0b9,
 OneHotEncoder_b0c3e9cd4d7b]

## 2. StandardScaler & VectorAssembler Stage
- `passenger_count`
- `trip_distance`
- `pickup_time`

기본적으로 스케일링 작업은 스칼라 값이 아닌, 벡터 단위로 스케일링이 일어나야 한다.

In [16]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = [
    "passenger_count",
    "trip_distance", 
    "pickup_time"
]

for n in num_features:
    # 1. 각각의 컬럼의 데이터를 벡터화. ex) 1.5 -> [1.5]
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"_vector")

    # 2. StandardScaling 수행
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+"_scaled")

    stages += [num_assembler, num_scaler]

stages

[StringIndexer_f30a153ac1ea,
 OneHotEncoder_66d2738eb855,
 StringIndexer_906081025d1e,
 OneHotEncoder_dd3ba8cdd6e5,
 StringIndexer_5e918472a0b9,
 OneHotEncoder_b0c3e9cd4d7b,
 VectorAssembler_d2eab73faa90,
 StandardScaler_903a2da27230,
 VectorAssembler_af041fc15bd8,
 StandardScaler_b38efc26c7e3,
 VectorAssembler_6bec0a0073c1,
 StandardScaler_6381f7fb9fb9]

머신러닝을 위한 Preprocessing된 결과물 벡터를 하나로 합쳐야 훈련 가능한 데이터가 됩니다. `VectorAssembler`를 사용해서 합친다.

In [18]:
# Assemble 할 데이터는? OneHotEncoding이 되어 있거나, Scaled된 데이터를 Assemble
assemble_inputs = [c+"_onehot" for c in cat_features] + [n + "_scaled" for n in num_features]
assemble_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [19]:
total_assembler = VectorAssembler(inputCols=assemble_inputs, outputCol="features")
stages.append(total_assembler)
stages

[StringIndexer_f30a153ac1ea,
 OneHotEncoder_66d2738eb855,
 StringIndexer_906081025d1e,
 OneHotEncoder_dd3ba8cdd6e5,
 StringIndexer_5e918472a0b9,
 OneHotEncoder_b0c3e9cd4d7b,
 VectorAssembler_d2eab73faa90,
 StandardScaler_903a2da27230,
 VectorAssembler_af041fc15bd8,
 StandardScaler_b38efc26c7e3,
 VectorAssembler_6bec0a0073c1,
 StandardScaler_6381f7fb9fb9,
 VectorAssembler_d2eea2f075ab]