In [50]:
import pyspark
myConf=pyspark.SparkConf()
spark = pyspark.sql.SparkSession.builder\
    .master("local")\
    .appName("myApp")\
    .config(conf=myConf)\
    .getOrCreate()

**ETL (Extract, Transform, Load)**은 소스에서 필요한 데이터를 추출, 변환하고, 다른 타켓으로 로딩하는 것으로 말한다.

- **추출**은 원천에서 데이터를 가져오는 것으로, spark에서는 예를 들어 csv, NoSQL 등에서 데이터를 읽어서 RDD, DataFrame을 생성하는 작업을 말한다.
- **변환** 은 분석 가능한 형식으로 변환하는 것으로, 결측 값이나 이상값을 제거하거나 map() 함수를 사용하거나 데이터타입 변환을 하는 등의 작업을 한다.
- **로딩**은 변환한 데이터를 저장하여 놓는 것으로 Spark에서는 RDD, DataFrame의 형식으로 만들어 놓는다. 지도학습 Supervised Learning을 하려면, DataFrame은 label, features 컬럼을, **RDD는 label과 features를 가지고 있는 Labeled Point로 구성** 해야 한다.

-------------------------------------------------
### [Source] -extract-> [transfrom] -load-> [target]

In [9]:
## RDD에서만 읽고 데이터를 변환하고 mlib라이브러리를 사용해서 함
## 데이터 프레임이 나오고 나오서는 ml 라이브러리가 추가가되어서 사용되고 있음
## mlib는 유지보수로 지원됨. 즉, ml라이브러리를 사용하는 것이 좋음

패키지 | 설명 | 데이터 타잎 예
-------|-------|-----
```mllib``` | RDD API | ```pyspark.mllib.linalg.Vector```, ```pyspark.mllib.linalg.Matrix```
```ml``` | DataFrame API | ```pyspark.ml.linalg.Vector```, ```pyspark.ml.linalg.Matrix```

## 1. RDD 변환

기계학습을 하기 위해서는 데이터를 일정 형식으로 만들어 주어야 한다.
**```Vector```**, **```Labeled Point```**, **```Matrix```**를 배워보자.

구분 | 설명
----------|----------
```Vector``` | ```numpy vector```와 같은 기능을 한다. **dense**와 **sparse** vector로 구분한다.
```Labeled Point``` | 분류를 의미하는 클래스 또는 **label**과 속성 **features** 이 묶인 구조로서, 지도학습 supervised learning을 할 경우 사용된다.
```Matrix``` | ```numpy matrix```와 같은 특징을 가진다.

### 1.1 RDD - vectors

행렬 **Vector**는 **dense**와 **sparse**로 구분할 수 있다.

* dense vector는 빈 값이 별로 없이 **모든** 행열이 값을 가지고 있다.
* sparse vector는 빈 값이 많아서, 값이 있는 경우 그 값이 있는 **인덱스**로 표현해 배열을 축약하게 된다.

#### Dense Vectors 밀집벡터

벡터는 일련의 수로 구성이 되고, 행벡터 또는 열벡터가 될 수 있다. 채워지는 값이 대부분 0이면 희소벡터 Sparse Vectors로 만들어 질 수 있다.

In [15]:
# 넘파이 array 를 사용하여 만든 dense vector
import numpy as np
dv = np.array([1.0, 2.1, 3])

In [16]:
# Spark 를 사용하여 만든 dense vector
# Spark에서는 vector 명령을 통해 벡터 만들기 가능 (RDD: mlib모듈, DataFrame: ml모듈 사용)
from pyspark.mllib.linalg import Vectors
dv1mllib=Vectors.dense([1.0, 2.1, 3])

print ("Dense vector: {}\n Type: {}".format(dv1mllib, type(dv1mllib)))

Dense vector: [1.0,2.1,3.0]
 Type: <class 'pyspark.mllib.linalg.DenseVector'>


In [18]:
# pyspark.ml 모듈을 사용하여 Vectors 제작
from pyspark.ml.linalg import Vectors

dv1ml=Vectors.dense([1.0, 2.1, 3])
print ("ml의 dense vector: {}".format(dv1ml))

ml의 dense vector: [1.0,2.1,3.0]


In [20]:
#dense vectors는 numpy array와 같은 특징, 
#인덱스로 값을 읽을 수 있음, 반복문에서 사용할 수 있다.
for e in dv1mllib:
    print (e, end = " ")

1.0 2.1 3.0 

보통 벡터와 같이 **product**, **dot**, **norm**과 같은 벡터 연산을 할 수도 있다.
결과 값은 numpy와 동일하다.

In [22]:
dv1mllib.dot(dv1mllib) #내적

14.41

In [23]:
np.dot(dv,dv)

14.41

In [25]:
#더하기,빼기, 곱하기, 나누기 연산도 가능 dot와 달리 항목별로 실행됨
dv1mllib*dv1mllib

DenseVector([1.0, 4.41, 9.0])

#### Sparse Vectors 희소행렬
###### 0이 있는 것을 지우는 것이 희소행렬임

행렬에는 0 값이 많이 존재하기 때문에, 0값이 아닌 **NZ Nonzero**만 저장하면 훨씬 효율적이다.
**sparse**는 실제 **값이 없는 요소, '0'을 제거**하여 만든 vector이다.
Spark에서 type field (1 바이트 길이)를 통해 식별한다 (0: sparse, 1: dense)

예를 들어, 다음은 1차원 dense vector이다.
```python
[160, 69, 0, 0, 24]
```

In [31]:
sv1 = Vectors.sparse(5,[0,1,4],[160.0,69.0,24.0]) #요소가 다섯개, 0번째, 1번째, 4번째만 값이 있는데, 그것이 뒤에 내용

In [27]:
type(sv1) #ml임을 확인할 수 있음

pyspark.ml.linalg.SparseVector

In [30]:
# Array을 사용하여 sparse에서 dense 벡터로 바꾸기

sv1.toArray()

array([160.,  69.,   0.,   0.,  24.])

### 1.2 Matrix

로컬 Matrix 역시 밀집 dense, 희소 sparse형식을 지원한다.

mlib 사용

```python
from pyspark.mllib.linalg import Matrices 
Matrices.dense(3, 2, [1,2,3,4,5,6])
```

넘파이로 행, 열, 값 나눈후 scipy.sparse 사용
```python
import numpy as np
np.array([]) #numpy로 행,열,값 분류 후 scipy사용하여 sparse matrix 만들기

import scipy.sparse as sps
mtx = sps.csc_matrix((data, (row, col)), shape=(3, 3))
mx.todense() #Dense로 변환
```
<br>

이거 사용하면 **sparse Matrix** 만들기 가능

```python
from pyspark.mllib.linalg import Matrices 
Matrices.dense()
```

#### Dense Matrix

In [33]:
from pyspark.mllib.linalg import Matrices

Matrices.dense(3, 2, [1,2,3,4,5,6]) # 3*2 matrix

DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)

#### Sparse Vectors

```python
[1 0 2]
[0 0 3]
[4 5 6]
```
위와 같은 2차원 dense vectors를 sparse vectors의 배열 방식으로 표현해보자.
우선 다음과 같이
행, 열, 값 vector를 만든다.

```python
행 | 0 | 0 | 1 | 2 | 2 | 2
열 | 0 | 2 | 2 | 0 | 1 | 2
값 | 1 | 2 | 3 | 4 | 5 | 6
```
**행, 열, 데이터를 한 쌍**으로 읽으면 된다.
즉 행 0, 열 0의 위치에 1, 행 0, 열 2의 위치에 2. 이런 식으로 6개의 데이터가 있다.


In [34]:
import numpy as np

row = np.array([0, 0, 1, 2, 2, 2])
col = np.array([0, 2, 2, 0, 1, 2])
data = np.array([1, 2, 3, 4, 5, 6])

In [35]:
import scipy.sparse as sps

mtx = sps.csc_matrix((data, (row, col)), shape=(3, 3))

In [36]:
print (mtx.todense())

[[1 0 2]
 [0 0 3]
 [4 5 6]]


#### Sparse Vectors의 CSR (Compressed Sparse Row) 또는 Yale Format
#### Sparse Vectors 변환시켜주기

다음 5개의 값으로 표현된다.
* 첫째 행 개수 (```int```)
* 둘째 열 개수 (```int```)
* 세째 ```int[]```은 **열 포인터** (IA):
    * IA[0]=**0** 시작 값은 0으로, IA[i]=IA[i-1] + (i-1)열의 **NNZ** (NZ 개수, Num of NonZeroes)
