In [192]:
import numpy as np

np.array([1.0, 2.1, 3])

array([1. , 2.1, 3. ])

# 데이터 변환

* Last updated 20201025SUN1103 20191128_20181009_20170421_20161125

## S.1 학습내용

### S.1.1 목표

* 정량데이터, 텍스트데이터에 대하여 ETL을 할 수 있다.
* RDD를 변환할 수 있다.
* DataFrame 구조적 데이터를 변환할 수 있다.

## S.3 데이터 변환

Spark를 단순히 데이터를 읽고, groupBy, 평균, 합계를 계산하는 용도로 사용하기에는 충분하지 않다.
데이터로부터 좀 더 의미있는 신호를 읽기 위해서는 예측, 분류, 군집화, 추천 등 기계학습을 빼놓을 수 없다.
이러한 기계학습은 무작정 데이터를 우겨 넣고, 의미있는 결과가 나오기를 기다려서는 작동되지 않는다.
입력될 데이터를 정련하고, 가공하고, 일정한 형식을 가지도록 구성하는 것이 필요하다. 

**ETL** (Extract, Transform, Load)은 소스에서 필요한 데이터를 추출, 변환하고, 다른 타켓으로 로딩하는 것으로 말한다.
* **추출**은 원천에서 데이터를 가져오는 것으로, spark에서는 예를 들어 csv, NoSQL 등에서 데이터를 읽어서 RDD, DataFrame을 생성하는 작업을 말한다.
* **변환**은 분석 가능한 형식으로 변환하는 것으로, 결측 값이나 이상값을 제거하거나 map() 함수를 사용하거나 데이터타입 변환을 하는 등의 작업을 한다.
* **로딩**은 변환한 데이터를 저장하여 놓는 것으로 Spark에서는 RDD, DataFrame의 형식으로 만들어 놓는다.
지도학습 Supervised Learning을 하려면, DataFrame은 label, features 컬럼을, 
**RDD는 label과 features를 가지고 있는 Labeled Point로 구성**해야 한다.

데이터 잘 정렬하고 일정한 형식을 갖추도록 해야한다.

### ETL  => 데이터를 추출하고 변환하고 로딩한다. 
### 데이터 형식을 label과 features를 가지고 있게끔 만들어야한다. 


In [3]:
import iplantuml

ModuleNotFoundError: No module named 'iplantuml'

Spark 초기에는 RDD를 사용하여 데이터를 변환하고, mllib 라이브러리를 사용하여 기계학습을 하였다.
그 후 DataFrame이 소개되고 나서, ml 라이브러리가 사용되고 있다.
Spark 2.0부터 mllib는 유지보수로 지원된다고 한다. 오류가 있으면 수정되지만, 새로운 기능이 추가되지는 않고 있다.
반면에 ml 라이브러리는 새로운 기능이 추가되고 있다.

따라서 Spark에서는 **```RDD mllib```** , **```DataFrame ml```** 패키지 별로 데이터 타입이나 모델이 제공되므로, 식별하여 사용한다.
```ml``` 패키지를 사용할 경우에는 자신의 ```pyspark.ml.linalg.Vector``` 등을 사용해야 한다. ```mllib```도 마찬가지이다.

패키지 | 설명 | 데이터 타입 예
-------|-------|-----
```mllib``` | RDD API를 제공 | ```pyspark.mllib.linalg.Vector``` 또는 ```pyspark.mllib.linalg.Matrix```
```ml``` | DataFrame API를 제공 | ```pyspark.ml.linalg.Vector``` 또는 ```pyspark.ml.linalg.Matrix```


Spark에서는 RDD mllib , 
DataFrame ml 패키지 별로

## S.4 RDD 변환

 label과 속성 features 이 묶인 구조

### S.4.1 vectors

행렬 **Vector**는 **dense**와 **sparse**로 구분할 수 있다.

* dense vector는 빈 값이 별로 없이 **모든** 행열이 값을 가지고 있다.
* sparse vector는 빈 값이 많아서, 값이 있는 경우 그 값이 있는 **인덱스**로 표현해 배열을 축약하게 된다.

#### Dense Vectors 밀집벡터

벡터는 일련의 수로 구성이 되고, 행벡터 또는 열벡터가 될 수 있다. 채워지는 값이 대부분 0이면 희소벡터 Sparse Vectors로 만들어 질 수 있다.

In [4]:
import numpy as np

In [6]:
dv = np.array([1.0, 2.1, 3])
#위에 numpy로 dense vector 만듬 


In [8]:
from pyspark.mllib.linalg import Vectors

dv1mllib=Vectors.dense([1.0, 2.1, 3])
#mylib에서 dense vector만듬 

In [9]:
print ("Dense vector: {}\nType: {}".format(dv1mllib, type(dv1mllib)))

Dense vector: [1.0,2.1,3.0]
Type: <class 'pyspark.mllib.linalg.DenseVector'>


In [12]:
from pyspark.ml.linalg import Vectors
#ml에서 만든 dense vectors
dv1ml=Vectors.dense([1.0, 2.1, 3])

In [13]:
print ("ml의 dense vector: {}".format(dv1ml))

ml의 dense vector: [1.0,2.1,3.0]


dense vectors는 numpy array와 같은 특징을 가진다.

인덱스로 값을 읽을 수 있다. 또한 반복문에서 사용할 수 있다.

In [15]:
for e in dv1mllib:
    print (e, end = " ")
    #공백으로 연결

1.0 2.1 3.0 

In [17]:
dv1mllib.dot(dv1mllib)
#dot연산 


14.41

In [18]:
np.dot(dv,dv)

14.41

In [20]:
dv1mllib*dv1mllib
#vector연산은 많이 하지 않는다. 

DenseVector([1.0, 4.41, 9.0])

