# CH 02
## 데이터 준비

스파크(Spark)를 이용한 데이터 분석 예제로, 본 챕터에서 사용하는 데이터는 UC Irvine의 머신러닝 데이터 저장소에서 구할 수 있는 표본 데이터중의 하나로, 2010년에 독일의 한 병원에서 실시한 레코드 링크 연구에서 나온 것이라고 합니다. 도서에서 제공하는 `bit.ly` 주소를 이용하여 압축된 파일을 다운받을 수 있습니다. 필요한 csv 파일들을 압축 해제할 수 있도록 아래 명령어를 실행합니다.

In [1]:
!mkdir linkage
!curl -L -o "donation.zip" http://bit.ly/1Aoywaq
!unzip -o "donation.zip"
!unzip -o "block_\*.zip" -d linkage

mkdir: cannot create directory ‘linkage’: File exists

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current


                                 Dload  Upload   Total   Spent    Left  Speed




  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0

100   163  100   163    0     0    541      0 --:--:-- --:--:-- --:--:--   539

100   163  100   163    0     0    541      0 --:--:-- --:--:-- --:--:--   539




  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0

  0 53.8M    0  387k    0     0   172k      0  0:05:19  0:00:02  0:05:17  399k

 13 53.8M   13 7490k    0     0  2299k      0  0:00:23  0:00:03  0:00:20 3784k

 30 53.8M   30 16.5M    0     0  3941k      0  0:00:13  0:00:04  0:00:09 5613k

 41 53.8M   41 22.3M    0     0  4351k      0  0:00:12  0:00:05  0:00:07 5755k

 68 53.8M   68 37.0M    0     0  6077k      0  0:00:09  0:00:06  0:00:03 7643k
100 53.8M  100 53.8M    0     0  7362k      0  0:00:07  0:0

## 스파크 기초

스파크에서 파일을 읽는 가장 간단한 방법은 텍스트로 불러오는 방법입니다. 하둡(Hadoop)을 이용하게 되면 스파크는 기본적으로 hdfs에서 파일을 찾게 되지만, 지금 사용하고 있는 Docker 이미지에서는 하둡을 이용하지 않기 때문에, 주피터(Jupyter)가 실행되는 경로를 기준으로 하여 상대 경로를 이용할 수 있습니다. `linkage`를 경로로 지정함으로써 폴더 내의 모든 파일을 읽어들이게 됩니다.

In [2]:
val df = spark.read.text("linkage")

Intitializing Scala interpreter ...

Spark Web UI available at http://a7c3cca44251:4040
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1597645295748)
SparkSession available as 'spark'


df: org.apache.spark.sql.DataFrame = [value: string]


데이터프레임(DataFrame)에서 데이터를 꺼내보기 위한 방법은 여러 가지가 있습니다. 그 중에서 `first` 메소드는 데이터프레임의 가장 첫 번째 레코드를 가져오는 방법입니다. 실제로 하나의 레코드를 읽었는지 확인하기 위해서 `length` 메소드를 이용해보면 결과값이 1로 나타나는 것을 확인할 수 있습니다.

In [3]:
val first = df.first
first.length

first: org.apache.spark.sql.Row = ["id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"]
res0: Int = 1


그와 유사하게, `take` 메소드는 숫자를 지정하여 원하는 갯수 만큼의 레코드를 불러올 수 있습니다. `head` 또한 마찬가지 입니다.

In [4]:
val take = df.take(10)
take.length

take: Array[org.apache.spark.sql.Row] = Array(["id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"], [3148,8326,1,?,1,?,1,1,1,1,1,TRUE], [14055,94934,1,?,1,?,1,1,1,1,1,TRUE], [33948,34740,1,?,1,?,1,1,1,1,1,TRUE], [946,71870,1,?,1,?,1,1,1,1,1,TRUE], [64880,71676,1,?,1,?,1,1,1,1,1,TRUE], [25739,45991,1,?,1,?,1,1,1,1,1,TRUE], [62415,93584,1,?,1,?,1,1,1,1,0,TRUE], [27995,31399,1,?,1,?,1,1,1,1,1,TRUE], [4909,12238,1,?,1,?,1,1,1,1,1,TRUE])
res1: Int = 10