* 네째 ```int[]```은 **행 인덱스** (JA): 각 NZ의 행 인덱스
* 마지막은 소수 (```double```)로 실제 값 리스트

In [38]:
from pyspark.mllib.linalg import Matrices

# 6*4 2차원 배열
dm = Matrices.dense(6,4,[1, 2, 0, 0, 0, 0, 0, 3, 0, 4, 0, 0, 0, 0, 5, 6, 7, 0, 0, 0, 0, 0, 0, 8])

In [39]:
dm.toArray()

array([[1., 0., 0., 0.],
       [2., 3., 0., 0.],
       [0., 0., 5., 0.],
       [0., 4., 6., 0.],
       [0., 0., 7., 0.],
       [0., 0., 0., 8.]])

In [42]:
#희소행렬로 변환한 값
dm.toSparse() 
# 행의 갯수: 6
# 열의 갯수: 4
# 열로 볼때 nonzero값 갯수 [2,2,3,1]  -> 하나씩 더한 값 [0,2,4,7]
# 행으로 볼때 nonzero 값의 갯수 [0,1,1,3,2,3,4,5] 행을 기준으로 따라가야됨

SparseMatrix(6, 4, [0, 2, 4, 7, 8], [0, 1, 1, 3, 2, 3, 4, 5], [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], False)

In [43]:
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
d=sm.toDense()
print(d)

DenseMatrix([[9., 0.],
             [0., 8.],
             [0., 6.]])


### 1.3 분산 Matrix

partition으로 나눠서 Matrix가 존재함.
- spark는 분산에서 사용, pandas는 아님

배열은 n차원을 가질 수 있고, 2차원인 경우에는 매트릭스라고 지칭한다.
매트릭스 역시 **로컬과 분산**으로 구분할 수 있다. <br>
<br>
#### mlib 라이브러리 내에 존재
**로컬 매트릭스**로 ```pyspark.mllib.linalg.Matrix, Matrices```를 사용한다.<br> 
**분산 매트릭스**는 당연히 여러 노드에 분산해서 사용할 수 있고, ```pyspark.mllib.linalg.distributed```에 존재하는 Row Matrix, Indexed Row Matrix, Coordinate Matrix, Block Matrix를 사용하면 된다.

#### Row Matrix

RowMatrix는 ```pyspark.mllib.linalg.distributed```에서 제공되는 분산벡터. <br>
**RDD vectors**로부터 생성된다.<br>
우선 리스트에서 RDD를 생성하고 이를 RowMatrix에 넘겨주자.

In [53]:
p = [[1.0,2.0,3.0],[1.1,2.1,3.1],[1.2,2.2,3.3]]
my=spark.sparkContext.parallelize(p)
my.collect()

[[1.0, 2.0, 3.0], [1.1, 2.1, 3.1], [1.2, 2.2, 3.3]]

RDD Vectors를 넘겨주어야 한다.

In [54]:
from pyspark.mllib.linalg.distributed import RowMatrix

rm=RowMatrix(my)
print (type(rm))
rm.rows.collect()

<class 'pyspark.mllib.linalg.distributed.RowMatrix'>


[DenseVector([1.0, 2.0, 3.0]),
 DenseVector([1.1, 2.1, 3.1]),
 DenseVector([1.2, 2.2, 3.3])]

In [48]:
rm.numRows()

3

In [49]:
rm.numCols()

3

#### Indexed Row Matrix

RDD로 만들어야됨 <br>
IndexedRow(데이터순서,[]) <br>
앞서 Row Matrix과 유사하지만, 파티션으로 나누어, 그러나 순서를 지켜서 저장이 된다.
따라서 시계열 데이터와 같이 순서가 있는 데이터를 저장하기에 적합하다.

순서인덱스와 벡터로 구성한다.

In [75]:
from pyspark.mllib.linalg.distributed import IndexedRow

irRdd = spark.sparkContext.parallelize([
    IndexedRow(1, [3, 1, 2]),
    IndexedRow(2, [1, 3, 2]),
    IndexedRow(3, [5, 4, 3]),
    IndexedRow(4, [6, 7, 4]),
    IndexedRow(5, [8, 9, 2]),
])

In [76]:
irRdd.take(3)

[IndexedRow(1, [3.0,1.0,2.0]),
 IndexedRow(2, [1.0,3.0,2.0]),
 IndexedRow(3, [5.0,4.0,3.0])]

In [77]:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix

irm = IndexedRowMatrix(irRdd)

In [78]:
print(irm.numRows()) #0부터 시작한다치고
print(irm.numCols()) 

6
3


### 1.4 Labeled Point

- Labeled point는 로컬벡터로 레이블을 가지고 있는 밀집 또는 희소 행렬을 말한다.
- 레이블이 있으므로, supervised learning에 요구되는 형식이다.
- 레이블은 double형식으로 저장되어야 한다.

#### label, features로 구성

**분류** 및 **회귀분석**에 사용되는 데이터 타잎이다.
**'label'**과 **'features'**로 구성된다.

구분 | 지도학습을 하기 위한 label과 features의 구성
-----|-----
label | supervised learning에서 '구분 값'으로 사용한다. 데이터타입은 'DoubleType'으로 설정되어야 한다.
features | **sparse**, **dense** 모두 사용할 수 있다.

In [80]:
#label 형태
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, [1.0, 2.0, 3.0])

LabeledPoint(1.0, [1.0,2.0,3.0])

In [81]:
#sparse vectors로 feature 구성
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

LabeledPoint(1992, Vectors.sparse(10, {0: 3.0, 1:5.5, 2: 10.0}))

LabeledPoint(1992.0, (10,[0,1,2],[3.0,5.5,10.0]))

서로 다른 패키지의 데이터타잎 **```mllib LabeledPoint```**와 **```ml Vectors```**를 혼용하면, 형변환 오류가 발생한다.
이러한 오류는 패키지를 혼용하지 않으면 된다.

```python
Cannot convert type <class 'pyspark.ml.linalg.DenseVector'> into Vector
```

**```dv1mllib```**은 앞서 **```mllib```**로부터 생성된 dense vector이다.

In [84]:
# All from mlib
from pyspark.mllib.regression import LabeledPoint

LabeledPoint(1.0, dv1mllib)

LabeledPoint(1.0, [1.0,2.1,3.0])

In [86]:
# fromML
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors

LabeledPoint(1.0, Vectors.fromML(dv1ml))

LabeledPoint(1.0, [1.0,2.1,3.0])

# RDD 데이터를 LabeledPoint 로 변환하기
spark에서 제공한 **데이터파일 data/mlib/sample_svm_data.txt**을 읽어서 훈련데이터를 만들어보자

In [100]:
import os
_fsvm = os.path.join("sample_svm_data.txt")

#하나씩 읽어보기
try:
    _f=open(_fsvm,'r')
    _lines=_f.readlines()
    _f.close()
except:
    print("An exception occurred")
    
#파일로부터 데이터를 readlines() 함수로 모두 읽어온다.
#첫행에 label, features 로 구성되어있음
_lines[0]

'1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0\n'

### Spark에서 RDD 생성

Spark는 파일을 Python을 통하지 않고, 직접 읽을 수 있다. <br>
원본 데이터 ```sample_svm_data.txt```는 공백으로 구분되어 있다.<br>
읽을 대상이 파일이므로, RDD를 사용한다. 각 행을 공백으로 분리하여 읽는다.

In [101]:
_rdd=spark.sparkContext.textFile(_fsvm)\
    .map(lambda line: [float(x) for x in line.split()])

In [102]:
#첫째 행 읽기
_rdd.take(2)[0]
#_rdd.collect()

[1.0,
 0.0,
 2.52078447201548,
 0.0,
 0.0,
 0.0,
 2.004684436494304,
 2.000347299268466,
 0.0,
 2.228387042742021,
 2.228387042742023,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0]

### LabeledPoint 생성

위 데이터에서 보듯이 첫 열은 **label**로, 그 나머지는 **features**로 생성한다.

In [95]:
from pyspark.mllib.regression import LabeledPoint

_trainRdd0=_rdd.map(lambda line:LabeledPoint(line[0], line[1:]))

In [96]:
_trainRdd0.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

In [None]:
공백을 분리하고, 분리된 데이터를 labeled point로 구성하는 기능을 합쳐서 실행해 본다.