#### Sparse Vectors 희소행렬

행렬에는 0 값이 많이 존재하기 때문에, 0값이 아닌 **NZ Nonzero**만 저장하면 훨씬 효율적이다.
**sparse**는 실제 **값이 없는 요소, '0'을 제거**하여 만든 vector이다.
Spark에서 type field (1 바이트 길이)를 통해 식별한다 (0: sparse, 1: dense)

예를 들어, 다음은 1차원 dense vector이다.
```python
[160, 69, 0, 0, 24]
```

sparse vectors는 값 중에 0이 포함된 경우 이를 생략하고, 값이 있는 요소 Nonzero만 남기게 된다.
3은 컬럼 갯수, 0, 1, 4는 값이 있는 컬럼, [160.0, 69.0, 24.0]는 실제 값을 의미한다.

In [22]:
sv1 = Vectors.sparse(5,[0,1,4],[160.0,69.0,24.0])
#0값을 없애준다. 

어느 모듈, mllib 또는 ml인지 확인해보자.

In [23]:
type(sv1)

pyspark.ml.linalg.SparseVector

```toArray()``` 함수를 사용하면 sparse에서 dense로 벡터를 변환할 수 있다.

### S.4.2 Matrix

로컬 Matrix 역시 밀집 dense, 희소 sparse형식을 지원한다.

In [26]:
from pyspark.mllib.linalg import Matrices

Matrices.dense(3, 2, [1,2,3,4,5,6])
#3,2니까 6개 요소가 있다. 

DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)

#### Sparse Vectors

```python
[1 0 2]
[0 0 3]
[4 5 6]
```

위와 같은 2차원 dense vectors를 sparse vectors의 배열 방식으로 표현해보자.
우선 다음과 같이
행, 열, 값 vector를 만든다.

```python
행 | 0 | 0 | 1 | 2 | 2 | 2
열 | 0 | 2 | 2 | 0 | 1 | 2
값 | 1 | 2 | 3 | 4 | 5 | 6
```

행을 보면 0번째에 '1','2' 1번째에 '3', 2번째에 '4','5','6'이므로 **0,0,1,2,2,2**
열을 보면 0번째에 '1', 2번째 '2','3', 0번째 '4', 1번째 '5', 2번째 '6'이므로 **0,2,2,0,1,2**

**행, 열, 데이터를 한 쌍**으로 읽으면 된다.
즉 행 0, 열 0의 위치에 1, 행 0, 열 2의 위치에 2. 이런 식으로 6개의 데이터가 있다.


In [28]:
import numpy as np

row = np.array([0, 0, 1, 2, 2, 2])
col = np.array([0, 2, 2, 0, 1, 2])
data = np.array([1, 2, 3, 4, 5, 6])

In [29]:
import scipy.sparse as sps

mtx = sps.csc_matrix((data, (row, col)), shape=(3, 3))

In [30]:
print (mtx.todense())

[[1 0 2]
 [0 0 3]
 [4 5 6]]


#### Sparse Vectors의 CSR (Compressed Sparse Row) 또는 Yale Format

다음 5개의 값으로 표현된다.
* 첫째 행 개수 (```int```)
* 둘째 열 개수 (```int```)
* 세째 ```int[]```은 **열 포인터** (IA):
    * IA[0]=**0** 시작 값은 0으로, IA[i]=IA[i-1] + (i-1)열의 **NNZ** (NZ 개수, Num of NonZeroes)
* 네째 ```int[]```은 **행 인덱스** (JA): 각 NZ의 행 인덱스
* 마지막은 소수 (```double```)로 실제 값 리스트

In [31]:
from pyspark.mllib.linalg import Matrices
#6개의 행에 4개열 
dm = Matrices.dense(6,4,[1, 2, 0, 0, 0, 0, 0, 3, 0, 4, 0, 0, 0, 0, 5, 6, 7, 0, 0, 0, 0, 0, 0, 8])

In [32]:
dm.toArray()
#첫번쨰 1,2 
#두번쨰 3,4 
#세번째 5,6,7
#마지막 8

array([[1., 0., 0., 0.],
       [2., 3., 0., 0.],
       [0., 0., 5., 0.],
       [0., 4., 6., 0.],
       [0., 0., 7., 0.],
       [0., 0., 0., 8.]])

다음을 희소행렬로 변환해보자.

```python
[ 1.,  0.,  0.,  0.]
[ 2.,  3.,  0.,  0.]
[ 0.,  0.,  5.,  0.]
[ 0.,  4.,  6.,  0.]
[ 0.,  0.,  7.,  0.]
[ 0.,  0.,  0.,  8.]
```

* 6은 행 갯수
* 4는 열 갯수
* 다음은 열포인터 [0, 2, 4, 7, 8]
    * **열**로 세어서 **요소의 개수** 2, 2, 3, 1을 가지고 구성을 한다. 즉 열로 요소가 2, 2, 3, 1개 이다.
    * **0**:IA[0]=0으로 시작, **2**:IA[1]=IA[0]+2, **4**:IA[2]=IA[1]+2 **7**:IA[3]=IA[2]+3, **8**:IA[4]=IA[3]+1
* 다음은 행인덱스 JA [0, 1, 1, 3, 2, 3, 4, 5]
    * **행**으로 세어서 (단 컬럼순을 지켜서) 0 1 1 3 2 3 4 5 (0 1 1 2 3 3 4 5가 아니라)
    * 1:**0**행, 2:**1**행, 3:**1**행, 4:**3**행, 5:**2**행, 6:**3**행, 7:**4**행, 8:**5**행