데이터프레임을 조회했을 때, 위의 결과처럼 행(Row)들의 `Array`가 출력되는데, 이 때 가독성이 너무 떨어지기 때문에 아래와 같이 `foreach` 문을 이용하여 가독성을 향상시킬 수 있습니다.

In [5]:
take.foreach(println)

["id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"]
[3148,8326,1,?,1,?,1,1,1,1,1,TRUE]
[14055,94934,1,?,1,?,1,1,1,1,1,TRUE]
[33948,34740,1,?,1,?,1,1,1,1,1,TRUE]
[946,71870,1,?,1,?,1,1,1,1,1,TRUE]
[64880,71676,1,?,1,?,1,1,1,1,1,TRUE]
[25739,45991,1,?,1,?,1,1,1,1,1,TRUE]
[62415,93584,1,?,1,?,1,1,1,1,0,TRUE]
[27995,31399,1,?,1,?,1,1,1,1,1,TRUE]
[4909,12238,1,?,1,?,1,1,1,1,1,TRUE]


첫 번째 행은 다른 행과 차이가 있는데, 각 열(Column)에 대한 헤더를 포함하고 있기 때문입니다. 그래서 데이터 분석을 할 때 이러한 헤더가 섞이면 곤란하므로 헤더를 구분해주어야 할 필요가 있습니다. 간단한 사용자 정의 함수(User Defined Function, UDF)를 이용하여 헤더를 구분해볼 수 있습니다.

In [6]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

val isHeader: Row => Boolean = _.toString.contains("id_1")
val _isHeader = udf(isHeader)

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
isHeader: org.apache.spark.sql.Row => Boolean = $Lambda$3144/0x00000008412a0040@4adcd363
_isHeader: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$3144/0x00000008412a0040@4adcd363,BooleanType,List(None),None,false,true)


`take(10)`의 결과물에서 헤더인 행만 필터해보면 1개의 행만 출력됨을 확인할 수 있습니다.

In [7]:
take.filter(isHeader).foreach(println)
take.filter(isHeader(_)).length

["id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"]


res3: Int = 1


그 반대로 필터를 해보면 9개의 행이 출력되는 것을 확인할 수 있습니다.

In [8]:
take.filterNot(isHeader).foreach(println)
take.filter(!isHeader(_)).length

[3148,8326,1,?,1,?,1,1,1,1,1,TRUE]
[14055,94934,1,?,1,?,1,1,1,1,1,TRUE]
[33948,34740,1,?,1,?,1,1,1,1,1,TRUE]
[946,71870,1,?,1,?,1,1,1,1,1,TRUE]
[64880,71676,1,?,1,?,1,1,1,1,1,TRUE]
[25739,45991,1,?,1,?,1,1,1,1,1,TRUE]
[62415,93584,1,?,1,?,1,1,1,1,0,TRUE]
[27995,31399,1,?,1,?,1,1,1,1,1,TRUE]
[4909,12238,1,?,1,?,1,1,1,1,1,TRUE]


res4: Int = 9


## 스파크 데이터프레임

스파크에서는 여러 데이터 포맷에 대해서 손쉽게 데이터를 읽어서 데이터프레임으로 만들 수 있는 API를 제공합니다. 앞에서는 CSV 파일의 행 하나하나를 단순히 Text로 읽었지만, 이번에는 CSV의 목적에 맞게 comma로 구분되는 열 구조를 가진 데이터프레임으로 읽어보겠습니다. 아래와 같이 `.csv` 메소드를 이용해서 CSV 파일을 읽습니다.

In [9]:
val parsed = spark.read.csv("linkage")

parsed: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 10 more fields]


그리고 `.show` 메소드를 사용해서 데이터프레임을 살펴보면 열 구조가 제대로 나뉜 것을 확인할 수 있습니다.

In [10]:
parsed.show

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
|  _c0|  _c1|         _c2|         _c3|         _c4|         _c5|    _c6|   _c7|   _c8|   _c9|   _c10|    _c11|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
| 3148| 8326|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|14055|94934|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|33948|34740|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|  946|71870|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|64880|71676|           1|           ?|           1|           ?|      1|     1|     1|     1|      1|  