In [97]:
_trainRdd=spark.sparkContext.textFile(_fsvm)\
    .map(lambda line: [float(x) for x in line.split()])\
    .map(lambda p:LabeledPoint(p[0], p[1:]))

In [98]:
_trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

In [104]:
###정리 // split 하고 label 분리
def createLP(line):
    p = [float(x) for x in line.split()]
    return LabeledPoint(p[0], p[1:])


_fp = os.path.join("sample_svm_data.txt")

_rdd=spark.sparkContext.textFile(_fp)
trainRdd = _rdd.map(createLP)

trainRdd.take(1)

[LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])]

### 2. svm의 입력파일 형식

limsvm은 기계학습 모델인 svm을 위한 입력데이터 형식이다.
0은 label, 나머지는 index:value 쌍으로 구성한다.<br>
sparse들을 위함

```python
[label] [index1]:[value1] [index2]:[value2] ...
[label] [index1]:[value1] [index2]:[value2] ...
```

* 예
```python
0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 ...
```


In [51]:
#### DataFrame 읽기

In [53]:
import os
fsvm = os.path.join("sample_libsvm_data.txt")
#dfsvm = spark.read.format("libsvm").load(fsvm)
#하나씩 읽어보기

dfsvm = spark.read.format('libsvm').load(fsvm)

trainRdd.take(1)

SyntaxError: invalid syntax (<ipython-input-53-93634b8eb97a>, line 10)

In [None]:
type(dfsvm)

In [None]:
dfsvm.printSchema()

In [15]:
#DataFrame에서는 label, features 컬럼이 별도로 생성되고, features는 sparse vectors 형식으로 구성된다.

In [16]:
dfsvm.take(1)

