## Spark 세션 생성

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').appName('ml').getOrCreate()
spark

## 라이브러리 불러오기

In [5]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

print(np.__version__)

1.23.4


## 데이터 불러오기

In [6]:
flights = spark.read.csv("/home/human/wsl2_pyspark/data/flights.csv", 
                         sep=',', 
                         header=True, 
                         inferSchema=True, 
                         nullValue="NA")
flights.show(5)

+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows



## 머신러닝 주제
- 지연시간 예측
    + 종속변수 : delay
- 데이터의 행/열의 갯수
    

In [7]:
flights.count()

50000

In [8]:
len(flights.columns)

10

In [9]:
flights.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

## 데이터 전처리 & 피처엔지니어링
- flight 컬럼 삭제

In [10]:
flights2 = flights.drop("flight")
len(flights2.columns)

9

In [11]:
flights2.filter("delay IS NULL").count()

2978

- NA 결측치 삭제

In [12]:
flights3 = flights2.filter("delay IS NOT NULL")
flights3.count() # 50000 - 2978 = 47022

47022

In [13]:
flights4 = flights3.dropna() # dropna = 모든 NA 삭제
flights4.count()

47022

- NA 채우기

In [15]:
flights5 = flights2.fillna({"delay" : 0}) # fillna : 지정된 컬럼의 NA 값을 0으로 채움
flights5.show(5)

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 11| 20|  6|     US|JFK|2153|  9.48|     351|    0|
|  0| 22|  2|     UA|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|ORD| 258|  8.92|      65|    0|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows



In [16]:
flights5.count()

50000

## 파생변수 만들기
- withColumn() 활용해서 새로운 변수를 추가할 것
    - 참조 : https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html

In [24]:
# mile 에서 km로 변경
# 1마일 = 1.60934 킬로미터
from pyspark.sql.functions import round

flights5 = flights5.withColumn("km", round(flights5.mile * 1.60934,0)).drop("mile")
flights5.show(5)

AttributeError: 'DataFrame' object has no attribute 'mile'

In [25]:
temp = flights5.withColumn("new_column",1)
temp.show(5)

TypeError: col should be Column

In [28]:
# delay 15 이상
flights5 = flights5.withColumn("label", (flights5.delay >=15).cast("integer"))
flights5.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
| 11| 20|  6|     US|JFK|  9.48|     351|    0|3465.0|    0|
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  4|  2|  5|     AA|ORD|  8.92|      65|    0| 415.0|    0|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



- 라벨인코딩
    + 참조 : https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.StringIndexer.html?highlight=stringindexer#pyspark.ml.feature.StringIndexer

In [30]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = "carrier", outputCol ="carrier_index")
indexer_model = indexer.fit(flights5)
flights6 = indexer_model.transform(flights5)
flights6.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_index|
+---+---+---+-------+---+------+--------+-----+------+-----+-------------+
| 11| 20|  6|     US|JFK|  9.48|     351|    0|3465.0|    0|          6.0|
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|          0.0|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|          0.0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|          1.0|
|  4|  2|  5|     AA|ORD|  8.92|      65|    0| 415.0|    0|          1.0|
+---+---+---+-------+---+------+--------+-----+------+-----+-------------+
only showing top 5 rows



In [32]:
indexer = StringIndexer(inputCol = "org", outputCol ="org_index")
indexer_model = indexer.fit(flights6)
flights7 = indexer_model.transform(flights6)
flights7.select("org","org_index").show(5)

+---+---------+
|org|org_index|
+---+---------+
|JFK|      2.0|
|ORD|      0.0|
|SFO|      1.0|
|ORD|      0.0|
|ORD|      0.0|
+---+---------+
only showing top 5 rows



- 원핫 인코딩

In [35]:
from pyspark.ml.feature import OneHotEncoder

onehot = OneHotEncoder(inputCols=["org_index"], outputCols=["org_dummy"])
onehot = onehot.fit(flights7)
flights8 = onehot.transform(flights7)
flights8.show(5)

+---+---+---+-------+---+------+--------+-----+------+-----+-------------+---------+-------------+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|carrier_index|org_index|    org_dummy|
+---+---+---+-------+---+------+--------+-----+------+-----+-------------+---------+-------------+
| 11| 20|  6|     US|JFK|  9.48|     351|    0|3465.0|    0|          6.0|      2.0|(7,[2],[1.0])|
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|          0.0|      0.0|(7,[0],[1.0])|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|          0.0|      1.0|(7,[1],[1.0])|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|          1.0|      0.0|(7,[0],[1.0])|
|  4|  2|  5|     AA|ORD|  8.92|      65|    0| 415.0|    0|          1.0|      0.0|(7,[0],[1.0])|
+---+---+---+-------+---+------+--------+-----+------+-----+-------------+---------+-------------+
only showing top 5 rows



In [38]:
flights8.select("org", "org_index","org_dummy").distinct().sort("org_index").show()

+---+---------+-------------+
|org|org_index|    org_dummy|
+---+---------+-------------+
|ORD|      0.0|(7,[0],[1.0])|
|SFO|      1.0|(7,[1],[1.0])|
|JFK|      2.0|(7,[2],[1.0])|
|LGA|      3.0|(7,[3],[1.0])|
|SJC|      4.0|(7,[4],[1.0])|
|SMF|      5.0|(7,[5],[1.0])|
|TUS|      6.0|(7,[6],[1.0])|
|OGG|      7.0|    (7,[],[])|
+---+---------+-------------+



## 벡터화
- DataFrame에서 바로 ML 인풋 데이터로 들어가는게 아님

In [39]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = [
    "mon", "dom","dow","carrier_index","org_index","km","depart","duration"
], outputCol="features")

flights9 = assembler.transform(flights8)

In [41]:
flights9.select("features","delay").show(5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[11.0,20.0,6.0,6.0,2.0,3465.0,9.48,351.0]|0    |
|[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |30   |
|[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |-8   |
|[9.0,13.0,1.0,1.0,0.0,1989.0,10.33,195.0]|-5   |
|[4.0,2.0,5.0,1.0,0.0,415.0,8.92,65.0]    |0    |
+-----------------------------------------+-----+
only showing top 5 rows



## 데이터셋 분리

In [42]:
flights_train, flights_test = flights9.randomSplit([0.8,0.2],seed=42)

training_ratio = flights_train.count() / flights.count()
print(training_ratio)

[Stage 47:>                                                         (0 + 1) / 1]

0.79896


                                                                                

## 모형 만들기

In [44]:
from pyspark.ml.regression import LinearRegression
lr_model = LinearRegression(labelCol = "delay").fit(flights_train)

predictions = lr_model.transform(flights_test)
predictions.select("delay","prediction").show(5)

22/11/09 10:31:17 WARN Instrumentation: [005b4919] regParam is zero, which might cause numerical instability and overfitting.


+-----+------------------+
|delay|        prediction|
+-----+------------------+
|   11|34.930302444682454|
|   -9|26.059410033714094|
|   31| 42.76433603350965|
|   39| 35.34143209866805|
|   68|34.906640116747695|
+-----+------------------+
only showing top 5 rows



In [45]:
# RMSE
from pyspark.ml.evaluation import RegressionEvaluator
RegressionEvaluator(labelCol="delay").evaluate(predictions)

53.532150733253744

In [46]:
lr_model.intercept

4.508734319896046

In [47]:
lr_model.coefficients

DenseVector([-0.9057, -0.0164, -0.1998, -0.9834, -2.2976, -0.0077, 1.8449, 0.1211])

## 시각화
- pandas로 변환 / seaborn & matplotlib