In [1]:
%python

# 데이터 클랜징 == 데이터 전처리
# 데이터를 가공하는 일련의 과정결측값 처리, 이상값 처리, Feature Engineering


In [2]:
# 사용하지 않을 column들을 삭제합니다
# DepTime, CRSDepTime, ArrTime, CRSArrTime, AirTime, ArrDelay, DepDelay, TaxiIn, TaxiOut, CancellationCode, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay

# null 값이 NA라는 문자열로 표현되어 있으므로, 이 값들을 null으로 바꾸도록 하겠습니다
# 이렇게 해야 나중에 분석 query가 단순 깔끔하게 나옵니다

# 몇몇 column의 type을 바꾸도록 하겠습니다. 구체적으로는
# 실제 경과 시간(ActualElapsedTime), 경과 예정 시간(CRSElapsedTime), 비행 시간(AirTime), 거리(Distance) 항목은 의미상 Integer 형이지만, String 형으로 표시되어 있으므로 type을 바꾸도록 하겠습니다. 이렇게 해야 나중에 평균이나 중간값 등의 통계 함수를 쓸 수 있습니다
# 취소 여부(Cancelled), 선회 여부(Diverted) 항목은 의미상 Boolean 형이지만 Integer 형으로 표시되어 있으므로 역시 type을 바꾸도록 하겠습니다


In [3]:
# udf == UserDefineFunction 
# 사용할 Spark SQL 함수를 사용자가 정의


from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType, IntegerType, StringType


In [4]:
# NA값을 null로 변경하기

def cleanString(value):
  if (value == None) or (value == 'NA'):
    return None
  else:
    value


In [5]:
# string으로 설정된 숫자를 Integer로 변경

def stringToInteger(value):
  if value == 'NA':
    return None
  else:
    return int(value)


In [6]:
# Integer값을 Boolean으로 변경

def integerToBoolean(value):
  return False if value == 0 else True


In [7]:
# 위의 3개 함수 udf로 정의

cleanStrFunction = udf(stringToInteger, IntegerType())

cleanIntegerFunction = udf(integerToBoolean, BooleanType())

cleanStringFunction = udf(cleanString, StringType())


In [8]:
from pyspark.sql import SparkSession
# 세션 생성
spark = SparkSession \
  .builder \
  .appName("Spark EDA example on us aircraft data ") \
  .getOrCreate()

#마운트 되어 저장된 csv데이터 읽어서 데이터 프레임 객체에 저장
raw_df = spark.read.csv("/mnt/us-carrier-dataset", header = 'True', inferSchema='True')


In [9]:
# 사용 안하는 컬럼 삭제후 UDF적용

us_carrier_df = raw_df \
    .drop(
      # 사용하지 않을 column 삭제
      'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'AirTime', 'ArrDelay', 'DepDelay', 'TaxiIn', 'TaxiOut', 'CancellationCode', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay') \
    .withColumn(
      # 'NA' 항목 null로 변경하고 Integer 형으로 변경
      'ActualElapsedTime', cleanStrFunction('ActualElapsedTime')) \
    .withColumn(
      # 'NA' 항목 null로 변경하고 Integer 형으로 변경
      'CRSElapsedTime', cleanStrFunction('CRSElapsedTime')) \
    .withColumn(
      # 'NA' 항목 null로 변경하고 Integer 형으로 변경
      'Distance', cleanStrFunction('Distance')) \
    .withColumn(
      # 'NA' 항목 null로 변경하고 Boolean 형으로 변경
      'Cancelled', cleanIntegerFunction('Cancelled')) \
    .withColumn(
      # 'NA' 항목 null로 변경하고 Boolean 형으로 변경
      'Diverted', cleanIntegerFunction('Diverted'))

us_carrier_df.printSchema()
us_carrier_df.show()


In [10]:
# 추후 자주 사용할 자료이므로 클러스터 메모리 상에 남겨 두고 삭제하지 않도록 합니다.
us_carrier_df.cache()

# 추후 SQL문 형태로 질의할 때 사용하기 위해 클렌징이 끝난 Dataframe을 전역 임시 뷰로 지정합니다.
us_carrier_df.createGlobalTempView("us_carrier")