[Row(value='0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224 211:252 212:253 213:252 214:202 215:84 216:252 217:253 218:122 236:163 237:252 238:252 239:252 240:253 241:252 242:252 243:96 244:189 245:253 246:167 263:51 264:238 265:253 266:253 267:190 268:114 269:253 270:228 271:47 272:79 273:255 274:168 290:48 291:238 292:252 293:252 294:179 295:12 296:75 297:121 298:21 301:253 302:243 303:50 317:38 318:165 319:253 320:233 321:208 322:84 329:253 330:252 331:165 344:7 345:178 346:252 347:240 348:71 349:19 350:28 357:253 358:252 359:195 372:57 373:252 374:252 375:63 385:253 386:252 387:195 400:198 401:253 402:190 413:255 414:253 415:196 427:76 428:246 429:252 430:112 441:253 442:252 443:148 455:85 456:252 457:230 458:25 467:7 468:135 469:253 470:186 471:12 483:85 484:252 485:223 494:7 495:131 496:252 497:225 498:71 511:85 512:252 513:145 521:48 522:165 523:252 52

### 3. TF

지금까지는 정량데이터를 다루었지만, **텍스트**를 변환해보자.
##### TF (Term Frequency)
단어빈도를 계산하기 위해 HashingTF를 사용할 수 있다.
단어ID로 Hash 알고리즘에 따라 무작위 번호를 생성하고, 단어빈도를 생성한다.
Hash를 사용하지 않고 계산한 단어빈도는 당연히 동일하다는 것을 알 수 있다.

In [23]:
wikiRdd3 = spark.sparkContext\
    .textFile(os.path.join("s-master/data","ds_spark_wiki.txt"))\
    .map(lambda line: line.split())

In [24]:
from pyspark.mllib.feature import HashingTF

hashingTF = HashingTF()
tf = hashingTF.transform(wikiRdd3)
tf.collect()

# (벡터데이터수, {해싱아이디(컬럼번호): 분포})

[SparseVector(1048576, {1026674: 1.0}),
 SparseVector(1048576, {148618: 1.0, 183975: 1.0, 216207: 1.0, 261052: 1.0, 617454: 1.0, 696349: 1.0, 721336: 1.0, 816618: 1.0, 897662: 1.0}),
 SparseVector(1048576, {60386: 1.0, 177421: 1.0, 568609: 1.0, 569458: 1.0, 847171: 1.0, 850510: 1.0, 1040679: 1.0}),
 SparseVector(1048576, {261052: 4.0, 816618: 4.0}),
 SparseVector(1048576, {60386: 4.0, 594754: 4.0}),
 SparseVector(1048576, {21980: 1.0, 70882: 1.0, 274690: 1.0, 357784: 1.0, 549790: 1.0, 597434: 1.0, 804583: 1.0, 829803: 1.0, 935701: 1.0}),
 SparseVector(1048576, {154253: 1.0, 261052: 1.0, 438276: 1.0, 460085: 1.0, 585459: 1.0, 664288: 1.0, 816618: 1.0, 935701: 2.0, 948143: 1.0, 1017889: 1.0}),
 SparseVector(1048576, {270017: 1.0, 472985: 1.0, 511771: 1.0, 718483: 1.0, 820917: 1.0}),
 SparseVector(1048576, {34116: 1.0, 87407: 1.0, 276491: 1.0, 348943: 1.0, 482882: 1.0, 549350: 1.0, 721336: 1.0, 816618: 1.0, 1025622: 1.0}),
 SparseVector(1048576, {1769: 1.0, 151357: 1.0, 500659: 1.0, 54776

### 4. TF-IDF

IDF는 전체에서 몇 개의 문서에 씌였는지를 반대로 계산한 값이다.
뒤 DataFrame를 사용하여 TF-IDF를 계산하면서 자세히 설명하기로 한다.

In [25]:
from pyspark.mllib.feature import HashingTF, IDF  #mlib니까 RDD임을 알 수 잇음

idf = IDF().fit(tf)
tfidf = idf.transform(tf)
#SparseVector(1048576, {1026674: 1.7047 # 계산된 결과가 들어가서 다름 #빈도가 TF빈도가 아닌 TF-IDF임}),

In [26]:
tfidf.collect()

[SparseVector(1048576, {1026674: 1.7047}),
 SparseVector(1048576, {148618: 1.7047, 183975: 1.7047, 216207: 1.7047, 261052: 1.0116, 617454: 1.7047, 696349: 1.7047, 721336: 1.2993, 816618: 0.7885, 897662: 1.7047}),
 SparseVector(1048576, {60386: 1.2993, 177421: 1.7047, 568609: 1.7047, 569458: 1.7047, 847171: 1.7047, 850510: 1.7047, 1040679: 1.7047}),
 SparseVector(1048576, {261052: 4.0464, 816618: 3.1538}),
 SparseVector(1048576, {60386: 5.1971, 594754: 6.819}),
 SparseVector(1048576, {21980: 1.7047, 70882: 1.7047, 274690: 1.7047, 357784: 1.7047, 549790: 1.7047, 597434: 1.7047, 804583: 1.7047, 829803: 1.7047, 935701: 1.2993}),
 SparseVector(1048576, {154253: 1.7047, 261052: 1.0116, 438276: 1.7047, 460085: 1.7047, 585459: 1.7047, 664288: 1.7047, 816618: 0.7885, 935701: 2.5986, 948143: 1.7047, 1017889: 1.7047}),
 SparseVector(1048576, {270017: 1.7047, 472985: 1.7047, 511771: 1.7047, 718483: 1.7047, 820917: 1.7047}),
 SparseVector(1048576, {34116: 1.7047, 87407: 1.7047, 276491: 1.7047, 3489

### 5. StandardScaler

데이터를 표준화하려면 1) 평균과 표준편차를 계산하고, 2) 측정값에서 평균을 빼고, 표준편차로 나누어 주면 된다.
즉 zscore를 계산하는 것과 같다.

$$ z = \frac {\bar{x_n} - \mu} {\sigma / \sqrt{n}} $$

In [27]:
tRdd = spark.sparkContext\
    .textFile(os.path.join('s-master/data', 'ds_spark_heightweight.txt'))

In [28]:
#정규화 할 값만 추출
## 탭을 분리한다
tRdd.map(lambda x: x.split('\t')).take(1)

[['1', '65.78', '112.99']]

In [29]:
# 형 변환을 해준다
tRdd.map(lambda x: x.split('\t')).map(lambda x: [str(x[0]), float(x[1]), float(x[2])]).take(1)

[['1', 65.78, 112.99]]

In [30]:
tRdd.map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .take(1)

[['1', 65.78, 112.99]]

In [31]:
#2개의 값만 추출하여 dense vectors에 별도로 저장한다.
from pyspark.mllib.linalg import Vectors

_tRdd =tRdd\
    .map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .map(lambda x: Vectors.dense([x[1], x[2]])) #두개만 가져와서

In [32]:
#리스트로 저장해도 계산에 문제가 없다.
from pyspark.mllib.linalg import Vectors

_tRdd =tRdd\
    .map(lambda x: x.split('\t'))\
    .map(lambda x: [str(x[0]), float(x[1]), float(x[2])])\
    .map(lambda x: [x[1], x[2]])

In [35]:
#### 표준화
from pyspark.mllib.feature import StandardScaler
scaler1 = StandardScaler().fit(_tRdd) #평균,표준편차 넣어서 해도되고 없이 해도되고
scaler2 = StandardScaler(withMean=True, withStd=True).fit(_tRdd)

In [36]:
# zscore와 비교
##fit 다음에는 transform 하게 된다.
### StandardScaler해주면 zscore 값을 계산없이 구할 수 있다.
scaler2.transform(_tRdd).take(5)

[DenseVector([-1.2458, -1.2299]),
 DenseVector([1.9011, 0.5934]),
 DenseVector([0.7388, 1.8767]),
 DenseVector([0.0919, 1.0473]),
 DenseVector([-0.1439, 1.1993])]

## DataFrame 변환

DataFrame으로 만들어진 데이터를 변환해보자. <br>
이러한 작업이 필요한 이유는 **기계학습에 넘겨줄 입력데이터를 형식에 맞추어야** 하기 때문이다.<br>
데이터는 형식에 맞게 변환되고, 군집화, 회귀분석, 분류, 추천 모델 등에 입력으로 사용된다<br>
물론 데이터는 '일련의 수' 또는 '텍스트'로 구성된다.<br>
이런 데이터로부터 특징을 추출하여 **feature vectors**를 구성한다.<br>
지도학습을 하는 경우에는 **class 또는 label** 값이 필요하다.<br>

### S.5.1 Labeled Point를 label, features 컬럼으로 분해

RDD LabeledPoint는 label과 vectors로 구성되어 있다.
따라서 LabeledPoint를 DataFrame으로 읽어오면, 2개의 컬럼으로 생성된다.
이를 label, features 컬럼으로 맞추어 주도록 하자.

#### 레이블이 있는 Python List에서 DataFrame 생성

label과 features를 가지고 가지고 있는 데이터를 생성해보자.

In [37]:
p = [[1, [1.0, 2.0, 3.0]], [1, [1.1, 2.1, 3.1]], [0, [1.2, 2.2, 3.3]]]

In [38]:
print ("label: {}\nfeatures: {}".format(p[0][0], p[0][1]))

label: 1
features: [1.0, 2.0, 3.0]


위 데이터를 읽어서 DataFrame을 생성하면, 두 개의 컬럼으로 구분해서 생성된다. <br>
그러나 컬럼이 자동 명명되기 때문에 ```_1, _2```가 쓰여서 만족스럽지 못하다.

In [39]:
trainDf=spark.createDataFrame(p)
trainDf.collect()

[Row(_1=1, _2=[1.0, 2.0, 3.0]),
 Row(_1=1, _2=[1.1, 2.1, 3.1]),
 Row(_1=0, _2=[1.2, 2.2, 3.3])]

#### LabeledPoint에서 DataFrame 생성

Python List를 LabeledPoint로 만들어 보자.
**LabeledPoint는 RDD에서 사용하는 구조로서 mllib 라이브러리를 사용**해서 만들고 있다.
**DataFrame은 LabeledPoint를 컬럼으로 가지고 있지 않는다**.

In [40]:
from pyspark.mllib.regression import LabeledPoint
p = [LabeledPoint(1, [1.0,2.0,3.0]),
     LabeledPoint(1, [1.1,2.1,3.1]),
     LabeledPoint(0, [1.2,2.2,3.3])]

In [41]:
#dataframe 생성
trainDf=spark.createDataFrame(p)

그러면 LabeledPoint는 분해되어, **label과 features를 별도 컬럼**으로 생성된다. 이런 명칭의 컬럼은 기계학습에 필요하다. **features 데이터를 모델링하여 label에 따라 분류**하게 된다.

In [42]:
trainDf.collect()

[Row(features=DenseVector([1.0, 2.0, 3.0]), label=1.0),
 Row(features=DenseVector([1.1, 2.1, 3.1]), label=1.0),
 Row(features=DenseVector([1.2, 2.2, 3.3]), label=0.0)]

#### mllib.linalg.Vectors를 사용하여 DataFrame을 생성

앞서 mllib와 ml 모듈을 섞어서 사용하지 않아야 한다고 했다.
mllib vectors를 사용해도 DataFrame을 생성하는데 문제가 없다.
컬럼명을 ```["label", "features"]```으로 하지 않으면, 자동명명되니 주의하자.

In [43]:
from pyspark.mllib.linalg import Vectors

trainDf = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, 1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, 0.5]))], ["label", "features"])

In [44]:
trainDf.collect()

[Row(label=1.0, features=DenseVector([0.0, 1.1, 0.1])),
 Row(label=0.0, features=DenseVector([2.0, 1.0, 1.0])),
 Row(label=0.0, features=DenseVector([2.0, 1.3, 1.0])),
 Row(label=1.0, features=DenseVector([0.0, 1.2, 0.5]))]

#### RDD에서 DataFrame 생성

rdd에서 DataFrame을 생성하면 label, features이 당연히 생성이 되지 않는다.

In [54]:
#from pyspark.mllib.linalg import SparseVector # mllib ok
from pyspark.ml.linalg import SparseVector # ml ok

_rdd = spark.sparkContext.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

In [55]:
_df=_rdd.toDF()
_df.printSchema()

root
 |-- _1: double (nullable = true)
 |-- _2: vector (nullable = true)



In [56]:
_df=_df.withColumnRenamed('_1', 'label').withColumnRenamed('_2', 'features')
_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0| (4,[1,3],[1.0,5.5])|
|  1.0|(4,[0,2],[-1.0,0.5])|
+-----+--------------------+



### S.5.2 단어 빈도

정량 데이터는 합계, 평균, 표준편차 등 의미있는 통계량을 계산하거나, 이런 통계량이 집단 간에 차이가 있는지 분석한다.
반면에 텍스트는 정량데이터와 같이 이러한 통계량의 계산이 가능하지 못하게 된다.
텍스트에 어떤 단어가 얼마나 쓰였는지, 또한 같이 쓰이게 된 단어는 무엇인지 등 단어의 빈도에 따라 **정량화하여 과학적인 분석**을 하게 된다.

#### Bag of Words 모델

#### 텍스트 변환 단계

텍스트를 변환하는 단계를 보자. 순서는 변경될 수 있다.

* 단계 1: 단어로 분할 Tokenization
    * 그, 영화는, 매우, 강렬했다, 그냥, 좋았다, 영화관에서, 보는, 동안, 긴장을, 늦출, 수, 없었다, 갑돌이가, 분장한, 악당의, 케릭터가, 만들어지는, 과정은, 흥미롭지, 않을, 수가, 없었다, 무비의, 이야기, 전개는, 빠르고, 무엇이, 진실이고, 거짓인지, 판단할, 수, 없었다, 누가, 이런, 영화를, 좋아, 하지, 않을, 수가, 있겠는가, 이모티콘

* 단계 2: 정리
    - 불필요, 오타 등

* 단계 3: 불용어 stopwords 제거
    - 그, 수, 수가, 수, 이런, 하지, 수가 등

* 단계 4: 어간 추출 stemming
    영화는, 영화의는 다른 단어지만 조사를 제거하면 동일한 단어
    - 좋았다, 좋아 단어들은 어근을 판별하면 동일한 단어이다.
    - 영화, 무비의 단어는 이음동의
* 단계 5: 계량화
    - word vector로 만든다.
    - 있다-없다, 단어빈도, TF-IDF 사용할 수 있다.<br>dense, sparse 모두 가능하다.
    ```[1,1,1,1,1,0,0],[0,1,0,1,1,1,1]```


In [57]:
# Let it be lyrics
doc=[
    "When I find myself in times of trouble",
    "Mother Mary comes to me",
    "Speaking words of wisdom, let it be",
    "And in my hour of darkness",
    "She is standing right in front of me",
    "Speaking words of wisdom, let it be",
    "Let it be",
    "Let it be",
    "Let it be",
    "Let it be",
    "Whisper words of wisdom, let it be"
]

문서는 문장으로 구성되어 있고, 문장은 단어로 구성되어 있다. 따라서 첫째 반복문은 문서의 각 문장에 대해, 단어로 분리하고 있다. 그 다음 반복문은 각 단어에 대해 빈도를 계산한다. 각 단어가 키가 되는데, 키가 존재하면 빈도를 증가하고, 존재하지 않으면 새로운 키를 생성한다.

In [60]:
d={}
for sentence in doc:
    words=sentence.split()
    for word in words:
        if word in d:
            d[word]+=1
        else:
            d[word]=1
            
#단어의 빈도를 DICTIONARY d에 저장 키, 빈도의 쌍으로 저장되어있음

In [61]:
d

{'When': 1,
 'I': 1,
 'find': 1,
 'myself': 1,
 'in': 3,
 'times': 1,
 'of': 6,
 'trouble': 1,
 'Mother': 1,
 'Mary': 1,
 'comes': 1,
 'to': 1,
 'me': 2,
 'Speaking': 2,
 'words': 3,
 'wisdom,': 3,
 'let': 3,
 'it': 7,
 'be': 7,
 'And': 1,
 'my': 1,
 'hour': 1,
 'darkness': 1,
 'She': 1,
 'is': 1,
 'standing': 1,
 'right': 1,
 'front': 1,
 'Let': 4,
 'Whisper': 1}

In [66]:
# for k,v in d.iteritems():  # python2
for k,v in d.items():
    print ("{}\t{}".format(k,v))

When	1
I	1
find	1
myself	1
in	3
times	1
of	6
trouble	1
Mother	1
Mary	1
comes	1
to	1
me	2
Speaking	2
words	3
wisdom,	3
let	3
it	7
be	7
And	1
my	1
hour	1
darkness	1
She	1
is	1
standing	1
right	1
front	1
Let	4
Whisper	1


### S.5.3 Spark의 transformer, estimator

**RDD**를 만들고 나서도 데이터를 변환하기 위해 map-reduce와 같은 함수 또는  **transform()**, **fit()**을 사용하는 것과 같이,
**DataFrame**도 역시 **Transformer**, **Estimator**를 사용할 수 있다.
이러한 Spark ml 라이브러리는 Python의 scikit-learn에서 영향을 받아 기계학습 API transformer, estimator, evaluator가 유사하다.
* Estimator는 DataFrame에 적용되는 알고리즘을 말하는 것으로, Transformer를 생성해낸다.<br> **```Estimator.fit()```**
* Transformer는 DataFrame을 적용해서 다른 DataFrame으로 생성한다. <br>**```Transformer.transform()```**
* Evaluator는 ```pyspark.ml.evaluation```의 'BinaryClassificationEvaluator', 'RegressionEvaluator',  'MulticlassClassificationEvaluator', 'MultilabelClassificationEvaluator', 'ClusteringEvaluator', 'RankingEvaluator' 등이 있다.

In [67]:
#2차원 배열
doc2d=[
    ["When I find myself in times of trouble"],
    ["Mother Mary comes to me"],
    ["Speaking words of wisdom, let it be"],
    ["And in my hour of darkness"],
    ["She is standing right in front of me"],
    ["Speaking words of wisdom, let it be"],
    [u"우리 Let it be"],
    [u"나 Let it be"],
    [u"너 Let it be"],
    ["Let it be"],
    ["Whisper words of wisdom, let it be"]
]

In [68]:
myDf=spark.createDataFrame(doc2d, ['sent'])

In [73]:
#truncate 긴 단어들 잘라버림
myDf.show(truncate=True)

+--------------------+
|                sent|
+--------------------+
|When I find mysel...|
|Mother Mary comes...|
|Speaking words of...|
|And in my hour of...|
|She is standing r...|
|Speaking words of...|
|      우리 Let it be|
|        나 Let it be|
|        너 Let it be|
|           Let it be|
|Whisper words of ...|
+--------------------+



### S.5.4 Tokenizer

먼저 용어를 정리해 보자.
* corpus는 어떤 주제에 대해 쓰여지거나, 어떤 사람이 작성한 전체 '말뭉치'를 말한다. 여러 문장으로 구성된 텍스트 집합을 말한다.
* document는 문장으로 구성된 문서를 말하지만, 한 문장으로만 구성될 수도, 여러 문장으로 만들어질 수도 있다. 예를 들어, "why she had to go" 같은 한 문장도 document라고 하고, "why she had to go.. I don't know" 역시 마찬가지이다.
* vocabularay는 중복없는 단어 집합을 말하며, 예를 들면, "why","she","had","to","go","where","have" 등은 단어이다.

Tokenizer는 document를 단어로 분리한다.
분리하는 기준은 whitespace로 공백, TAB, CR, New Line 등이 해당된다.
* 입력 컬럼은 "sent"로,
* 출력 컬럼은 "words"로 한다.

In [70]:
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="sent", outputCol="words")