* 마지막은 소수로 실제 값 리스트 [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

In [33]:
dm.toSparse()
#첫번째 열로센다.열의 인덱스 
#두번째 배열 행으로 센다 행의 인덱스  .하지만 행의 순서로, 순서대로
#마지막은 배열안의 값들 리스트로


SparseMatrix(6, 4, [0, 2, 4, 7, 8], [0, 1, 1, 3, 2, 3, 4, 5], [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], False)

### S.4.2 분산 Matrix

배열은 n차원을 가질 수 있고, 2차원인 경우에는 매트릭스라고 지칭한다.
매트릭스 역시 로컬과 분산으로 구분할 수 있다.
앞서 로컬 매트릭스는 라이브러리 명에서 눈치를 챌 수 있는데, ```pyspark.mllib.linalg.Matrix, Matrices```를 사용한다. 반면에 분산 매트릭스는 당연히 여러 노드에 분산해서 사용할 수 있고, ```pyspark.mllib.linalg.distributed```에 존재하는 Row Matrix, Indexed Row Matrix, Coordinate Matrix, Block Matrix를 사용하면 된다.

여러기계에 partition으로 나눠서 존재한다. 

local 매트릭스 -> 하나의 기계에 존재

분산 매트릯스 -> 여러 기계에 partition으로 나눠서 존재

spark -> 분산에서 사용

pandas -> 분산아니다.

#### Row Matrix

RowMatrix는 ```pyspark.mllib.linalg.distributed```에서 제공되는 분산벡터로서, RDD vectors로부터 생성된다.
우선 리스트에서 RDD를 생성하고 이를 RowMatrix에 넘겨주자.

In [34]:
p = [[1.0,2.0,3.0],[1.1,2.1,3.1],[1.2,2.2,3.3]]

In [36]:
import pyspark

myConf= pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
        .builder\
        .master("local")\
        .appName("myApp")\
        .config(conf=myConf)\
        .getOrCreate()

In [37]:
my=spark.sparkContext.parallelize(p)

In [38]:
my.collect()

[[1.0, 2.0, 3.0], [1.1, 2.1, 3.1], [1.2, 2.2, 3.3]]

In [39]:
from pyspark.mllib.linalg.distributed import RowMatrix
#row matrix -> distributed 안에서 가져온다. 
rm=RowMatrix(my)

In [40]:
print (type(rm))

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>


In [41]:
rm.rows.collect()

[DenseVector([1.0, 2.0, 3.0]),
 DenseVector([1.1, 2.1, 3.1]),
 DenseVector([1.2, 2.2, 3.3])]

In [44]:
rm.numRows()
#2차원 배열은 배열인데 분산에서 쓿수 있는 배열은 row matrix

3

In [43]:
rm.numCols()

3

#### Indexed Row Matrix

앞서 Row Matrix과 유사하지만, 파티션으로 나누어, 그러나 순서를 지켜서 저장이 된다.
따라서 시계열 데이터와 같이 순서가 있는 데이터를 저장하기에 적합하다.

순서인덱스와 벡터로 구성한다.

In [46]:
from pyspark.mllib.linalg.distributed import IndexedRow
#괄호를 열고 두개의 요소로 
# 1,2,3,4,5 -> 일렬번호다. 
irRdd = spark.sparkContext.parallelize([
    IndexedRow(1, [3, 1, 2]),
    IndexedRow(2, [1, 3, 2]),
    IndexedRow(3, [5, 4, 3]),
    IndexedRow(4, [6, 7, 4]),
    IndexedRow(5, [8, 9, 2]),
])

In [47]:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix

irm = IndexedRowMatrix(irRdd)

In [48]:
print(irm.numRows())
print(irm.numCols())

6
3


### S.4.4 Labeled Point

Labeled point는 로컬벡터로 레이블을 가지고 있는 밀집 또는 희소 행렬을 말한다.
레이블이 있으므로, supervised learning에 요구되는 형식이다.
레이블은 double형식으로 저장되어야 한다. 분류에 사용되려면 예를 들어 긍정, 부정인 경우 정수 1, 0으로 하지 않고 double 형식으로 저장되어야 한다.

#### label, features로 구성

**분류** 및 **회귀분석**에 사용되는 데이터 타잎이다.
**'label'**과 **'features'**로 구성된다.

구분 | 지도학습을 하기 위한 label과 features의 구성
-----|-----
label | supervised learning에서 '구분 값'으로 사용한다. 데이터타입은 'DoubleType'으로 설정되어야 한다.
features | **sparse**, **dense** 모두 사용할 수 있다.


In [49]:
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, [1.0, 2.0, 3.0])

LabeledPoint(1.0, [1.0,2.0,3.0])

In [52]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

#개수, 실제 값이 있는 희소행렬 
#1992는 레이블 

LabeledPoint(1992, Vectors.sparse(10, {0: 3.0, 1:5.5, 2: 10.0}))

LabeledPoint(1992.0, (10,[0,1,2],[3.0,5.5,10.0]))

서로 다른 패키지의 데이터타잎 **```mllib LabeledPoint```**와 **```ml Vectors```**를 혼용하면, 형변환 오류가 발생한다.
이러한 오류는 패키지를 혼용하지 않으면 된다.

```python
Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector
```

**```dv1mllib```**은 앞서 **```mllib```**로부터 생성된 dense vector이다.

In [53]:
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, dv1mllib)

LabeledPoint(1.0, [1.0,2.1,3.0])

In [54]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

LabeledPoint(1.0, Vectors.fromML(dv1ml))

LabeledPoint(1.0, [1.0,2.1,3.0])

 ## 문제 S-1: RDD 데이터를 LabeledPoint로 변환하기

### 문제

머신러닝은 사람이 경험을 통해 배우는 것과 비슷하게 **과거 데이터로부터 학습**을 한다.
학습이란 어렵게 생각할 필요 없이, 과거 데이터에서 수학적이나 알고리즘을 활용하여 어떤 패턴을 찾아내는 것이다.
spark에서 제공한 **데이터 파일 ```data/mllib/sample_svm_data.txt```을 읽어서 훈련데이터**를 만들어 보자.