딱 한 가지 문제가 있다면, CSV의 헤더가 하나의 행으로 인식되고 있다는 점입니다. 물론, 이것을 해결할 방법이 있습니다. `.csv` 메소드는 다양한 옵션을 설정할 수 있습니다. 헤더의 존재 유무나, 특정 값을 `null`로 변환하거나, 또는 스파크의 강력한 기능 중에 하나인 스키마 추론을 이용해서 해당 컬럼에 들어있는 값들의 데이터 타입을 유추해볼 수 있습니다.

In [11]:
val parsed = spark.read
    .option("header", "true")
    .option("nullvalue", "?")
    .option("inferSchema", "true")
    .csv("linkage")

parsed: org.apache.spark.sql.DataFrame = [id_1: int, id_2: int ... 10 more fields]


이처럼 데이터프레임을 읽어올 때, 옵션을 지정하면 아래와 같이 헤더가 컬럼 이름으로 들어가고 `?` 값이 `null`로 바뀌면서 각 열에 대해서 추론한 데이터 타입을 가지는 것을 확인할 수 있습니다. 추론을 하지 않으면 기본적으로 모든 컬럼은 `string` 타입으로 인식됩니다.

In [12]:
parsed.show
parsed.printSchema

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|25739|45991|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|  

## 데이터 분석

이제 읽어들인 데이터를 분석해볼 수 있습니다. 가장 기초적인 방법은 데이터의 양을 확인하는 것입니다. 데이터프레임이 가지고 있는 행의 카운트를 다음과 같이 수행할 수 있습니다.

In [13]:
parsed.count

res7: Long = 5749132


데이터프레임을 반복적으로 읽어서 사용하게 된다면, 파일을 마찬가지로 반복적으로 읽어오면서 속도 저하가 발생할 수 있습니다. 앞으로 해당 데이터프레임을 빠르게 사용하기 위해서 약간의 시간을 들여서 미리 메모리에 캐쉬를 하겠습니다. 캐쉬 위치는 메모리와 디스크 혹은 둘 다 사용하도록 설정을 할 수 있지만, 기본 값은 메모리만 사용하는 것입니다.

In [14]:
parsed.cache

res8: parsed.type = [id_1: int, id_2: int ... 10 more fields]


데이터에서 `id_1`과 `id_2`가 같은 사람인지를 확인한 `is_match` 값에 대해서 각각 얼마만큼의 데이터를 가지고 있는지 확인해볼 수 있습니다.

In [15]:
parsed.groupBy('is_match).count.orderBy('count.desc).show

+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



또는, 간단한 통계 함수를 이용하여 특정 열에서의 평균값 또는 표준편차를 구할 수도 있습니다. 

In [16]:
parsed.agg(avg('cmp_sex), stddev('cmp_sex)).show

+-----------------+--------------------+
|     avg(cmp_sex)|stddev_samp(cmp_sex)|
+-----------------+--------------------+
|0.955001381078048| 0.20730111116897781|
+-----------------+--------------------+



이러한 데이터 분석 값을 바탕으로 주어진 데이터를 어떻게 모델링해야 보다 정확한 `is_match` 값을 얻을 수 있는지 고민해봐야 합니다. 본 챕터의 목적은 `is_match` 열을 모른 상태에서 주어진 다른 열의 데이터를 이용하여 실제 `is_match` 값을 추측하는 것입니다. 그렇기 때문에 각 열의 정보를 어떻게 조합할 것인지에 대한 모델링이 중요하다는 것입니다.

스파크에서는 데이터프레임 API 뿐만 아니라 기존에 개발자들이 익숙하게 사용해왔던 sql API를 함께 제공합니다. 스파크 sql을 이용하면 손쉽게 sql 문을 이용하여 데이터프레임을 조작할 수 있습니다. 먼저 데이터프레임을 하나의 테이블로 인식시킵니다.

In [17]:
parsed.createOrReplaceTempView("linkage")

그리고 sql문을 이용하여 앞서 데이터프레임을 이용해서 살펴본 데이터 분석 과정을 동일하게 수행할 수 있습니다.

In [18]:
spark.sql("""
    SELECT is_match, COUNT(*) cnt
    FROM linkage
    GROUP BY is_match
    ORDER BY cnt DESC
""").show