```transform()```은 앞서 만든 ```tokenizer```모델에 DataFrame을 변환하여 다른 DataFrame을 생성한다.
그 결과는 문자열 배열로 구성된다.

In [71]:
tokDf = tokenizer.transform(myDf)
tokDf.show(3)

+--------------------+--------------------+
|                sent|               words|
+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|
|Mother Mary comes...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|
+--------------------+--------------------+
only showing top 3 rows



```for```문으로 출력해보자. ```Row()``` 객체로 출력된다.

In [72]:
for r in tokDf.select("sent", "words").take(3):
    print (r)

Row(sent='When I find myself in times of trouble', words=['when', 'i', 'find', 'myself', 'in', 'times', 'of', 'trouble'])
Row(sent='Mother Mary comes to me', words=['mother', 'mary', 'comes', 'to', 'me'])
Row(sent='Speaking words of wisdom, let it be', words=['speaking', 'words', 'of', 'wisdom,', 'let', 'it', 'be'])


### S.5.5 RegTokenizer

Tokenizer는 white space로 분리하지만, RegexTokenizer는 단어를 분리하기 위해 **정규표현식**을 적용할 수 있다.
정규표현식을 사용하여 분리하거나 특정 패턴을 추출할 수 있다.
공백으로 분리할 경우 간단히 정규표현식 ```\s``` 패턴을 적용할 수 있다.
한글에는 ```\w``` 패턴이 적용되지 않는다.
* ```\s```는 공백문자
* ```\w```는 숫자 및 대소문자 ```[A-Za-z0-9_]```
* 별표 ```*```는 0 또는 그 이상, 더하기 ```+```는 1 또는 그 이상을 의미한다.

In [74]:
from pyspark.ml.feature import RegexTokenizer
re = RegexTokenizer(inputCol="sent", outputCol="wordsReg", pattern="\\s+")

In [76]:
reDf=re.transform(myDf)
reDf.show()