데이터를 읽어 보면, 맨 처음 값은 label에 해당하고, 다음은 일련의 수로 구성된다. 이로부터 **RDD**를 생성하고, ```label```, ```features```를 구성하여 ```Labeled Point```로 만든다.

```python
1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0
...
```

### Python으로 파일 읽기

Spark를 다운로드하고 압축을 풀어 설치한 경우, 환경변수 ```SPARK_HOME```의 경로를 설정해 주고 해당 파일을 읽도록 한다. 파일을 읽을 때는, 가급적 이와 같이 설정경로를 사용해 입력오류를 줄이도록 하는 것이 좋다.
다운로드 받지 않았다면, 아파치 github을 방문해서, https://github.com/apache/spark/ 아래 data/mllib 폴더로 가보면 해당 파일을 찾을 수 있다.

입출력은 ```try except``` 구문으로 오류에 대비할 수 있다.

In [59]:
import os
_fsvm=os.path.join('data','sample_svm_data.txt')

In [60]:
import os

try:
    _f=open(_fsvm,'r')
    _lines=_f.readlines()
    _f.close()
except:
    print("An exception occurred")

파일로부터 데이터를 **```readlines()```** 함수로 모두 읽어 온다.
첫 행을 읽으면 label, features로 구성되어 있다.

In [61]:
_lines[0]

'1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0\n'

### Spark에서 RDD 생성

Spark는 파일을 Python을 통하지 않고, 직접 읽을 수 있다.
원본 데이터 ```sample_svm_data.txt```는 공백으로 구분되어 있다.
읽을 대상이 파일이므로, RDD를 사용한다. 각 행을 공백으로 분리하여 읽는다.

In [63]:
#_fsvm은 파일의 경로
_rdd=spark.sparkContext.textFile(_fsvm)\
    .map(lambda line: [float(x) for x in line.split()])

In [64]:
_rdd.collect()
#map함수로 , 자연스럽게 분리.
#index를 두개 사용해서 읽어야한다. 


