In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()

directory="/home/ubuntu/working/spark-examples/data"
trip_files="trips/*"

trips_df = spark.read.csv(f"file://{directory}/{trip_files}", inferSchema=True, header=True)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/14 01:01:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [2]:
trips_df.createOrReplaceTempView("trips")

달라진 점
 - 첫 번째 모델은 너무 데이터의 종류가 적어서 예측 성능이 좋지 못했기 때문에 데이터의 종류를 좀 늘려볼 예정!

In [4]:
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 5
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_df.createOrReplaceTempView("data")

In [5]:
data_df.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
|              1|               138|                141|          9.3|          0|     Monday|       43.67|
|              1|           

In [6]:
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=42)

지금처럼 데이터의 양이 너무나 많고, 그 데이터에 대한 전처리를 수행 했음. 그래서 시간이 굉장히 많이 걸렸다고 가정
- 사용할 데이터 마트의 구축이 완료된 상황이라고 가정
- 추후에 다시 이 데이터를 다시 활용한다면 다시 처음부터 전처리하는데 시간이 많이 걸린다..
- 이렇게 전처리가 된 데이터를 파일이나 데이터베이스에 저장 해놓고, 나중에 다시 불러오는게 훨씬 시간적으로 이득

In [7]:
# 파케이(parquet) 형식으로 데이터 마트를 저장
data_dir = "/home/ubuntu/working/spark-examples/data/ml-data"

# Spark Dataframe의 write를 이용해 데이터를 파일 또는 데이터베이스에 저장이 가능
train_df.write.format("parquet").save(f"{data_dir}/train/")
test_df.write.format("parquet").save(f"{data_dir}/test/")

                                                                                

In [19]:
# 분산 저장 되어서 파티션으로 분리된 파일을 불러오기
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trip_count_sql").getOrCreate()
data_dir = "/home/ubuntu/working/spark-examples/data/ml-data"

train_df = spark.read.parquet(f"{data_dir}/train/")
test_df = spark.read.parquet(f"{data_dir}/test/")

In [20]:
train_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)



# 파이프라인 구성

In [21]:
# Pipeline에 넣을 과정(stage)을 하나씩 넣어 놓을 리스트
stages = []

## 1. OneHotEncoding Stage
- `pickup_location_id`
- `dropoff_location_id`
- `day_of_week`

`pickup_location_id`, `dropoff_location_id`는 숫자 형식의 데이터!
- 숫자 형태의 데이터는 OneHotEncoding이 안된다.
- 숫자 형식의 카테고리 데이터를 임시로 문자열로 처리하기 위해 `StringIndexer` 트랜스포머를 활용

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# OneHotEncoding을 수행할 컬럼을 지정
cat_features = [
    "pickup_location_id",
    "dropoff_location_id",
    "day_of_week"
]

for c in cat_features:
    # 1. 데이터를 문자열 형식으로 바꿔준다. setHandleInvalid : Null값 같은 데이터를 어떻게 처리 할건지
    cat_indexer = StringIndexer(inputCol=c, outputCol=c+"_idx").setHandleInvalid("keep")
    
    # 2. One Hot Encoding 수행
    onehot_encoder = OneHotEncoder(inputCols=[cat_indexer.getOutputCol()], outputCols=[c+"_onehot"])
    
    stages += [cat_indexer, onehot_encoder]

stages

[StringIndexer_6a5e2395d543,
 OneHotEncoder_831e5028990e,
 StringIndexer_cc289a352393,
 OneHotEncoder_e69185437cd6,
 StringIndexer_b1054a91b76a,
 OneHotEncoder_964f74c9b6f1]

## 2. StandardScaler & VectorAssembler Stage
- `passenger_count`
- `trip_distance`
- `pickup_time`

In [23]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

num_features = [
    "passenger_count",
    "trip_distance",
    "pickup_time"
]

for n in num_features:
    
    # 각각의 컬럼의 데이터가 벡터화. ex) 1.5 -> [1.5]
    num_assembler = VectorAssembler(inputCols=[n], outputCol=n+"_vector")
    
    # StandardScaling 수행
    num_scaler = StandardScaler(inputCol=num_assembler.getOutputCol(), outputCol=n+"_scaled")
    
    stages += [num_assembler, num_scaler]

stages

[StringIndexer_6a5e2395d543,
 OneHotEncoder_831e5028990e,
 StringIndexer_cc289a352393,
 OneHotEncoder_e69185437cd6,
 StringIndexer_b1054a91b76a,
 OneHotEncoder_964f74c9b6f1,
 VectorAssembler_eba5f07be7e9,
 StandardScaler_ce644ce26efa,
 VectorAssembler_bd6747be5d25,
 StandardScaler_520955cb966b,
 VectorAssembler_fd46d0e2b56a,
 StandardScaler_0b84442dc758]

머신러닝을 위한 Preprocessing된 결과물 벡터를 하나로 합쳐야 훈련 가능한 데이터가 된다. - `VectorAssembler`를 사용해서 합친다.

In [24]:
# Assemble 할 데이터는?
# OntHotEncoding이 되어 있거나, Scaled된 데이터를 합쳐줘야 한다.

assembler_inputs = [c + "_onehot" for c in cat_features] + [n + "_scaled" for n in num_features]
assembler_inputs

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [25]:
total_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages.append(total_assembler)
stages