+--------------------+--------------------+
|                sent|            wordsReg|
+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|
|Mother Mary comes...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|
|And in my hour of...|[and, in, my, hou...|
|She is standing r...|[she, is, standin...|
|Speaking words of...|[speaking, words,...|
|      우리 Let it be| [우리, let, it, be]|
|        나 Let it be|   [나, let, it, be]|
|        너 Let it be|   [너, let, it, be]|
|           Let it be|       [let, it, be]|
|Whisper words of ...|[whisper, words, ...|
+--------------------+--------------------+



In [78]:
reDf1=re.transform(tokDf)
reDf1.show()
## 정교하게 내가 원하는 데이터를 가지고 할 수 있다는 장점이 있음 : 그러므로 정규 토크나이저를 더 추천

+--------------------+--------------------+--------------------+
|                sent|               words|            wordsReg|
+--------------------+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|[when, i, find, m...|
|Mother Mary comes...|[mother, mary, co...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|
|And in my hour of...|[and, in, my, hou...|[and, in, my, hou...|
|She is standing r...|[she, is, standin...|[she, is, standin...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|
|      우리 Let it be| [우리, let, it, be]| [우리, let, it, be]|
|        나 Let it be|   [나, let, it, be]|   [나, let, it, be]|
|        너 Let it be|   [너, let, it, be]|   [너, let, it, be]|
|           Let it be|       [let, it, be]|       [let, it, be]|
|Whisper words of ...|[whisper, words, ...|[whisper, words, ...|
+--------------------+--------------------+--------------------+



### S.5.6 Stopwords

텍스트를 분리하고 나면, 별 의미가 없거나 쓸모가 없는 단어들이 존재한다.
예를 들어 이, 그, 저와 같은 **한 단어** 또는 있다 등과 같은 **일부 동사**, 그래서, 그러나 등과 같은 **접속사** 등이 후보가 될 수 있다.
이런 불필요한 단어들을 불용어 Stopwords라고 하며, 입력데이터에서 제거하도록 한다.
영어의 경우 불용어가 식별되어 제공되고 있다
http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words


In [80]:
from pyspark.ml.feature import StopWordsRemover
stop = StopWordsRemover(inputCol="wordsReg", outputCol="nostops")

In [82]:
stopwords=list()
#stopwords 에 현재 불용어를 담는다
_stopwords=stop.getStopWords()
for e in _stopwords:
    stopwords.append(e)

In [84]:
_mystopwords=[u"나",u"너", u"우리"] #유니코드로 인식하여서 가지고옴
for e in _mystopwords:
    stopwords.append(e)
stop.setStopWords(stopwords)

StopWordsRemover_01cde1971dd1

In [85]:
for e in stop.getStopWords():
    print (e, end="/")

i/me/my/myself/we/our/ours/ourselves/you/your/yours/yourself/yourselves/he/him/his/himself/she/her/hers/herself/it/its/itself/they/them/their/theirs/themselves/what/which/who/whom/this/that/these/those/am/is/are/was/were/be/been/being/have/has/had/having/do/does/did/doing/a/an/the/and/but/if/or/because/as/until/while/of/at/by/for/with/about/against/between/into/through/during/before/after/above/below/to/from/up/down/in/out/on/off/over/under/again/further/then/once/here/there/when/where/why/how/all/any/both/each/few/more/most/other/some/such/no/nor/not/only/own/same/so/than/too/very/s/t/can/will/just/don/should/now/i'll/you'll/he'll/she'll/we'll/they'll/i'd/you'd/he'd/she'd/we'd/they'd/i'm/you're/he's/she's/it's/we're/they're/i've/we've/you've/they've/isn't/aren't/wasn't/weren't/haven't/hasn't/hadn't/don't/doesn't/didn't/won't/wouldn't/shan't/shouldn't/mustn't/can't/couldn't/cannot/could/here's/how's/let's/ought/that's/there's/what's/when's/where's/who's/why's/would/나/너/우리/나/너/우리/

In [86]:
stopDf=stop.transform(reDf)
stopDf.show()

+--------------------+--------------------+--------------------+
|                sent|            wordsReg|             nostops|
+--------------------+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|[find, times, tro...|
|Mother Mary comes...|[mother, mary, co...|[mother, mary, co...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|
|And in my hour of...|[and, in, my, hou...|    [hour, darkness]|
|She is standing r...|[she, is, standin...|[standing, right,...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|
|      우리 Let it be| [우리, let, it, be]|               [let]|
|        나 Let it be|   [나, let, it, be]|               [let]|
|        너 Let it be|   [너, let, it, be]|               [let]|
|           Let it be|       [let, it, be]|               [let]|
|Whisper words of ...|[whisper, words, ...|[whisper, words, ...|
+--------------------+--------------------+--------------------+



### S.5.7 CountVectorizer

```CountVectorizer```는 단어의 빈도 수를 계산한다.

3번째 문장 "Speaking words of wisdom, let it be"의 word vector를 구성해 본다.
id 값은 모든 문장에서 단어를 추출하고 나서야 부여된다.

단어 (3행 "Speaking words of wisdom, let it be") | id | 빈도 | 
-----|-----|-----
Speaking | 7 | 1
words | 13 | 1
of | stopword | 0
wisdom | 12 | 1
let | 3 | 1
it | stopword | 0
be | stopword | 0

위 **word vector**를 표로 나타내면 아래와 같다.
행은 문장, 열은 id이다.
**3행은 doc2**이다. 해당하는 **단어 id의 빈도**를 적었다. 다른 행과 열은 이해를 돕기 위해 비워 놓았다.

```doc``` \ 단어 id  | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |10 |11 |12 |13 |...
------|---|---|---|---|---|---|---|---|---|---|---|---|---|---
```doc 0``` |   |   |   |   |   |   |   |   |   |   |   |   |   |...
```doc 1``` |   |   |   |   |   |   |   |   |   |   |   |   |   |...
```doc 2``` |   |   | 1 |   |   |   | 1 |   |   |   |   | 1 | 1 |...
...   |   |   |   |   |   |   |   |   |   |   |   |   |   |...

#### sklearn CountVectorizer

sklearn 라이브러리로 단어빈도를 세어보자.
입력으로 단어의 집합을 넣어야 하지만, 위 문서는 2차원 리스트이다.
먼저 2차원 리스트를 1차원으로 변경하자.

In [109]:
from functools import reduce #2차원을 1차원으로 만들어줘야함
doc = reduce(lambda x,y: x+y, doc2d)

In [89]:
from sklearn.feature_extraction.text import CountVectorizer

vectorizer = CountVectorizer(stop_words='english')

In [90]:
print (vectorizer.fit_transform(doc))

  (0, 9)	1
  (0, 10)	1
  (1, 5)	1
  (1, 4)	1
  (1, 0)	1
  (2, 7)	1
  (2, 13)	1
  (2, 12)	1
  (2, 3)	1
  (3, 2)	1
  (3, 1)	1
  (4, 8)	1
  (4, 6)	1
  (5, 7)	1
  (5, 13)	1
  (5, 12)	1
  (5, 3)	1
  (6, 3)	1
  (6, 14)	1
  (7, 3)	1
  (8, 3)	1
  (9, 3)	1
  (10, 13)	1
  (10, 12)	1
  (10, 3)	1
  (10, 11)	1


In [91]:
vectorizer.vocabulary_

{'times': 9,
 'trouble': 10,
 'mother': 5,
 'mary': 4,
 'comes': 0,
 'speaking': 7,
 'words': 13,
 'wisdom': 12,
 'let': 3,
 'hour': 2,
 'darkness': 1,
 'standing': 8,
 'right': 6,
 '우리': 14,
 'whisper': 11}

In [92]:
vectorizer.fit_transform(doc).todense()

matrix([[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
        [1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0],
        [0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0]], dtype=int64)

#### Spark CountVectorizer

CountVectorizer는 단어를 분리하고 나서, 빈도를 계산할 수 있다. 결과는 단어빈도 word vector (sparse), 즉 단어별 단어빈도 TF이다.
자주 사용된 단어가 아니어서 제거해야할 단어, 즉 문서에 사용된 빈도 document frequency를 minDF, maxDF를 통해 설정할 수 있다.
소수점을 사용하면, 비율, 즉 '사용된 문서 수/전체 문서 수'를 의미한다.

* minDf는 너무 **적게 발생하는 경우 무시**, 예를 들어 0.5는 전체 문서의 **50%보다 적게** 발생하는 단어는 무시, 1.0은 기본 값이고, **100%보다 적게** 발생하는 경우 무시하게 된다. 즉, minDf=1.0은 문서 1개 이하에서 나타난 단어는 무시하라는 의미이다. 즉 어떤 단어도 무시하지 말라는 의미이다.
* maxDf는 너무 **많이 발생하는 경우 무시**, 예를 들어 0.5는 전체 문서의 **50%보다 많이** 발생하는 경우 무시, 1.0은 **100%보다 많이** 발생하는 경우 무시 (즉, 어떤 단어도 무시하지 말라는 의미). min_df와 마찬가지로 1.0이 기본 값이다.

In [110]:
#from sklearn.feature_extraction.text import CountVectorizer
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="nostops", outputCol="cv", vocabSize=30, minDF=1.0)

```CountVectorizerModel```은 ```fit()```하고 나면 얻어진다. 다음에 사용하는 ```HashingTF```는 ```fit()```하지 않는다는 점에서 차이가 있다.

In [94]:
cvModel = cv.fit(stopDf)

In [95]:
print (type(cv),type(cvModel))

<class 'pyspark.ml.feature.CountVectorizer'> <class 'pyspark.ml.feature.CountVectorizerModel'>


In [97]:
cvDf = cvModel.transform(stopDf)
cvDf.show(3)

+--------------------+--------------------+--------------------+--------------------+
|                sent|            wordsReg|             nostops|                  cv|
+--------------------+--------------------+--------------------+--------------------+
|When I find mysel...|[when, i, find, m...|[find, times, tro...|(16,[5,6,8],[1.0,...|
|Mother Mary comes...|[mother, mary, co...|[mother, mary, co...|(16,[10,13,14],[1...|
|Speaking words of...|[speaking, words,...|[speaking, words,...|(16,[0,1,2,3],[1....|
+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows



DataFrame 전체를 출력하면 보기 불편하므로, 이 가운데 일부 컬럼만을 선택하여 출력할 수 있다.
```(16,[5,6,8],[1.0,1.0,1.0])```
16은 전체 단어의 개수, 그리고 다음 5,6,8은 값이 있는 컬럼 번호, 1.0,1.0,1.0은 그 값을 말한다.

In [98]:
cvDf.select('sent','nostops','cv').show()

+--------------------+--------------------+--------------------+
|                sent|             nostops|                  cv|
+--------------------+--------------------+--------------------+
|When I find mysel...|[find, times, tro...|(16,[5,6,8],[1.0,...|
|Mother Mary comes...|[mother, mary, co...|(16,[10,13,14],[1...|
|Speaking words of...|[speaking, words,...|(16,[0,1,2,3],[1....|
|And in my hour of...|    [hour, darkness]|(16,[7,9],[1.0,1.0])|
|She is standing r...|[standing, right,...|(16,[4,12,15],[1....|
|Speaking words of...|[speaking, words,...|(16,[0,1,2,3],[1....|
|      우리 Let it be|               [let]|      (16,[0],[1.0])|
|        나 Let it be|               [let]|      (16,[0],[1.0])|
|        너 Let it be|               [let]|      (16,[0],[1.0])|
|           Let it be|               [let]|      (16,[0],[1.0])|
|Whisper words of ...|[whisper, words, ...|(16,[0,1,2,11],[1...|
+--------------------+--------------------+--------------------+



```CountVectorizer```에서 사용된 단어 목록을 출력할 수 있다. 아래 단어의 수를 세어보면 위 sparse vector의 컬럼 개수와 동일하다.

In [99]:
cvModel.vocabulary

['let',
 'wisdom,',
 'words',
 'speaking',
 'right',
 'trouble',
 'find',
 'hour',
 'times',
 'darkness',
 'mother',
 'whisper',
 'front',
 'mary',
 'comes',
 'standing']

### S.5.8 TF-IDF

```TfidfTransformer```는 **TF-IDF(Term Frequency-Inverse Document Frequency)**를 계산한다.
이를 위해서는 우선 Tokenizer를 사용하여 문장을 단어로 분리해 놓아야 한다.

HashingTF를 사용하여 'word vector'를 계산한다.
HashingTF은 hash함수에 따라 단어의 고유번호를 생성하며, 단어 수가 많아지면서 고유번호가 충돌할 수 있는 가능성이 있지만 이를 최소화게 된다.
그리고 IDF를 계산하고, TF-IDF를 계산한다.

#### S.5.8.1 TF-IDF 계산

'Let it be'가사 세 번째 줄 **'wisdom' 단어**의 TF-IDF를 계산해보자.
**TF**는 단어빈도수, 즉 문서에 단어가 나타난 빈도수를 의미한다.
단어빈도는 경우에 따라서는 문제가 될 수 있다. 예를 들어, 'a', 'the', 'of'와 같은 단어는 빈도는 높지만 별로 유용하지 못하다.
이 경우 IDF는 유용하다. **IDF**는 자주 나타나는 단어에 대한 가중치를 줄이고, 드물게 나타나는 단어에 가중치를 높이는 방식으로 계산된다.

t는 단어, 문서는 d, D는 corpus,

항목 | 설명 | 예제
-----|-----|-----
tf(d,f) | 단어 t가 문서 d에서 나타나는 단어의 빈도 수, term frequency | $f_{t,d}$ / (number of words in d) = 1/4 = 0.25<br>(3번째 문서에 stopwords를 제외하면 4개의 단어, wisdom은 1회 나타난다.)
df | document frequency 단어가 나타난 문서 수 | 3 (wisdom이 포함된 문서는 3)
N | number of documents 전체 문서의 수 | 11 (전체의 문서는 11개)
idf | inverse document frequency 단어가 나타난 문서의 비율을 거꾸로 | ln(N+1 / df+1) + 1 = log(12/4) + 1 = 1.09861 + 1<br>0으로 나뉘는 것을 방지하기 위해 **smoothing**, 즉 1을 더한다. 

프로그래밍에서는 메모리를 적게 사용하도록 설계되어 있다. ```1/4```와 같이 정수 타잎으로 연산하면, 정수를 사용하여 연산하고 그 결과도 정수를 출력하게 된다. 이를 변환하기 위해 ```1.```의 경우에서와 같이 소수를 사용하자.

In [100]:
import math

tf=1./4
df=3.
N=11.
idf=math.log((N+1)/(df+1))+1
print ("idf: {}".format(idf))

idf: 2.09861228866811


#### S.5.8.2 sklearn을 사용한 TF-IDF

우선 'sklearn'의 TF-IDF를 계산해보자.
**```CountVectorizer```**는 텍스트를 단어의 빈도로 변환해주어, 문서 x 단어 표를 출력할 수 있다.
**```CountVectorizer()```**의 인자로
analyzer ("word", "character ngram" 등 선택),
tokenizer (단어의 tokenizer를 지정),
stop_words (불용어 처리 기준),
max_features (최대 속성 개수) 등을 지정할 수 있다.
그 다음으로, TF-IDF를 계산할 수 있다. 이 때 (문서id, 단어id) 별로 결과가 출력된다.

```TfidfVectorizer```를 사용해서 계산하면 그 결과를 아래와 같이 볼 수 있다.
```python
(2,12) 2.09861228867
```

결과에서
**'2'**는 3번째 문서번호, **'12'**는 'wisdom' 단어번호
TF-IDF는 ```2.09861228867```이다.

max_df, min_df는 기본 값이 1.0으로 굳이 설정하지 않아도 되는데, 어떤 단어도 무시하지 말라는 의미이다.

In [102]:
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer(max_df=1.0, stop_words='english',norm = None)
print (vectorizer.fit_transform(doc))

  (0, 10)	2.791759469228055
  (0, 9)	2.791759469228055
  (1, 0)	2.791759469228055
  (1, 4)	2.791759469228055
  (1, 5)	2.791759469228055
  (2, 3)	1.4054651081081644
  (2, 12)	2.09861228866811
  (2, 13)	2.09861228866811
  (2, 7)	2.386294361119891
  (3, 1)	2.791759469228055
  (3, 2)	2.791759469228055
  (4, 6)	2.791759469228055
  (4, 8)	2.791759469228055
  (5, 3)	1.4054651081081644
  (5, 12)	2.09861228866811
  (5, 13)	2.09861228866811
  (5, 7)	2.386294361119891
  (6, 14)	2.791759469228055
  (6, 3)	1.4054651081081644
  (7, 3)	1.4054651081081644
  (8, 3)	1.4054651081081644
  (9, 3)	1.4054651081081644
  (10, 11)	2.791759469228055
  (10, 3)	1.4054651081081644
  (10, 12)	2.09861228866811
  (10, 13)	2.09861228866811


In [103]:
vectorizer.vocabulary_

{'times': 9,
 'trouble': 10,
 'mother': 5,
 'mary': 4,
 'comes': 0,
 'speaking': 7,
 'words': 13,
 'wisdom': 12,
 'let': 3,
 'hour': 2,
 'darkness': 1,
 'standing': 8,
 'right': 6,
 '우리': 14,
 'whisper': 11}

In [104]:
vectorizer.idf_

array([2.79175947, 2.79175947, 2.79175947, 1.40546511, 2.79175947,
       2.79175947, 2.79175947, 2.38629436, 2.79175947, 2.79175947,
       2.79175947, 2.79175947, 2.09861229, 2.09861229, 2.79175947])

#### S.5.8.3 Spark를 사용한 TF-IDF

##### TF
HashingTF는 단어집합을 워드벡터 word vector로 변환하는데, 해시함수를 사용해서 단어에 해당하는 일련번호를 결정한다.
HashingTF에서의 **```numFeatures```는 $2^n$**으로 결정하게 된다.
단어 갯수가 900이면, $2^{10}=1024$이므로 1024로 설정하면 된다.
기본은 $2^{18}=262,144$이다.
너무 적게 설정되면 인덱스가 부족하거나 적절하게 매핑될 수 있으니 주의해야 한다.


예를 들어, 
문서 ```[speaking, words, wisdom,, let]```의 경우 ```(32,[4,24,27],[1.0,1.0,2.0])```가 출력된다.
아래의 결과와 비교하면 단어 하나가 유실된 것을 알 수 있다.

In [105]:
from pyspark.ml.feature import HashingTF, IDF

# hashTF = HashingTF(inputCol="nostops", outputCol="hash", numFeatures=32)#numfeatrures의 기본 값은 262,144 너무 크므로 줄일라면
#  mapping indices insufficient
hashTF = HashingTF(inputCol="nostops", outputCol="hash")

```HashingTF```는 ```fit()```하지 않고 ```transform()``` 한다.

In [106]:
hashDf = hashTF.transform(stopDf)

```(50,[10,24,43],[1.0,1.0,1.0])```
16은 해시 개수 (앞서 CountVectorizer의 경우에서와 같이 전체 단어의 개수가 아니다), 그리고 다음 [10,24,43]은 값이 있는 **해시 컬럼** 번호, 1.0,1.0,1.0은 그 값을 말한다.

In [107]:
hashDf.select("nostops", "hash").show(truncate=False)

+-------------------------------+--------------------------------------------------------+
|nostops                        |hash                                                    |
+-------------------------------+--------------------------------------------------------+
|[find, times, trouble]         |(262144,[64317,91878,152481],[1.0,1.0,1.0])             |
|[mother, mary, comes]          |(262144,[24657,63767,245426],[1.0,1.0,1.0])             |
|[speaking, words, wisdom,, let]|(262144,[27556,151864,173339,175131],[1.0,1.0,1.0,1.0]) |
|[hour, darkness]               |(262144,[74517,98431],[1.0,1.0])                        |
|[standing, right, front]       |(262144,[84798,218360,229166],[1.0,1.0,1.0])            |
|[speaking, words, wisdom,, let]|(262144,[27556,151864,173339,175131],[1.0,1.0,1.0,1.0]) |
|[let]                          |(262144,[173339],[1.0])                                 |
|[let]                          |(262144,[173339],[1.0])                                 |

##### TF-IDF

In [108]:
idf = IDF(inputCol="hash", outputCol="idf")
idfModel = idf.fit(hashDf)
idfDf = idfModel.transform(hashDf)
for e in idfDf.select("nostops","hash").take(10):
    print(e)

Row(nostops=['find', 'times', 'trouble'], hash=SparseVector(262144, {64317: 1.0, 91878: 1.0, 152481: 1.0}))
Row(nostops=['mother', 'mary', 'comes'], hash=SparseVector(262144, {24657: 1.0, 63767: 1.0, 245426: 1.0}))
Row(nostops=['speaking', 'words', 'wisdom,', 'let'], hash=SparseVector(262144, {27556: 1.0, 151864: 1.0, 173339: 1.0, 175131: 1.0}))
Row(nostops=['hour', 'darkness'], hash=SparseVector(262144, {74517: 1.0, 98431: 1.0}))
Row(nostops=['standing', 'right', 'front'], hash=SparseVector(262144, {84798: 1.0, 218360: 1.0, 229166: 1.0}))
Row(nostops=['speaking', 'words', 'wisdom,', 'let'], hash=SparseVector(262144, {27556: 1.0, 151864: 1.0, 173339: 1.0, 175131: 1.0}))
Row(nostops=['let'], hash=SparseVector(262144, {173339: 1.0}))
Row(nostops=['let'], hash=SparseVector(262144, {173339: 1.0}))
Row(nostops=['let'], hash=SparseVector(262144, {173339: 1.0}))
Row(nostops=['let'], hash=SparseVector(262144, {173339: 1.0}))


## Word2Vec
- Bags of Words 모델은 단어 순서와 문맥을 무시한다.
- Word2Vec은 이런 BoW 모델의 단점을 극복하기 위해서 말뭉치로부터 **단어들 서로의 맥락 또는 연관성 Word Embedding**을 신경망으로 학습하여 Word2Vec을 계산
- 단어를 벡터로 변환하게 되면 벡터 연산이 가능해지고 서로 간의 거리를 측정하여, 가까울 수록 비슷한 단어를 해석 가능

In [None]:
from pyspark.ml.feature import Word2Vec

#최소 갯수 0
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="w2v")
# token으로 분리하여
model = word2Vec.fit(tokDf)
w2vDf = model.transform(tokDf)

In [None]:
for e in w2vDf.select("w2v").take(3):
    print(e)

In [None]:
model.getVectors().show(truncate=False)

In [None]:
#"times"와 유사한 단어 3개 찾아서 show
model.findSynonyms("times", 3).show()

###  NGram

텍스트를 대상으로 하면, n-gram은 연속된 n개의 토큰으로 구성된 순열을 말한다.
unigram은 한 단어로, bigram은 두 단어로 구성한다.

In [1]:
from pyspark.ml.feature import NGram
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
#tokenizer 되어있는걸 input으로 넣음
ngramDf = ngram.transform(tokDf) 
ngramDf.show()
for e in ngramDf.select("words","ngrams").take(3):
    print e
    
# bigram으로 넣는다

SyntaxError: Missing parentheses in call to 'print'. Did you mean print(e)? (<ipython-input-1-c47b3abea57e>, line 7)

### StringIndexer

문자열 컬럼을 인덱스 컬럼으로 변환한다. **빈도가 제일 높은 순서**로 ```0.0```부터 인덱스 값이 주어진다. 인덱스는 double 형을 가지게 된다.
없는 레이블에 대해서는 예외가 발생할 수 있으므로 (default), ```setHandleInvalid("skip")``` 함수로 'skip', 'keep', 'error' 등 처리

구분 | 설명 | 예
-----|-----|-----
nominal | 명목 또는 구분 값 cateogry  | 사자, 호랑이, 사람
ordinal | 명목값과 다른 점은 순서가 있다. | 키 low, med, high
interval | 일정한 간격이 있다. | 150-165, 165-180, 180-195

In [None]:
# 현재 텍스트에 대해서는 적당한 명목 변수가 없으므로, 문장 전체에 대해 인덱스로 반환
from pyspark.ml.feature import StringIndexer
labelIndexer = StringIndexer(inputCol="sent", outputCol="sentLabel")
model=labelIndexer.fit(myDf)
siDf=model.transform(myDf)
siDf.show()

### 연속데이터의 변환

몸무게(inches), 키(pounds) 데이터를 분석해보자.
이 데이터는 정량, 연속 데이터이다. 
출처는 https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv

```python
1	65.78	112.99
2	71.52	136.49
3	69.40	153.03
4	68.22	142.34
5	67.79	144.30
6	68.70	123.30
7	69.80	141.49
8	70.01	136.46
9	67.90	112.37
10	66.78	120.67
11	66.49	127.45
12	67.62	114.14
13	68.30	125.61
14	67.12	122.46
15	68.28	116.09
16	71.09	140.00
17	66.46	129.50
18	68.65	142.97
19	71.23	137.90
20	67.13	124.04
21	67.83	141.28
22	68.88	143.54
23	63.48	97.90
24	68.42	129.50
25	67.63	141.85
26	67.21	129.72
27	70.84	142.42
28	67.49	131.55
29	66.53	108.33
30	65.44	113.89
31	69.52	103.30
32	65.81	120.75
33	67.82	125.79
34	70.60	136.22
35	71.80	140.10
36	69.21	128.75
37	66.80	141.80
38	67.66	121.23
39	67.81	131.35
40	64.05	106.71
41	68.57	124.36
42	65.18	124.86
43	69.66	139.67
44	67.97	137.37
45	65.98	106.45
46	68.67	128.76
47	66.88	145.68
48	67.70	116.82
49	69.82	143.62
50	69.09	134.93
```

In [None]:
from pyspark.sql.types import *
rdd=spark.sparkContext\
    .textFile(os.path.join('data','ds_spark_heightweight.txt'))

myRdd=rdd.map(lambda line:[float(x) for x in line.split('\t')])
myDf=spark.createDataFrame(myRdd,["id","weight","height"])

In [None]:
myDf.printSchema()

In [None]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=68.0, inputCol="weight", outputCol="weight2")
binDf = binarizer.transform(myDf)
binDf.show(10)

In [None]:
from pyspark.ml.feature import QuantileDiscretizer

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="height", outputCol="height3")
qdDf = discretizer.fit(binDf).transform(binDf)
qdDf.show(10)

### VectorAssembler

* 열을 묶어서 Vector열로 만든다.
* string은 묶을 수 없다.
* pyspark.ml.linalg.Vectors를 사용한다. (주의: pyspark.mllib.linalg.Vectors를 사용하지 않는다.)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

va = VectorAssembler(inputCols=["weight2","height3"],outputCol="features")
vaDf = va.transform(qdDf)
vaDf.printSchema()
vaDf.show(5)