+--------+-------+
|is_match|    cnt|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



지금까지는 사용자가 직접 데이터를 분석하는 과정이었습니다. 그러나, 스파크는 모든 데이터프레임에 대해서 간단한 통계 정보를 이미 보유하고 있습니다. 간단하게 원하는 데이터프레임에 대해서 `.describe()` 메소드를 수행해보면 그 결과도 마찬가지로 데이터프레임으로 주어짐을 확인할 수 있습니다.

In [19]:
val summary = parsed.describe()

summary: org.apache.spark.sql.DataFrame = [summary: string, id_1: string ... 10 more fields]


데이터프레임의 서머리를 살펴보면 이처럼 count, mean, stddev, min, max 값들을 각 열에 대해서 보여줍니다.

In [20]:
summary.show

+-------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|summary|              id_1|              id_2|       cmp_fname_c1|      cmp_fname_c2|      cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|            cmp_by|            cmp_plz|
+-------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|  count|           5749132|           5749132|            5748125|            103698|           5749132|               2464|            5749132|            5748337|            5748337|           5748337|            5736289|
|   mean| 33324.48559643438| 66587.43558331935| 0.7129024704437266|0.9000176718903189|0.315627819308

필요한 열만 선택해서 보면 아래와 같이 나타납니다.

In [21]:
summary.select('summary, 'cmp_fname_c1, 'cmp_fname_c2).show

+-------+-------------------+------------------+
|summary|       cmp_fname_c1|      cmp_fname_c2|
+-------+-------------------+------------------+
|  count|            5748125|            103698|
|   mean| 0.7129024704437266|0.9000176718903189|
| stddev|0.38875835961628014|0.2713176105782334|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



혹은 `is_match` 값이 `true`인 경우만 필터링을 해서 데이터프레임을 새로 만들고, 그에 대한 통계 값들을 다시 구해서 볼 수도 있습니다. 분석 모델링을 위해서 이처럼 다양한 값에 대해서 필터링한 후의 통계 값을 보는 것이 도움이 될 것입니다. 다만... 열이 많아서 데이터프레임을 쉽게 보기가 힘들군요.

In [22]:
val matches = parsed.where('is_match === true)
val matchSummary = matches.describe()

val misses = parsed.where('is_match === false)
val missSummary = misses.describe()

matches: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_1: int, id_2: int ... 10 more fields]
matchSummary: org.apache.spark.sql.DataFrame = [summary: string, id_1: string ... 10 more fields]
misses: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id_1: int, id_2: int ... 10 more fields]
missSummary: org.apache.spark.sql.DataFrame = [summary: string, id_1: string ... 10 more fields]


## Pivot과 형태 변환

앞서 데이터프레임의 통계 데이터프레임인 `summary` 데이터프레임의 스키마는 다음과 같이 모두 `string` 입니다.

In [23]:
summary.printSchema

root
 |-- summary: string (nullable = true)
 |-- id_1: string (nullable = true)
 |-- id_2: string (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: string (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: string (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)



하지만 모든 컬럼이 숫자이므로 이를 `double` 타입으로 변경하는 것과 함께, 데이터프레임을 보기 쉽도록 행과 열을 pivot을 진행해보겠습니다. `flatMap` 함수를 이용하여 데이터프레임의 각 행에 대해서 반복적으로 적용할 함수를 작성합니다. 첫 번째 열은 `metric` 값이므로 `string`으로 받아서 처리하고, 나머지 행의 모든 열에 대해서는 3-tuple 형태로 `(metric, column_name, value)`를 반복적으로 생성합니다.

In [24]:
val schema = summary.schema
val longForm = summary.flatMap(row => {
    val metric = row.getString(0)
    (1 until row.size).map(i => {
        (metric, schema(i).name, row.getString(i).toDouble)
    })
})

schema: org.apache.spark.sql.types.StructType = StructType(StructField(summary,StringType,true), StructField(id_1,StringType,true), StructField(id_2,StringType,true), StructField(cmp_fname_c1,StringType,true), StructField(cmp_fname_c2,StringType,true), StructField(cmp_lname_c1,StringType,true), StructField(cmp_lname_c2,StringType,true), StructField(cmp_sex,StringType,true), StructField(cmp_bd,StringType,true), StructField(cmp_bm,StringType,true), StructField(cmp_by,StringType,true), StructField(cmp_plz,StringType,true))
longForm: org.apache.spark.sql.Dataset[(String, String, Double)] = [_1: string, _2: string ... 1 more field]


이후에, 이렇게 생성된 3-tuple의 데이터셋을 데이터프레임으로 만들면서 컬럼 명을 지정하면 다음과 같이 아래로 긴 데이터프레임을 얻을 수 있습니다.

In [25]:
val longDF = longForm.toDF("metric", "field", "value")
longDF.show

+------+------------+-------------------+
|metric|       field|              value|
+------+------------+-------------------+
| count|        id_1|          5749132.0|
| count|        id_2|          5749132.0|
| count|cmp_fname_c1|          5748125.0|
| count|cmp_fname_c2|           103698.0|
| count|cmp_lname_c1|          5749132.0|
| count|cmp_lname_c2|             2464.0|
| count|     cmp_sex|          5749132.0|
| count|      cmp_bd|          5748337.0|
| count|      cmp_bm|          5748337.0|
| count|      cmp_by|          5748337.0|
| count|     cmp_plz|          5736289.0|
|  mean|        id_1|  33324.48559643438|
|  mean|        id_2|  66587.43558331935|
|  mean|cmp_fname_c1| 0.7129024704437266|
|  mean|cmp_fname_c2| 0.9000176718903189|
|  mean|cmp_lname_c1| 0.3156278193080383|
|  mean|cmp_lname_c2| 0.3184128315317443|
|  mean|     cmp_sex|  0.955001381078048|
|  mean|      cmp_bd|0.22446526708507172|
|  mean|      cmp_bm|0.48885529849763504|
+------+------------+-------------

longDF: org.apache.spark.sql.DataFrame = [metric: string, field: string ... 1 more field]


이처럼 긴 데이터프레임을 이번에는 컬럼 그룹으로 묶은 다음에 `metric` 값에 대해서 pivot을 진행하면 아래와 같은 넓은 데이터프레임을 얻을 수 있습니다.

In [26]:
val wideDF = longDF.groupBy('field)
    .pivot('metric, Seq("count", "mean", "stddev", "min", "max"))
    .agg(max('value))
wideDF.select('field, 'count, 'mean).show

+------------+---------+-------------------+
|       field|    count|               mean|
+------------+---------+-------------------+
|        id_2|5749132.0|  66587.43558331935|
|     cmp_plz|5736289.0|0.00552866147434343|
|cmp_lname_c1|5749132.0| 0.3156278193080383|
|cmp_lname_c2|   2464.0| 0.3184128315317443|
|     cmp_sex|5749132.0|  0.955001381078048|
|      cmp_bm|5748337.0|0.48885529849763504|
|cmp_fname_c2| 103698.0| 0.9000176718903189|
|cmp_fname_c1|5748125.0| 0.7129024704437266|
|        id_1|5749132.0|  33324.48559643438|
|      cmp_bd|5748337.0|0.22446526708507172|
|      cmp_by|5748337.0| 0.2227485966810923|
+------------+---------+-------------------+



wideDF: org.apache.spark.sql.DataFrame = [field: string, count: double ... 4 more fields]


이렇게 통계 데이터프레임에서 긴 데이터프레임, 그리고 이후 넓은 데이터프레임으로 진행되는 과정을 하나의 함수로 정의합니다.

In [27]:
import org.apache.spark.sql.DataFrame

def pivotSummary(desc: DataFrame): DataFrame = {
    val schema = desc.schema
    
    val lf = desc.flatMap(row => {
        val metric = row.getString(0)
        (1 until row.size).map(i =>{
            (metric, schema(i).name, row.getString(i).toDouble)
        })
    }).toDF("metric", "field", "value")
    
    lf.groupBy('field)
      .pivot('metric, Seq("count", "mean", "stddev", "min", "max"))
      .agg(max('value))
}

import org.apache.spark.sql.DataFrame
pivotSummary: (desc: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


이를 `pivotSymmary`라고 정의했으면, 앞서 `is_match` 값에 따라서 나눈 두 개의 데이터프레임에 이를 적용해봅니다.

In [28]:
val matchSummaryT = pivotSummary(matchSummary)
val missSummaryT = pivotSummary(missSummary)

matchSummaryT: org.apache.spark.sql.DataFrame = [field: string, count: double ... 4 more fields]
missSummaryT: org.apache.spark.sql.DataFrame = [field: string, count: double ... 4 more fields]


그 결과, 아래와 같이 두 개의 데이터프레임에 대한 통계 값들을 확인해볼 수 있습니다. 맨 처음 데이터프레임과 비교해볼 때, 행과 열이 완전히 뒤바뀐 것을 알 수 있습니다. 열이 많아지면 pivot 결과는 아래로 길어지게 되므로 보기가 더욱 수월해집니다.

In [29]:
matchSummaryT.show
missSummaryT.show

+------------+-------+------------------+--------------------+---+-------+
|       field|  count|              mean|              stddev|min|    max|
+------------+-------+------------------+--------------------+---+-------+
|        id_2|20931.0| 51259.95939037791|   24345.73345377519|6.0|99996.0|
|     cmp_plz|20902.0|0.9584250310975027| 0.19962063345931919|0.0|    1.0|
|cmp_lname_c1|20931.0|0.9970152595958817|0.043118807533945126|0.0|    1.0|
|cmp_lname_c2|  475.0| 0.969370167843852| 0.15345280740388917|0.0|    1.0|
|     cmp_sex|20931.0| 0.987291577086618| 0.11201570591216435|0.0|    1.0|
|      cmp_bm|20925.0|0.9979450418160095|0.045286127452170664|0.0|    1.0|
|cmp_fname_c2| 1333.0|0.9898900320318176| 0.08251973727615237|0.0|    1.0|
|cmp_fname_c1|20922.0|0.9973163859635038| 0.03650667584833679|0.0|    1.0|
|        id_1|20931.0| 34575.72117911232|   21950.31285196913|5.0|99946.0|
|      cmp_bd|20925.0|0.9970848267622461| 0.05391487659807981|0.0|    1.0|
|      cmp_by|20925.0|0.9

## 데이터 모델링

두 데이터프레임을 join하여 각 열에 대한 평균값의 차이를 비교해보면 아래와 같습니다. 평균값이 별 차이가 없으면 `is_match` 값에 큰 영향을 주지 않는다고 판단해볼 수 있습니다. 즉, 중요하지 않은 열이라는 것입니다.

In [30]:
matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")

spark.sql("""
    SELECT 
        a.field, 
        a.count + b.count total, 
        a.mean - b.mean delta
    FROM 
        match_desc a 
    INNER JOIN 
        miss_desc b 
    ON 
        a.field = b.field
    WHERE
        a.field NOT IN ("id_1", "id_2")
    ORDER BY delta DESC, total DESC
""").show

+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926266|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482594513|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057459947|
|cmp_fname_c2| 103698.0| 0.09104268062280174|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+



반대로 평균값의 차이가 매우 큰 값들은 `is_match` 값을 추론하는 것에 있어서 선별력이 큰 값이라고 볼 수 있습니다. 데이터프레임에서 `null` 값이 들어있는 경우가 많기 때문에, 이를 스칼라(Scala)의 `Option`을 이용하여 만약 값이 `null`이면 기본 값을 제공할 수 있도록 타입을 지정합니다. 기존 데이터프레임을 아래의 Case 클래스로 매칭시켜서 별도의 데이터 셋을 생성합니다.

In [31]:
case class MatchData(
    id_1: Int,
    id_2: Int,
    cmp_fname_c1: Option[Double],
    cmp_fname_c2: Option[Double],
    cmp_lname_c1: Option[Double],
    cmp_lname_c2: Option[Double],
    cmp_sex: Option[Int],
    cmp_bd: Option[Int],
    cmp_bm: Option[Int],
    cmp_by: Option[Int],
    cmp_plz: Option[Int],
    is_match: Boolean
)

defined class MatchData


최초 csv에서 읽어온 데이터 프레임을 Case 클래스로 매칭한 후에 해당 데이터셋을 출력해보면 하나의 행이 각각 `MatchData` 클래스로 구성된 데이터 셋을 얻게 됩니다.

In [32]:
val matchData = parsed.as[MatchData]
matchData.show
matchData.printSchema

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|25739|45991|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|  

matchData: org.apache.spark.sql.Dataset[MatchData] = [id_1: int, id_2: int ... 10 more fields]


여기에 간단한 데이터 모델링을 위한 스코어 클래스를 생성합니다. 스코어 클래스는 `Option[Int]` 값을 받으면 `+` 메소드를 이용해서 각각의 값을 더하되, `null`인 값이 들어오면 0으로 처리합니다.

In [33]:
case class Score(value: Double) {
    def +(oi: Option[Int]) = {
        Score(value + oi.getOrElse(0))
    }
}

defined class Score


`MatchData` 클래스를 받아서 스코어를 계산하는 메소드는 `Score` 클래스의 `+` 메소드를 이용해서 값을 계산합니다. 앞서 계산했던 `is_match` 값에 따른 두 데이터프레임간의 평균값이 차이가 컸던 열 5개를 단순 합산하여 점수를 계산합니다.

In [34]:
def scoreMatchData(md: MatchData): Double = {
    (Score(md.cmp_lname_c1.getOrElse(0.0)) + md.cmp_plz + md.cmp_by + md.cmp_bd + md.cmp_bm).value
}

scoreMatchData: (md: MatchData)Double


그리고 이렇게 정의된 `scoreMatchData` 함수를 데이터 셋에 적용합니다.

In [35]:
val scored = matchData.rdd.map{
    md => (scoreMatchData(md), md.is_match)
}.toDF("score", "is_match")

scored: org.apache.spark.sql.DataFrame = [score: double, is_match: boolean]


적용된 결과를 살펴보면, 모델링 score와 실제 `is_match` 값이 나타납니다.

In [36]:
scored.show

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows



## 모델 평가

이제 이렇게 추론한 값과 실제 값을 비교하여 단순 스코어 합산을 이용한 모델을 평가해보도록 하겠습니다. 모델을 이용한 추론 값과 실제 값인 `true`, `false`에 대한 교차 탭 (Crosstab)을 생성합니다. 특정 점수를 오프셋(Offset)으로 잡을 수 있도록 합니다.

In [37]:
def crossTabs(scored: DataFrame, t: Double): DataFrame = {
    scored.selectExpr(s"score >= $t as above", "is_match")
        .groupBy("above")
        .pivot("is_match", Seq("true", "false"))
        .count()
}

crossTabs: (scored: org.apache.spark.sql.DataFrame, t: Double)org.apache.spark.sql.DataFrame


점수 오프셋에 따라서 score가 4점 이상인 경우를 `true`로 판단하면 `true`로 예측했으나 실제 `false`인 값이 637건 존재하게 됩니다. 그 반대의 경우도 60건이 존재합니다. 전체가 570만건이 넘으니까 어느 정도 정확하다고 판단이 듭니다. 여기에서 offset을 낮추면 더 안좋은 결과를 얻게 됩니다.

In [38]:
crossTabs(scored, 4.0).show
crossTabs(scored, 2.0).show

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| null|5131787|
+-----+-----+-------+



지금까지 기본적인 스파크의 사용 방법과 분석 방법, 데이터 모델링과 추론에 대해서 살펴보았습니다. 단순한 더하기 모델링이지만 훌륭한 결과를 보여주고 있습니다. 더 정확한 추론을 위해서는 데이터 모델을 보다 정확하게 설계할 필요가 있습니다. 또한 적절한 오프셋을 구하는 것도 중요할 것입니다.

아래와 같이 열을 하나 더 추가하고, `cmp_plz` 열의 값에 대해서 약간의 낮은 보정치를 가함으로써 보다 정확한 추론이 가능한 모델을 얻을 수 있습니다.

In [39]:
val scored2 = parsed.na.fill(0)
  .withColumn("score", 'cmp_plz * lit(0.95) + 'cmp_by + 'cmp_bd + 'cmp_lname_c1 + 'cmp_bm + 'cmp_fname_c1)

scored2: org.apache.spark.sql.DataFrame = [id_1: int, id_2: int ... 11 more fields]


In [40]:
crossTabs(scored2, 4.81).show

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20869|     13|
|false|   62|5728188|
+-----+-----+-------+