[StringIndexer_6a5e2395d543,
 OneHotEncoder_831e5028990e,
 StringIndexer_cc289a352393,
 OneHotEncoder_e69185437cd6,
 StringIndexer_b1054a91b76a,
 OneHotEncoder_964f74c9b6f1,
 VectorAssembler_eba5f07be7e9,
 StandardScaler_ce644ce26efa,
 VectorAssembler_bd6747be5d25,
 StandardScaler_520955cb966b,
 VectorAssembler_fd46d0e2b56a,
 StandardScaler_0b84442dc758,
 VectorAssembler_9496fbcda261]

In [26]:
# 파이프라인 등록
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)
pipeline

Pipeline_02c52eb987fc

In [27]:
# fit
fitted_transformer = pipeline.fit(train_df)
fitted_transformer

                                                                                

PipelineModel_cf93762ed89e

transformer 파이프라인을 이용해 tran_df 데이터 변환 확인

In [29]:
vec_train_df = fitted_transformer.transform(train_df)
vec_train_df.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- features: vector (nullable 

In [15]:
# vec_train_df.select("features").show(5)

[Stage 21:>                 (0 + 2) / 2][Stage 22:>                 (0 + 0) / 1]

+--------------------+
|            features|
+--------------------+
|(534,[62,312,530,...|
|(534,[62,312,528,...|
|(534,[62,273,528,...|
|(534,[62,281,525,...|
|(534,[62,309,524,...|
+--------------------+
only showing top 5 rows



Exception in thread "refresh progress" java.lang.OutOfMemoryError: Java heap space
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.status.AppStatusStore.activeStages(AppStatusStore.scala:114)
	at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:64)
	at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:52)
	at java.base/java.util.TimerThread.mainLoop(Timer.java:556)
	at java.base/java.util.TimerThread.run(Timer.java:506)
Exception in thread "RemoteBlock-temp-file-clean-thread" java.lang.OutOfMemoryError: Java heap space
23/06/14 02:31:43 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
23/06/14 02:31:43 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space


# 모델 생성

In [30]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    solver='normal',
    labelCol="total_amount",
    featuresCol="features",
    regParam=0.1
)

In [31]:
model = lr.fit(vec_train_df)

23/06/14 02:34:05 WARN Instrumentation: [0e1a4630] regParam is zero, which might cause numerical instability and overfitting.
23/06/14 02:35:27 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/06/14 02:35:27 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/06/14 02:35:29 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
23/06/14 02:35:30 WARN Instrumentation: [0e1a4630] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
23/06/14 02:35:30 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/06/14 02:35:30 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

# 테스트 데이터 예측

In [32]:
# test_df 변환
vec_test_df = fitted_transformer.transform(test_df)

In [33]:
# vec_test_df로 예측
predictions = model.transform(vec_test_df)
predictions.show()

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+-----------------------+--------------------------+---------------+------------------+----------------------+----------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|pickup_location_id_idx|pickup_location_id_onehot|dropoff_location_id_idx|dropoff_location_id_onehot|day_of_week_idx|day_of_week_onehot|passenger_count_vector|passenger_count_scaled|trip_distance_vector|trip_distance_scaled|pickup_time_vector|  pickup_time_scaled|            features|        prediction|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+----------------------+-------------------------+----------------------

In [34]:
# 예측한 결과를 따로 확인 할 때는 조회만 일어난다. predictions 데이터도 cache() 처리하는 것이 좋음!
predictions.cache()

DataFrame[passenger_count: int, pickup_location_id: int, dropoff_location_id: int, trip_distance: double, pickup_time: int, day_of_week: string, total_amount: double, pickup_location_id_idx: double, pickup_location_id_onehot: vector, dropoff_location_id_idx: double, dropoff_location_id_onehot: vector, day_of_week_idx: double, day_of_week_onehot: vector, passenger_count_vector: vector, passenger_count_scaled: vector, trip_distance_vector: vector, trip_distance_scaled: vector, pickup_time_vector: vector, pickup_time_scaled: vector, features: vector, prediction: double]

In [35]:
predictions.select("trip_distance", "day_of_week", "total_amount", "prediction").show()

23/06/14 02:38:12 WARN MemoryStore: Not enough space to cache rdd_83_0 in memory! (computed 440.4 MiB so far)
23/06/14 02:38:12 WARN BlockManager: Persisting block rdd_83_0 to disk instead.


+-------------+-----------+------------+------------------+
|trip_distance|day_of_week|total_amount|        prediction|
+-------------+-----------+------------+------------------+
|          1.0|   Thursday|       12.25|13.251716595308377|
|          2.2|   Saturday|        14.3|15.914445362429937|
|          1.0|     Monday|         7.3|10.278181010473554|
|          5.2|     Friday|        25.3| 22.23538812288367|
|          1.9|     Monday|        11.3|11.643369460731456|
|          4.5|   Thursday|        17.8|28.273347378101008|
|          0.4|   Thursday|         5.8| 7.408246384979956|
|         15.4|   Thursday|        65.3| 45.73283006976534|
|         15.6|   Thursday|       63.85| 68.54619282108459|
|         18.9|   Thursday|        63.3| 77.10267278956672|
|         13.6|   Saturday|       61.85|62.643116917470884|
|         15.2|  Wednesday|        76.3| 68.12184233120922|
|          3.3|     Friday|        15.8| 19.71758435607552|
|          3.7|     Monday|        16.8|

23/06/14 02:38:29 WARN MemoryStore: Not enough space to cache rdd_83_0 in memory! (computed 440.4 MiB so far)
                                                                                

In [37]:
model.summary.rootMeanSquaredError

5.89274566627785

In [38]:
model.summary.r2

0.7957936214535244

In [39]:
spark.stop()