
## Connecting Google Drive

In [36]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Installing Spark

In [37]:
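!apt-get install -y openjdk-8-jdk-headless
# dlcdn.apache.org only keeps current releases; archive.apache.org keeps older ones such as 3.5.1
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz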
!tar -zxf spark-3.5.1-bin-hadoop3.tgz

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-8-jdk-headless is already the newest version (8u402-ga-2ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [38]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

- The PySpark version must match the installed Spark version (3.5.1 here).

In [39]:
!pip install findspark -q

In [40]:
import findspark
findspark.init()

In [41]:
import pyspark
spark_version = pyspark.__version__
print("Apache Spark version: " + spark_version)

Apache Spark version: 3.5.1


# Spark Session Setup

In [42]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').appName("ML").getOrCreate()
spark

# Loading the Data

In [43]:
DATA_PATH = "/content/drive/MyDrive/Colab Notebooks/멀티캠퍼스34/스파크/data/"
flights = spark.read.option('header', 'true').csv(DATA_PATH + "flight_small.csv")
flights.show(1)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 1 row



In [44]:
planes = spark.read.option('header', 'true').csv(DATA_PATH + "planes.csv")
planes.show(1)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 1 row



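## Data Preparation

### Step 1
- Rename `year` in planes to `plane_year`, since flights also has a `year` column
- Join the two tables
  + Left join with flights as the left table, so every flight is kept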
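The rename-then-left-join step can be modelled without Spark. This is a minimal pure-Python sketch that uses dicts as rows. The helper `left_join` and the toy rows are hypothetical. It shows the two properties that matter here: every flight row survives, and a flight whose `tailnum` has no match in planes gets `None` (Spark's null) in the plane columns. Without the rename, the joined result would contain two columns named `year`, and a reference to `model_data.year` would be ambiguous.

```python
def left_join(left_rows, right_rows, key):
    """Left outer join of two lists of dict rows on `key` (hypothetical helper)."""
    # Index the right table by key; assumes each key appears at most once on the right
    right_index = {row[key]: row for row in right_rows}
    # Right-hand columns to copy over, de-duplicated, in first-seen order
    right_cols = list(dict.fromkeys(c for row in right_rows for c in row if c != key))
    joined = []
    for row in left_rows:
        match = right_index.get(row[key])
        out = dict(row)
        for col in right_cols:
            # Unmatched flights get None, mirroring Spark's null
            out[col] = match[col] if match is not None else None
        joined.append(out)
    return joined

flight_rows = [
    {"tailnum": "N846VA", "year": "2014", "carrier": "VX"},
    {"tailnum": "N000XX", "year": "2014", "carrier": "AS"},  # hypothetical: no plane record
]
plane_rows = [{"tailnum": "N846VA", "year": "2011", "model": "A320-214"}]

# Step 1: rename year -> plane_year so it does not collide with the flight's year
plane_rows = [{("plane_year" if k == "year" else k): v for k, v in p.items()} for p in plane_rows]

# Step 2: left join with flights as the left table
joined = left_join(flight_rows, plane_rows, key="tailnum")
```

The sketch ignores one Spark behaviour. If a `tailnum` appeared more than once in planes, Spark would repeat the flight once for each match.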

In [45]:
# Rename
planes = planes.withColumnRenamed("year", "plane_year")

# Join the two tables
model_data = flights.join(planes, on = 'tailnum', how = 'leftouter')
model_data.show(1)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+

- Cast `arr_delay`, `air_time`, `month`, and `plane_year` to integer (the CSVs were read without a schema, so every column is a string)
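Casting does more than change the type: it also decides what happens to values that are not numbers. The sketch below is a rough pure-Python model of `cast("integer")` on this data. It assumes Spark's default non-ANSI mode, where an unparseable string such as the dataset's `"NA"` becomes null instead of raising an error. The helper `cast_int` is hypothetical and only handles plain integer strings.

```python
def cast_int(value):
    """Approximate Spark's non-ANSI cast(string AS int) for plain integer strings."""
    if value is None:
        return None
    try:
        return int(value.strip())
    except ValueError:
        # Unparseable strings such as "NA" become None (Spark null)
        return None

raw_arr_delay = ["-5", "5", "NA", None]
arr_delay = [cast_int(v) for v in raw_arr_delay]
```

Because `"NA"` turns into null here, the later `is not NULL` filter can remove those rows.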

In [46]:
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

In [47]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: string (nullable = true)
 |-- seats: string (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



- Add a new column
  + `plane_age`: the flight year minus the year the plane was built
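The `year` column was never cast, so `model_data.year - model_data.plane_year` subtracts an integer from a string. Spark implicitly casts the string operand to double, which is why the output shows `3.0` rather than `3`. A null on either side produces a null age. That covers a flight with no matching plane, and a plane whose year was `"NA"`. The sketch below models those semantics in pure Python, with `None` as null. `plane_age` is a hypothetical helper.

```python
def plane_age(year, plane_year):
    """Mimic year - plane_year where year is a string and plane_year an int or None."""
    if year is None or plane_year is None:
        return None  # null propagates through arithmetic
    return float(year) - plane_year  # string operand treated as double, so result is a float

ages = [plane_age("2014", 2011), plane_age("2014", None)]
```

If an integer age is preferred, `year` can be cast the same way as the other columns before the subtraction.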

In [48]:
# year is still a string, so Spark casts it to double for the subtraction (hence 3.0 below)
model_data = model_data.withColumn('plane_age', model_data.year - model_data.plane_year)
model_data.show(1)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+

- Column `is_late`: a flight is late if its arrival delay (`arr_delay`) is positive, stored as True/False
- The comparison is computed directly inside withColumn()
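The comparison `arr_delay > 0` follows SQL three-valued logic, so a null delay gives a null `is_late`, not `False`. Casting that boolean to integer, as the `label` column does in the next cell, maps `True`/`False`/null to `1`/`0`/null. This is why rows with a null `arr_delay` have to be filtered out before training. The sketch below uses `None` for null. The helper names are hypothetical.

```python
def is_late(arr_delay):
    """SQL-style arr_delay > 0: None in, None out."""
    return None if arr_delay is None else arr_delay > 0

def to_label(flag):
    """Boolean cast to integer: True -> 1, False -> 0, None stays None."""
    return None if flag is None else int(flag)

delays = [-5, 5, 0, None]
flags = [is_late(d) for d in delays]
labels = [to_label(f) for f in flags]
```

A delay of exactly 0 counts as not late under this definition.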


In [49]:
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
model_data.show(10)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL|     360|    2677|  10|

In [50]:
# Target variable: create the label column by casting is_late (boolean) to integer
# (1 = late, 0 = not late)
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
model_data.show(3)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX|     132|     954|   6|    58|      2011|Fixed wing multi ...|      AIRBUS|A320-214|      2|  182|   NA|Turbo-fan|      3.0|  false|    0|
| N559AS|2014|    1| 22|    1040|        5|    1505|        5|     AS|   851|   SEA| HNL

In [51]:
# Drop rows (not columns) that have a missing value in any of these columns
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")
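The filter keeps only rows in which every listed column is non-null. `arr_delay`, `air_time`, and `plane_year` were cast to integer, so their `"NA"` strings are now null and are removed here. `dep_delay` was never cast, however. An `"NA"` string in that column is not null and would pass this check. The sketch below shows the same row filter in pure Python. `drop_null_rows` is hypothetical. PySpark's `DataFrame.dropna(subset=[...])` expresses the same idea.

```python
def drop_null_rows(rows, columns):
    """Keep rows where none of `columns` is None (like SQL 'col IS NOT NULL AND ...')."""
    return [row for row in rows if all(row[c] is not None for c in columns)]

rows = [
    {"arr_delay": -5, "dep_delay": "-7", "air_time": 132, "plane_year": 2011},
    {"arr_delay": None, "dep_delay": "NA", "air_time": None, "plane_year": 2011},
    {"arr_delay": 5, "dep_delay": "NA", "air_time": 360, "plane_year": 2006},  # hypothetical row
]
kept = drop_null_rows(rows, ["arr_delay", "dep_delay", "air_time", "plane_year"])
```

The third row is kept even though its `dep_delay` is `"NA"`, because the string is not null.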

# Feature Engineering
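- Build a Pipeline
  + StringIndexer
  + OneHotEncoder
  + VectorAssembler
- Spark ML requires every input feature to be numeric, so categorical string columns such as `carrier` and `dest` must be converted.
- The data must also match the format Spark ML classes expect: all features combined into a single vector column, plus a numeric label column.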
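The pipeline ends with a single `features` vector per row. `VectorAssembler` concatenates its input columns in the order given by `inputCols`. Any vector column, such as the one-hot `carrier_fact` and `dest_fact`, is flattened into the longer vector. The pure-Python sketch below shows that concatenation using dense lists instead of Spark's sparse vectors. `assemble` and the short one-hot lists are hypothetical.

```python
def assemble(*parts):
    """Concatenate scalars and lists into one flat feature list, in argument order."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(x) for x in part)  # vector column: flatten
        else:
            features.append(float(part))  # numeric column: one slot
    return features

# Same order as inputCols: month, air_time, carrier_fact, dest_fact, plane_age
month, air_time, plane_age = 12, 132, 3.0
carrier_fact = [0, 0, 1]   # hypothetical 3-slot one-hot
dest_fact = [1, 0]         # hypothetical 2-slot one-hot
features = assemble(month, air_time, carrier_fact, dest_fact, plane_age)
```

By default `VectorAssembler` raises an error on null inputs (`handleInvalid="error"`). This is one more reason the null filter runs before the pipeline.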

In [52]:
model_data.select('carrier').show(1)

# carrier => carrier_index (via StringIndexer) => carrier_fact (via OneHotEncoder)

+-------+
|carrier|
+-------+
|     VX|
+-------+
only showing top 1 row
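By default, `StringIndexer` assigns indices by label frequency (`stringOrderType="frequencyDesc"`). The most frequent value gets `0.0`, the next `1.0`, and so on. Spark 3.x breaks ties alphabetically. In the output further down, `AS` gets `0.0`, so it is the most frequent carrier in `model_data`. The pure-Python sketch below reproduces that ordering. `fit_string_indexer` and the toy carrier list are hypothetical, so its indices differ from the notebook's (where `VX` is `7.0`).

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to a float index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

carriers = ["VX", "AS", "VX", "WN", "AS", "WN", "WN", "AS", "AS", "UA"]
index = fit_string_indexer(carriers)
carrier_index = [index[c] for c in carriers]
```

By default, a carrier that appears at transform time but was not seen during fitting makes `StringIndexer` raise an error (`handleInvalid="error"`).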



In [53]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")

flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])
piped_data = flights_pipe.fit(model_data).transform(model_data)
piped_data.show(1)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+--------------+-------------+--------------+--------------------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|     dest_fact|carrier_index|  carrier_fact|            features|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+--------------+-------------+--------------+--------------------+
| N846VA|2014|   12|  8|     658|       -7|     935|       -5|     VX|  1780|   SEA| LAX

In [54]:
piped_data.select('carrier', 'carrier_index', 'carrier_fact').show(20)

+-------+-------------+--------------+
|carrier|carrier_index|  carrier_fact|
+-------+-------------+--------------+
|     VX|          7.0|(10,[7],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     VX|          7.0|(10,[7],[1.0])|
|     WN|          1.0|(10,[1],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     WN|          1.0|(10,[1],[1.0])|
|     WN|          1.0|(10,[1],[1.0])|
|     VX|          7.0|(10,[7],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     UA|          4.0|(10,[4],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     WN|          1.0|(10,[1],[1.0])|
|     AS|          0.0|(10,[0],[1.0])|
|     OO|          2.0|(10,[2],[1.0])|
+-------+-------------+--------------+
only showing top 20 rows
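Each `carrier_fact` value is printed in Spark's sparse-vector notation `(size, [indices], [values])`. For example, `(10,[7],[1.0])` is a length-10 vector with `1.0` at position 7 and zeros elsewhere. The length is 10 rather than one slot per carrier because `OneHotEncoder` defaults to `dropLast=True`. That setting drops the last category, which is then encoded as all zeros, to avoid a redundant column. A size of 10 therefore implies 11 distinct carrier indices. The pure-Python sketch below reproduces the encoding as the same triple. `one_hot_sparse` is hypothetical.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) like Spark's SparseVector string form."""
    size = num_categories - 1 if drop_last else num_categories
    idx = int(index)
    if idx >= size:
        # With drop_last, the last category has no slot: an all-zero vector
        return (size, [], [])
    return (size, [idx], [1.0])

encoded = [one_hot_sparse(i, 11) for i in (7.0, 0.0, 10.0)]
```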



## Train/Test Split

In [55]:
training, test = piped_data.randomSplit([0.6, 0.4])
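`randomSplit([0.6, 0.4])` assigns each row to a split at random. The split sizes are therefore only approximately 60/40, and they change from run to run unless a `seed` is passed, as in `randomSplit([0.6, 0.4], seed=42)`. The sketch below is a pure-Python approximation of the idea, not Spark's exact sampling algorithm. It draws one uniform number per row and compares it against the cumulative normalized weights. `random_split` is hypothetical.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split by comparing one uniform draw to cumulative weights."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point shortfall in the bounds
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_ids, test_ids = random_split(list(range(1000)), [0.6, 0.4], seed=42)
```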

## Creating the Model

In [56]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
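With no arguments, `LogisticRegression()` reads the `features` and `label` columns, which is why the pipeline output and the target column use those names. For a binary label, the model first computes a linear margin `w·x + b` for each row. The sigmoid function turns the margin into a probability. The model predicts `1.0` when that probability exceeds the default `threshold` of 0.5. The output columns seen later map to these quantities: `rawPrediction` is `[-margin, margin]` and `probability` is `[1 - p, p]`. The pure-Python sketch below shows only this scoring step, using made-up weights. `predict` is hypothetical and says nothing about how Spark fits the weights.

```python
import math

def predict(weights, intercept, features, threshold=0.5):
    """Binary logistic regression scoring: margin -> sigmoid -> thresholded class."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    p = 1.0 / (1.0 + math.exp(-margin))  # probability of label 1
    raw_prediction = [-margin, margin]
    probability = [1.0 - p, p]
    prediction = 1.0 if p > threshold else 0.0
    return raw_prediction, probability, prediction

# Hypothetical weights for a 3-feature row: margin = 0.5*2 + 0 + 1*1 = 2.0
raw, prob, pred = predict([0.5, -0.25, 1.0], 0.0, [2.0, 0.0, 1.0])
```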

## Cross-Validation
- Grid Search

In [57]:
import pyspark.ml.tuning as tune
import numpy as np

grid = tune.ParamGridBuilder()

# Add the hyperparameter values to search over
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

grid = grid.build()
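`ParamGridBuilder.build()` returns the Cartesian product of all the added values. `np.arange(0, .1, .01)` gives 10 `regParam` values, 0.00 through 0.09, because the stop value 0.1 is excluded. `elasticNetParam` has 2 values: 0 is a pure L2 (ridge) penalty and 1 is a pure L1 (lasso) penalty. The grid therefore holds 10 × 2 = 20 parameter maps. The pure-Python sketch below builds that product with `itertools.product`, using plain dicts in place of Spark's param maps.

```python
from itertools import product

# Same values as np.arange(0, .1, .01), up to floating-point rounding
reg_params = [round(i * 0.01, 2) for i in range(10)]
elastic_net_params = [0, 1]  # 0 = L2 (ridge), 1 = L1 (lasso)

grid_maps = [
    {"regParam": r, "elasticNetParam": e}
    for r, e in product(reg_params, elastic_net_params)
]
```

With `regParam = 0` there is no penalty at all, so the two maps with that value train the same unregularized model.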

- Cross-validation needs an evaluation metric to compare parameter settings

In [58]:
import pyspark.ml.evaluation as evals
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
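`areaUnderROC` measures ranking quality. It equals the probability that a randomly chosen late flight scores higher than a randomly chosen on-time flight, with ties counting as half. A value of 0.5 means random ranking and 1.0 means perfect ranking. Because only the ranking matters, any monotone score gives the same AUC. By default the evaluator reads the `rawPrediction` and `label` columns. The brute-force pure-Python sketch below applies that definition directly. It is quadratic in the number of rows, so it is for illustration only, since Spark computes the area under the ROC curve directly rather than by pairwise comparison. `auc` is hypothetical.

```python
def auc(scores, labels):
    """Pairwise AUC: P(score of a positive > score of a negative), ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

scores = [0.9, 0.8, 0.4, 0.3, 0.2]
labels = [1, 0, 1, 0, 0]
```

Here `auc(scores, labels)` returns 5/6 (about 0.833). The positive scored 0.9 beats all three negatives, and the positive scored 0.4 beats two of them.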

- Create the CrossValidator object

In [59]:
cv = tune.CrossValidator(estimator = lr,
                         estimatorParamMaps = grid,
                         evaluator = evaluator)

cv

CrossValidator_4261e8b4f5ba
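`CrossValidator` defaults to `numFolds=3`. For each parameter map, it fits the estimator on 2 folds and evaluates on the held-out fold, repeating this for all 3 folds. It then averages the 3 scores. Finally it refits the best map on the full `training` set. With the 20-map grid, that is 20 × 3 = 60 fits plus one final refit. The pure-Python sketch below shows that loop. `k_fold_cv`, `fit_constant`, and `neg_squared_error` are hypothetical stand-ins for the estimator and evaluator. This sketch assigns folds round-robin, whereas Spark assigns them randomly.

```python
def k_fold_cv(rows, param_maps, fit_fn, eval_fn, num_folds=3):
    """Return (best_params, avg_metrics, final_model), choosing the max average metric."""
    folds = [rows[i::num_folds] for i in range(num_folds)]  # round-robin folds
    avg_metrics = []
    for params in param_maps:
        scores = []
        for k in range(num_folds):
            validation = folds[k]
            train = [r for j, fold in enumerate(folds) if j != k for r in fold]
            model = fit_fn(train, params)
            scores.append(eval_fn(model, validation))
        avg_metrics.append(sum(scores) / num_folds)
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    final_model = fit_fn(rows, param_maps[best])  # refit best params on all rows
    return param_maps[best], avg_metrics, final_model

# Toy usage: the "model" is a constant guess, the metric is negative squared error
fit_calls = []

def fit_constant(train, params):
    fit_calls.append(params)  # count fits to show folds x maps + 1
    return params["guess"]

def neg_squared_error(model, validation):
    return -sum((x - model) ** 2 for x in validation)

data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
toy_grid = [{"guess": g} for g in (0.0, 3.5, 10.0)]
best_params, metrics, final_model = k_fold_cv(data, toy_grid, fit_constant, neg_squared_error)
```

In PySpark, the fitted `CrossValidatorModel` exposes the per-map averages as `final_lr.avgMetrics` and the refit model as `final_lr.bestModel`.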

In [60]:
final_lr = cv.fit(training)

In [62]:
test_results = final_lr.transform(test)
test_results.show(1)

+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+--------------+--------------------+--------------------+--------------------+----------+
|tailnum|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|dest|air_time|distance|hour|minute|plane_year|                type|    manufacturer|   model|engines|seats|speed|   engine|plane_age|is_late|label|dest_index|      dest_fact|carrier_index|  carrier_fact|            features|       rawPrediction|         probability|prediction|
+-------+----+-----+---+--------+---------+--------+---------+-------+------+------+----+--------+--------+----+------+----------+--------------------+----------------+--------+-------+-----+-----+---------+---------+-------+-----+----------+---------------+-------------+----

In [63]:
evaluator.evaluate(test_results)

0.6927044743361577