[[1.0,
  0.0,
  2.52078447201548,
  0.0,
  0.0,
  0.0,
  2.004684436494304,
  2.000347299268466,
  0.0,
  2.228387042742021,
  2.228387042742023,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [0.0,
  2.857738033247042,
  0.0,
  0.0,
  2.619965104088255,
  0.0,
  2.004684436494304,
  2.000347299268466,
  0.0,
  2.228387042742021,
  2.228387042742023,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [0.0,
  2.857738033247042,
  0.0,
  2.061393766919624,
  0.0,
  0.0,
  2.004684436494304,
  0.0,
  0.0,
  2.228387042742021,
  2.228387042742023,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [1.0,
  0.0,
  0.0,
  2.061393766919624,
  2.619965104088255,
  0.0,
  2.004684436494304,
  2.000347299268466,
  0.0,
  0.0,
  0.0,
  0.0,
  2.055002875864414,
  0.0,
  0.0,
  0.0,
  0.0],
 [1.0,
  2.857738033247042,
  0.0,
  2.061393766919624,
  2.619965104088255,
  0.0,
  2.004684436494304,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  2.055002875864414,
  0.0,
  0.0,
  0.0,
  0.0],
 [0.0,
  2.857738033247042,
  0.0,
  

In [66]:
#index를 두개 사용해서 읽어야한다
_rdd.take(2)[0]

[1.0,
 0.0,
 2.52078447201548,
 0.0,
 0.0,
 0.0,
 2.004684436494304,
 2.000347299268466,
 0.0,
 2.228387042742021,
 2.228387042742023,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0]

### LabeledPoint 생성

위 데이터에서 보듯이 첫 열은 **label**로, 그 나머지는 **features**로 생성한다.

In [67]:
from pyspark.mllib.regression import LabeledPoint
#[1:] -> 뜻: 1이후 라는뜻 

_trainRdd0=_rdd.map(lambda line:LabeledPoint(line[0], line[1:]))

In [69]:
_trainRdd0.take(1)
#레이블 분리후 features로 생성 


[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

In [71]:
#한꺼번에 처리 아래 방식은 
_trainRdd=spark.sparkContext.textFile(_fsvm)\
    .map(lambda line: [float(x) for x in line.split()])\
    .map(lambda p:LabeledPoint(p[0], p[1:]))

In [72]:
_trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

### 정리하면

데이터를 변환하는 과정을 함수로 만들었다.
```createLP(line)```는 행 데이터를 받아서 LabeledPoint로 생성하고 있다.

In [73]:
def createLP(line):
    p = [float(x) for x in line.split()]
    return LabeledPoint(p[0], p[1:])

_rdd=spark.sparkContext.textFile(_fp)
trainRdd = _rdd.map(createLP)

trainRdd.take(1)

NameError: name '_fp' is not defined

### S.4.5 svm의 입력파일 형식

LIBSVM은 기계학습 모델인 svm을 위한 입력데이터 형식이다.
0은 label, 나머지는 index:value 쌍으로 구성한다.

```python
[label] [index1]:[value1] [index2]:[value2] ...
[label] [index1]:[value1] [index2]:[value2] ...
```

* 예
```python
0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 ...
```


In [82]:
import os
fsvm=os.path.join('data','sample_libsvm_data.txt')

In [83]:
dfsvm = spark.read.format("libsvm").load(fsvm)

In [84]:
dfsvm.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [85]:
dfsvm.take(1)

[Row(label=0.0, features=SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0

### MLUtils를 사용하여 RDD 읽기

또는 ```MLUtils.loadLibSVMFile```로 읽을 수 있다.

In [86]:
from pyspark.mllib.util import MLUtils

data = MLUtils.loadLibSVMFile(spark.sparkContext, fsvm)

In [87]:
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)

In [88]:
label.take(5)

[0.0, 1.0, 1.0, 1.0, 1.0]

In [89]:
features.take(1)

[SparseVector(692, {127: 51.0, 128: 159.0, 129: 253.0, 130: 159.0, 131: 50.0, 154: 48.0, 155: 238.0, 156: 252.0, 157: 252.0, 158: 252.0, 159: 237.0, 181: 54.0, 182: 227.0, 183: 253.0, 184: 252.0, 185: 239.0, 186: 233.0, 187: 252.0, 188: 57.0, 189: 6.0, 207: 10.0, 208: 60.0, 209: 224.0, 210: 252.0, 211: 253.0, 212: 252.0, 213: 202.0, 214: 84.0, 215: 252.0, 216: 253.0, 217: 122.0, 235: 163.0, 236: 252.0, 237: 252.0, 238: 252.0, 239: 253.0, 240: 252.0, 241: 252.0, 242: 96.0, 243: 189.0, 244: 253.0, 245: 167.0, 262: 51.0, 263: 238.0, 264: 253.0, 265: 253.0, 266: 190.0, 267: 114.0, 268: 253.0, 269: 228.0, 270: 47.0, 271: 79.0, 272: 255.0, 273: 168.0, 289: 48.0, 290: 238.0, 291: 252.0, 292: 252.0, 293: 179.0, 294: 12.0, 295: 75.0, 296: 121.0, 297: 21.0, 300: 253.0, 301: 243.0, 302: 50.0, 316: 38.0, 317: 165.0, 318: 253.0, 319: 233.0, 320: 208.0, 321: 84.0, 328: 253.0, 329: 252.0, 330: 165.0, 343: 7.0, 344: 178.0, 345: 252.0, 346: 240.0, 347: 71.0, 348: 19.0, 349: 28.0, 356: 253.0, 357: 252.0

### S.4.6 TF

지금까지는 정량데이터를 다루었지만, **텍스트**를 변환해보자.
TF (Term Frequency)
단어빈도를 계산하기 위해 HashingTF를 사용할 수 있다.
단어ID로 Hash 알고리즘에 따라 무작위 번호를 생성하고, 단어빈도를 생성한다.
Hash를 사용하지 않고 계산한 단어빈도는 당연히 동일하다는 것을 알 수 있다.

hashing -> 랜덤으로 생성되는 아이디

자동으로 알고리즘에 따라 랜덤으로 번호 만들어서 각각에 아이디를 부여한다. 

그렇게 빈도를 만들어준게 hashing TF


In [90]:
wikiRdd3 = spark.sparkContext\
    .textFile(os.path.join("data","ds_spark_wiki.txt"))\
    .map(lambda line: line.split())

In [93]:
from pyspark.mllib.feature import HashingTF

hashingTF = HashingTF()
#transform-> 라이브러리 만들어 놓고 rdd를 입력으로 사용한다. 

tf = hashingTF.transform(wikiRdd3)
tf.collect()
# 한줄에 vector가 연결되어있다. 
# 아이디는 자동으로 생성 1026674번째 컬럼에  빈도 가 1.0이고 
# 빈도 전체의 개수는 1048576이다 

[SparseVector(1048576, {1026674: 1.0}),
 SparseVector(1048576, {148618: 1.0, 183975: 1.0, 216207: 1.0, 261052: 1.0, 617454: 1.0, 696349: 1.0, 721336: 1.0, 816618: 1.0, 897662: 1.0}),
 SparseVector(1048576, {60386: 1.0, 177421: 1.0, 568609: 1.0, 569458: 1.0, 847171: 1.0, 850510: 1.0, 1040679: 1.0}),
 SparseVector(1048576, {261052: 4.0, 816618: 4.0}),
 SparseVector(1048576, {60386: 4.0, 594754: 4.0}),
 SparseVector(1048576, {21980: 1.0, 70882: 1.0, 274690: 1.0, 357784: 1.0, 549790: 1.0, 597434: 1.0, 804583: 1.0, 829803: 1.0, 935701: 1.0}),
 SparseVector(1048576, {154253: 1.0, 261052: 1.0, 438276: 1.0, 460085: 1.0, 585459: 1.0, 664288: 1.0, 816618: 1.0, 935701: 2.0, 948143: 1.0, 1017889: 1.0}),
 SparseVector(1048576, {270017: 1.0, 472985: 1.0, 511771: 1.0, 718483: 1.0, 820917: 1.0}),
 SparseVector(1048576, {34116: 1.0, 87407: 1.0, 276491: 1.0, 348943: 1.0, 482882: 1.0, 549350: 1.0, 721336: 1.0, 816618: 1.0, 1025622: 1.0}),
 SparseVector(1048576, {1769: 1.0, 151357: 1.0, 500659: 1.0, 54776

### S.4.7 TF-IDF

IDF는 전체에서 몇 개의 문서에 씌였는지를 반대로 계산한 값이다.
뒤 DataFrame를 사용하여 TF-IDF를 계산하면서 자세히 설명하기로 한다.

In [97]:
from pyspark.mllib.feature import HashingTF, IDF
#IDF함수 사용
#tf를 넣어 IDF계산해준다. 
idf = IDF().fit(tf)
# 그리고 transform해줘야한다. 
tfidf = idf.transform(tf)

In [98]:
tfidf.collect()


[SparseVector(1048576, {1026674: 1.7047}),
 SparseVector(1048576, {148618: 1.7047, 183975: 1.7047, 216207: 1.7047, 261052: 1.0116, 617454: 1.7047, 696349: 1.7047, 721336: 1.2993, 816618: 0.7885, 897662: 1.7047}),
 SparseVector(1048576, {60386: 1.2993, 177421: 1.7047, 568609: 1.7047, 569458: 1.7047, 847171: 1.7047, 850510: 1.7047, 1040679: 1.7047}),
 SparseVector(1048576, {261052: 4.0464, 816618: 3.1538}),
 SparseVector(1048576, {60386: 5.1971, 594754: 6.819}),
 SparseVector(1048576, {21980: 1.7047, 70882: 1.7047, 274690: 1.7047, 357784: 1.7047, 549790: 1.7047, 597434: 1.7047, 804583: 1.7047, 829803: 1.7047, 935701: 1.2993}),
 SparseVector(1048576, {154253: 1.7047, 261052: 1.0116, 438276: 1.7047, 460085: 1.7047, 585459: 1.7047, 664288: 1.7047, 816618: 0.7885, 935701: 2.5986, 948143: 1.7047, 1017889: 1.7047}),
 SparseVector(1048576, {270017: 1.7047, 472985: 1.7047, 511771: 1.7047, 718483: 1.7047, 820917: 1.7047}),
 SparseVector(1048576, {34116: 1.7047, 87407: 1.7047, 276491: 1.7047, 3489

하는 이유 -> rdd를 훈련데이터 형식으로 바꾸기 위ㅐ해서 

### S.4.8 StandardScaler

데이터를 표준화하려면 1) 평균과 표준편차를 계산하고, 2) 측정값에서 평균을 빼고, 표준편차로 나누어 주면 된다.
즉 zscore를 계산하는 것과 같다.

$$ z = \frac {\bar{x_n} - \mu} {\sigma / \sqrt{n}} $$


In [100]:
tRdd = spark.sparkContext\
    .textFile(os.path.join('data', 'ds_spark_heightweight.txt'))

In [101]:
tRdd.map(lambda x: x.split('\t')).take(1)

[['1', '65.78', '112.99']]

In [102]:
tRdd.map(lambda x: x.split('\t')).map(lambda x: [str(x[0]), float(x[1]), float(x[2])]).take(1)

[['1', 65.78, 112.99]]

In [103]:
tRdd.map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .take(1)

[['1', 65.78, 112.99]]

2개의 값만 추출하여 dense vectors에 별도로 저장한다ㅁ

In [105]:
from pyspark.mllib.linalg import Vectors

_tRdd =tRdd\
    .map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .map(lambda x: Vectors.dense([x[1], x[2]]))

#첫번쨰껏은 버린다. 65.78과 112.99만 끌어온다. 

리스트로 저장해도 계산에 문제가 없다.

In [106]:
from pyspark.mllib.linalg import Vectors

_tRdd =tRdd\
    .map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .map(lambda x: [x[1], x[2]])

#### 표준화

In [108]:
from pyspark.mllib.feature import StandardScaler
#fit할때 -> 평균 표준편차를 사용하지 않고 
scaler1 = StandardScaler().fit(_tRdd)
#평균 표준편차 적용해서 사용
scaler2 = StandardScaler(withMean=True, withStd=True).fit(_tRdd)

앞서 계산한 zscore와 비교해보자.

In [109]:
scaler2.transform(_tRdd).take(5)
#간단하게  

[DenseVector([-1.2458, -1.2299]),
 DenseVector([1.9011, 0.5934]),
 DenseVector([0.7388, 1.8767]),
 DenseVector([0.0919, 1.0473]),
 DenseVector([-0.1439, 1.1993])]

## S.5 DataFrame 변환

DataFrame으로 만들어진 데이터를 변환해보자.
이러한 작업이 필요한 이유는 **기계학습에 넘겨줄 입력데이터를 형식에 맞추어야** 하기 때문이다.
데이터는 형식에 맞게 변환되고, 군집화, 회귀분석, 분류, 추천 모델 등에 입력으로 사용된다
물론 데이터는 '일련의 수' 또는 '텍스트'로 구성된다.
이런 데이터로부터 특징을 추출하여 **feature vectors**를 구성한다.
지도학습을 하는 경우에는 **class 또는 label** 값이 필요하다.


변환작업을 하는 이우는 기계학습에 넘겨줄 입력데이터를 형식에 맞춰 변환을 해야함 

그리고나서 생각해야한다. 

지도 학습을 하는 경우에는 class와 label 필요

### S.5.1 Labeled Point를 label, features 컬럼으로 분해

RDD LabeledPoint는 label과 vectors로 구성되어 있다.
따라서 LabeledPoint를 DataFrame으로 읽어오면, 2개의 컬럼으로 생성된다.
이를 label, features 컬럼으로 맞추어 주도록 하자.

#### 레이블이 있는 Python List에서 DataFrame 생성

label과 features를 가지고 가지고 있는 데이터를 생성해보자.

In [110]:
p = [[1, [1.0, 2.0, 3.0]], [1, [1.1, 2.1, 3.1]], [0, [1.2, 2.2, 3.3]]]

첫 데이터를 분해해서 출력하면, label과 features를 하나씩 묶어 가지고 있다.

In [111]:
print ("label: {}\nfeatures: {}".format(p[0][0], p[0][1]))

label: 1
features: [1.0, 2.0, 3.0]


In [112]:
trainDf=spark.createDataFrame(p)

In [113]:
trainDf.collect()

[Row(_1=1, _2=[1.0, 2.0, 3.0]),
 Row(_1=1, _2=[1.1, 2.1, 3.1]),
 Row(_1=0, _2=[1.2, 2.2, 3.3])]

#### LabeledPoint에서 DataFrame 생성

Python List를 LabeledPoint로 만들어 보자.
**LabeledPoint는 RDD에서 사용하는 구조로서 mllib 라이브러리를 사용**해서 만들고 있다.
**DataFrame은 LabeledPoint를 컬럼으로 가지고 있지 않는다**.

In [114]:
from pyspark.mllib.regression import LabeledPoint
p = [LabeledPoint(1, [1.0,2.0,3.0]),
     LabeledPoint(1, [1.1,2.1,3.1]),
     LabeledPoint(0, [1.2,2.2,3.3])]

In [115]:
trainDf=spark.createDataFrame(p)

In [117]:
trainDf.collect()
#label과 features로 자동분해해서 자동으로 생성

[Row(features=DenseVector([1.0, 2.0, 3.0]), label=1.0),
 Row(features=DenseVector([1.1, 2.1, 3.1]), label=1.0),
 Row(features=DenseVector([1.2, 2.2, 3.3]), label=0.0)]

In [118]:
from pyspark.mllib.linalg import Vectors

trainDf = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, 1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, 0.5]))], ["label", "features"])
#마지맛은 컬럼명을 준다. 

In [119]:
trainDf.collect()

[Row(label=1.0, features=DenseVector([0.0, 1.1, 0.1])),
 Row(label=0.0, features=DenseVector([2.0, 1.0, 1.0])),
 Row(label=0.0, features=DenseVector([2.0, 1.3, 1.0])),
 Row(label=1.0, features=DenseVector([0.0, 1.2, 0.5]))]

#### RDD에서 DataFrame 생성

rdd에서 DataFrame을 생성하면 labe, features이 당연히 생성이 되지 않는다.

In [120]:
#from pyspark.mllib.linalg import SparseVector # mllib ok
from pyspark.ml.linalg import SparseVector # ml ok

_rdd = spark.sparkContext.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

In [122]:
_df=_rdd.toDF()

In [123]:
_df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: vector (nullable = true)



In [124]:
_df=_df.withColumnRenamed('_1', 'label').withColumnRenamed('_2', 'features')

In [125]:
_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0| (4,[1,3],[1.0,5.5])|
|  1.0|(4,[0,2],[-1.0,0.5])|
+-----+--------------------+



### S.5.2 단어 빈도

정량 데이터는 합계, 평균, 표준편차 등 의미있는 통계량을 계산하거나, 이런 통계량이 집단 간에 차이가 있는지 분석한다.
반면에 텍스트는 정량데이터와 같이 이러한 통계량의 계산이 가능하지 못하게 된다.
텍스트에 어떤 단어가 얼마나 쓰였는지, 또한 같이 쓰이게 된 단어는 무엇인지 등 단어의 빈도에 따라 **정량화하여 과학적인 분석**을 하게 된다.

#### Bag of Words 모델

자연어처리 NLP에서 사용하는 모델로, 텍스트를 단어의 집합, **'bag of words'**으로 구성된다고 보며, 단어의 **순서**는 의미를 가지지 않는다.
이런 영화리뷰가 있다고 하자.
> "...그 영화는 매우 강렬했다. 그냥 좋았다. 영화관에서 보는 동안 긴장을 늦출 수 없었다. 갑돌이가 분장한 악당의 케릭터가 만들어지는 과정은 흥미롭지 않을 수가 없었다. 무비의 이야기 전개는 빠르고, 무엇이 진실이고 거짓인지 판단할 수 없었다. 누가 이런 영화를 좋아 하지 않을 수가 있겠는가 이모티콘..."

이를 단어로 분리하고, 정량화 해보자.

#### 텍스트 변환 단계

텍스트를 변환하는 단계를 보자. 순서는 변경될 수 있다.

* 단계 1: 단어로 분할 Tokenization
    * 그, 영화는, 매우, 강렬했다, 그냥, 좋았다, 영화관에서, 보는, 동안, 긴장을, 늦출, 수, 없었다, 갑돌이가, 분장한, 악당의, 케릭터가, 만들어지는, 과정은, 흥미롭지, 않을, 수가, 없었다, 무비의, 이야기, 전개는, 빠르고, 무엇이, 진실이고, 거짓인지, 판단할, 수, 없었다, 누가, 이런, 영화를, 좋아, 하지, 않을, 수가, 있겠는가, 이모티콘

* 단계 2: 정리
    - 불필요, 오타 등

* 단계 3: 불용어 stopwords 제거
    - 그, 수, 수가, 수, 이런, 하지, 수가 등

# stemming은 우리 수업에서 다루지 않는다
* 단계 4: 어간 추출 stemming
    영화는, 영화의는 다른 단어지만 조사를 제거하면 동일한 단어
    - 좋았다, 좋아 단어들은 어근을 판별하면 동일한 단어이다.
    - 영화, 무비의 단어는 이음동의

단계 5: 계량화
- word vector로 만든다.
- 있다-없다, 단어빈도, TF-IDF 사용할 수 있다.<br>dense, sparse 모두 가능하다.
```[1,1,1,1,1,0,0],[0,1,0,1,1,1,1]```

```python
[0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 1 1 0 0 0]
[0 0 1 0 0 0 0 0 0 0 0 1 1 1 0 0 0 0 0 0 0 0 1 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 1 1]
[1 0 0 1 0 0 1 1 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 1 0 1 1 0 0 0 1 0 0 0 1 1 1 0 1 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 1 1]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
[0 1 0 0 0 0 0 0 0 1 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 1 1 1]
```

### 컬럼에는 단어 들어간다. 

In [126]:
# Let it be lyrics
doc=[
    "When I find myself in times of trouble",
    "Mother Mary comes to me",
    "Speaking words of wisdom, let it be",
    "And in my hour of darkness",
    "She is standing right in front of me",
    "Speaking words of wisdom, let it be",
    "Let it be",
    "Let it be",
    "Let it be",
    "Let it be",
    "Whisper words of wisdom, let it be"
]

In [135]:
d={}
for sentence in doc:
    words=sentence.split()
    for word in words:
        if word in d:
            d[word]+=1
        else:
            d[word]=1

앞서 단어 빈도는 dictionary d에 저장하였다.
dictionary는 키, 빈도의 쌍으로 저장되어 있어서 ```iteritems()```으로 읽어낼 수 있다.

In [136]:
# for k,v in d.iteritems():  # python2
for k,v in d.items():
    print ("{}\t{}".format(k,v))

When	1
I	1
find	1
myself	1
in	3
times	1
of	6
trouble	1
Mother	1
Mary	1
comes	1
to	1
me	2
Speaking	2
words	3
wisdom,	3
let	3
it	7
be	7
And	1
my	1
hour	1
darkness	1
She	1
is	1
standing	1
right	1
front	1
Let	4
Whisper	1


### S.5.3 Spark의 transformer, estimator

**RDD**를 만들고 나서도 데이터를 변환하기 위해 map-reduce와 같은 함수 또는  **transform()**, **fit()**을 사용하는 것과 같이,
**DataFrame**도 역시 **Transformer**, **Estimator**를 사용할 수 있다.
이러한 Spark ml 라이브러리는 Python의 scikit-learn에서 영향을 받아 기계학습 API transformer, estimator, evaluator가 유사하다.
* Estimator는 DataFrame에 적용되는 알고리즘을 말하는 것으로, Transformer를 생성해낸다. **```Estimator.fit()```**
* Transformer는 DataFrame을 적용해서 다른 DataFrame으로 생성한다. **```Transformer.transform()```**
* Evaluator는 ```pyspark.ml.evaluation```의 'BinaryClassificationEvaluator', 'RegressionEvaluator',  'MulticlassClassificationEvaluator', 'MultilabelClassificationEvaluator', 'ClusteringEvaluator', 'RankingEvaluator' 등이 있다.

먼저 텍스트를 2차원 배열로 만들자.

In [139]:
#2ㅊㅏ원
doc2d=[
    ["When I find myself in times of trouble"],
    ["Mother Mary comes to me"],
    ["Speaking words of wisdom, let it be"],
    ["And in my hour of darkness"],
    ["She is standing right in front of me"],
    ["Speaking words of wisdom, let it be"],
    [u"우리 Let it be"],
    [u"나 Let it be"],
    [u"너 Let it be"],
    ["Let it be"],
    ["Whisper words of wisdom, let it be"]
]

In [140]:
myDf=spark.createDataFrame(doc2d, ['sent'])

In [141]:
myDf.show(truncate=True)

+--------------------+
|                sent|
+--------------------+
|When I find mysel...|
|Mother Mary comes...|
|Speaking words of...|
|And in my hour of...|
|She is standing r...|
|Speaking words of...|
|      우리 Let it be|
|        나 Let it be|
|        너 Let it be|
|           Let it be|
|Whisper words of ...|
+--------------------+



### S.5.4 Tokenizer

먼저 용어를 정리해 보자.
* corpus는 어떤 주제에 대해 쓰여지거나, 어떤 사람이 작성한 전체 '말뭉치'를 말한다. 여러 문장으로 구성된 텍스트 집합을 말한다.
* document는 문장으로 구성된 문서를 말하지만, 한 문장으로만 구성될 수도, 여러 문장으로 만들어질 수도 있다. 예를 들어, "why she had to go" 같은 한 문장도 document라고 하고, "why she had to go.. I don't know" 역시 마찬가지이다.
* vocabularay는 중복없는 단어 집합을 말하며, 예를 들면, "why","she","had","to","go","where","have" 등은 단어이다.

Tokenizer는 document를 단어로 분리한다.
분리하는 기준은 whitespace로 공백, TAB, CR, New Line 등이 해당된다.
* 입력 컬럼은 "sent"로,
* 출력 컬럼은 "words"로 한다.

토큰 -> 최소단위의 상징이다.

이런 역활을 하게 만들어준 라이브러리

텍스트 집합

In [143]:
from pyspark.ml.feature import Tokenizer
#input 방금 만들어놓은 sent
#출력은 words로 지정을한다.
tokenizer = Tokenizer(inputCol="sent", outputCol="words")

In [144]:
#변환하고자하는 myDf에 토크나이저해라.
tokDf = tokenizer.transform(myDf)

In [145]:
tokDf.show(3)

+--------------------+--------------------+
|                sent|               words|
+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|
|Mother Mary comes...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|
+--------------------+--------------------+
only showing top 3 rows



```for```문으로 출력해보자. ```Row()``` 객체로 출력된다.

In [146]:
for r in tokDf.select("sent", "words").take(3):
    print (r)

Row(sent='When I find myself in times of trouble', words=['when', 'i', 'find', 'myself', 'in', 'times', 'of', 'trouble'])
Row(sent='Mother Mary comes to me', words=['mother', 'mary', 'comes', 'to', 'me'])
Row(sent='Speaking words of wisdom, let it be', words=['speaking', 'words', 'of', 'wisdom,', 'let', 'it', 'be'])


### S.5.5 RegTokenizer

Tokenizer는 white space로 분리하지만, RegexTokenizer는 단어를 분리하기 위해 **정규표현식**을 적용할 수 있다.
정규표현식을 사용하여 분리하거나 특정 패턴을 추출할 수 있다.
공백으로 분리할 경우 간단히 정규표현식 ```\s``` 패턴을 적용할 수 있다.
한글에는 ```\w``` 패턴이 적용되지 않는다.
* ```\s```는 공백문자
* ```\w```는 숫자 및 대소문자 ```[A-Za-z0-9_]```
* 별표 ```*```는 0 또는 그 이상, 더하기 ```+```는 1 또는 그 이상을 의미한다.