# Streaming

## Word count example
This code counts the words in messages arriving on socket port 9999.

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Create a SparkContext that runs locally with two threads for stream processing: one to receive data and one to process it.

In [None]:
sc = SparkContext("local[2]", "NetworkWordCount")

Create a streaming context with a batch interval of 1 second.

In [None]:
ssc = StreamingContext(sc, 1)

Open the socket that will receive the streaming data.

In [None]:
lines = ssc.socketTextStream("localhost", 9999)

Split each line of text on spaces.

In [None]:
words = lines.flatMap(lambda line: line.split(" "))

Count the words in each batch.

In [None]:
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

In [None]:
wordCounts.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()
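
As a rough illustration of what each 1-second batch goes through, the sketch below replays the `flatMap` / `map` / `reduceByKey` steps in plain Python, without Spark. The helper name `count_words_in_batch` and the sample batches are made up for this example.

```python
from collections import defaultdict

def count_words_in_batch(lines):
    """Mimic flatMap(split) -> map((word, 1)) -> reduceByKey(add) for one batch."""
    # flatMap: one flat list of words from all lines in the batch
    words = [word for line in lines for word in line.split(" ")]
    # map: pair every word with the count 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the 1s for each distinct word
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)

# Two hypothetical batches; each one is counted independently of the other.
batches = [["hello spark", "hello world"], ["hello again"]]
for batch in batches:
    print(count_words_in_batch(batch))
```

Note that the second batch knows nothing about the first: the counts start from zero every time.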

## Stateful streaming

The previous code produced a separate result for each batch. This time we accumulate the counts across batches.

In [3]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [None]:
sc = SparkContext("local[2]", "StatefulNetworkWordCount")

In [None]:
ssc = StreamingContext(sc, 1)

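Set the checkpoint directory, which is the recovery point after a failure. `updateStateByKey` requires checkpointing to be enabled.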

In [None]:
ssc.checkpoint("checkpoint")

This is the update function passed to `updateStateByKey`.
It combines the previous state with the new values.

In [None]:
def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

In [None]:
lines = ssc.socketTextStream("localhost", 9999)

In [None]:
running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(updateFunc)

In [None]:
running_counts.pprint()

In [None]:
ssc.start()

In [None]:
ssc.awaitTermination()
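
To make the accumulation concrete, here is a minimal plain-Python sketch of how an update function like `updateFunc` is applied from batch to batch. The `apply_batch` helper and the two sample batches are assumptions for illustration; Spark performs the grouping and state handling itself.

```python
def update_func(new_values, last_sum):
    # Same logic as updateFunc: add this batch's values to the running sum.
    # last_sum is None the first time a key is seen.
    return sum(new_values) + (last_sum or 0)

def apply_batch(state, pairs):
    """Apply one batch of (key, value) pairs to the running state dict."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    # The update function runs for every key that is in the state or in the batch;
    # keys with no new data receive an empty list of new values.
    for key in set(state) | set(grouped):
        new_state[key] = update_func(grouped.get(key, []), state.get(key))
    return new_state

state = {}
for lines in [["a b a"], ["b c"]]:  # two hypothetical batches
    pairs = [(word, 1) for line in lines for word in line.split(" ")]
    state = apply_batch(state, pairs)
    print(sorted(state.items()))
```

After the second batch the count for `a` is still 2 even though `a` did not appear in it, which is the difference from the per-batch word count.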

## Structured Streaming

Spark 2.0 supports stream processing with DataFrames.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

This time we do not create a streaming context, because streaming support is already built into the Spark session.

In [1]:
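# Memory settings are passed to the builder so they are in place when the
# session is created; calling spark.conf.set() afterwards does not resize
# an already running driver or executor.
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()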

In [None]:
lines = spark\
	.readStream\
	.format('socket')\
	.option('host', 'localhost')\
	.option('port', 9999)\
	.load()

`explode`: turns each element of an array into its own row (which makes `groupBy` possible)<br>
`split`: splits a string on a pattern and returns an array

This does the same job as `lines.flatMap(lambda line: line.split(" "))` on an RDD.

In [None]:
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)

Group the rows by the `word` column and count them.

In [None]:
# Generate running word count
wordCounts = words.groupBy('word').count()
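
For readers who want to see the row-level effect of `split`, `explode`, and `groupBy(...).count()` without starting Spark, the following plain-Python sketch walks through the same three steps. The two input rows are hypothetical, and dictionaries stand in for DataFrame rows.

```python
from collections import Counter

# Hypothetical input: each row has a single 'value' column, like the socket source.
rows = [{"value": "apache spark"}, {"value": "spark streaming"}]

# split: string -> list of words; explode: one output row per list element
words = [{"word": w} for row in rows for w in row["value"].split(" ")]

# groupBy('word').count(): number of rows per distinct word
word_counts = Counter(row["word"] for row in words)

print(words)
print(word_counts)
```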

In [None]:
query = wordCounts\
	.writeStream\
	.outputMode('complete')\
	.format('console')\
	.start()

In [None]:
query.awaitTermination()

## Mission: stream processing with Kafka

Write code that reads a stream from the `sparkin` topic <br>
and writes the processed result to the `sparkout` topic.


### Reading from Kafka

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

#### Subscribing to multiple topics is also possible.

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()

#### You can also subscribe to topics by pattern.

In [None]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()

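#### Offsets can be specified as well

This example is a batch query (`read` rather than `readStream`), since `endingOffsets` only applies to batch queries.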

In [None]:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()

### Writing to a Kafka sink

In [None]:
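# A streaming Kafka sink needs a checkpoint location; the path below is a placeholder.
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", "/path/to/checkpoint/dir") \
  .start()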

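#### Writing to multiple topics is possible too

The `topic` option names a single target topic. To write to several topics, omit it and supply a `topic` column in the data; each row then goes to the topic named in that column.

In [None]:
# The checkpoint path below is a placeholder.
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("checkpointLocation", "/path/to/checkpoint/dir") \
  .start()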
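
One possible starting point for the mission is sketched below. It is only a sketch under several assumptions: the function name `start_sparkin_to_sparkout`, the broker address `localhost:9092`, and the checkpoint path are all hypothetical, upper-casing the message is a stand-in for whatever processing you actually want, and the Spark Kafka connector package must be available to the session.

```python
def start_sparkin_to_sparkout(spark,
                              bootstrap_servers="localhost:9092",
                              checkpoint_dir="/tmp/sparkin-sparkout-checkpoint"):
    """Start a streaming query that reads `sparkin`, upper-cases each value,
    and writes the result to `sparkout`. Returns the StreamingQuery."""
    # Imported here so that defining the function does not itself require pyspark.
    from pyspark.sql.functions import col, upper

    source = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", bootstrap_servers)
              .option("subscribe", "sparkin")
              .load())

    # Kafka delivers key and value as binary, so cast them before processing.
    processed = source.select(
        col("key").cast("string").alias("key"),
        upper(col("value").cast("string")).alias("value"),
    )

    return (processed.writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("topic", "sparkout")
            .option("checkpointLocation", checkpoint_dir)
            .start())
```

A typical use would be `query = start_sparkin_to_sparkout(spark)` followed by `query.awaitTermination()`.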

# MLlib

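MLlib can be divided into three parts, like an insect's head, thorax, and abdomen: <br>
- Head: preprocessing. Data cleansing such as the NA and outlier handling we learned earlier, plus transformation into a form that ML models can process easily.
- Thorax: ML algorithms. Solving problems with ML models through classification, clustering, regression, and so on.
- Abdomen: utilities. Descriptive statistics, the chi-square test (a test of independence), visualization, and so on.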

## Case 1: predicting infant survival

Download the data.

In [None]:
! wget www.tomdrabas.com/data/LearningPySpark/births_train.csv.gz

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

In [5]:
# Memory settings go on the builder so they apply when the session is created.
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()

In [6]:
import pyspark.sql.types as typ

# Specify the data schema
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
        typ.StructField(e[0], e[1], False) for e in labels
    ])

In [7]:
# getOrCreate() returns the session created above if one is still active.
spark = SparkSession \
    .builder \
    .appName("sparkML") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .getOrCreate()

In [8]:
births = spark.read.csv('births_train.csv.gz', 
                        header=True, 
                        schema=schema)

Define the recode dictionary.
We will map Y -> 1, N -> 0, and U -> 0.

In [9]:
recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}

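### The first goal is to predict whether INFANT_ALIVE_AT_REPORT is 1 or 0.
#### We drop the features related to the infant and predict using only the features related to the mother, the father, and the place of birth.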

In [10]:
selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)

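#### Defining the recode functions
- The recode dictionary defined earlier is used here: `recode` converts a value according to the dictionary.
- `correct_cig` keeps the value as it is when it is not 99 and replaces it with 0 only when it is 99.
- It would be convenient to apply `recode` directly to the DataFrame, but Spark needs it wrapped as a UDF. That is why we create `rec_integer`, a UDF built from `recode` with its return type declared (an integer here).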

In [11]:
import pyspark.sql.functions as func

def recode(col, key):        
    return recode_dictionary[key][col] 

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat))\
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())
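
The value-level behaviour of these two helpers can be checked without Spark. The sketch below is a plain-Python counterpart: `correct_cig_value` is a hypothetical name for the per-value version of `correct_cig`, and treating a missing value as 0 is an assumption that mirrors how `when(...).otherwise(0)` falls through when the condition is null.

```python
recode_dictionary = {"YNU": {"Y": 1, "N": 0, "U": 0}}

def recode(value, key):
    # Look up the replacement for `value` in the mapping named by `key`.
    return recode_dictionary[key][value]

def correct_cig_value(value):
    # Per-value counterpart of when(col != 99, col).otherwise(0):
    # keep the value unless it is the code 99 (or missing), then use 0.
    if value is None or value == 99:
        return 0
    return value

print([recode(v, "YNU") for v in ["Y", "N", "U"]])
print([correct_cig_value(v) for v in [0, 20, 99]])
```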


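Now we start recoding, beginning with the features related to the number of cigarettes smoked.
`withColumn` takes the name of the output column as its first parameter and the column expression as its second. (We reuse the same column names here, so each column is replaced in place.)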

In [12]:
births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

Now we recode the Y/N/U values. To do that, we first need to find the columns that contain them.

In [13]:


cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

        print(dis)
        if 'Y' in dis:
            YNU_cols.append(s[0])

YNU_cols

['Y', 'N']
['7', '3', '5', '6', '9', '1', '4', '2']
['Y', 'U', 'N']
['Y', 'U', 'N']
['Y', 'U', 'N']
['Y', 'U', 'N']
['Y', 'U', 'N']


['INFANT_ALIVE_AT_REPORT',
 'DIABETES_PRE',
 'DIABETES_GEST',
 'HYP_TENS_PRE',
 'HYP_TENS_GEST',
 'PREV_BIRTH_PRETERM']

A DataFrame lets you select features and transform them at the same time.

In [14]:
births.select([
        'INFANT_NICU_ADMISSION', 
        rec_integer(
            'INFANT_NICU_ADMISSION', func.lit('YNU')
        ) \
        .alias('INFANT_NICU_ADMISSION_RECODE')]
     ).take(5)

[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]

To process all the YNU columns in one go, we build a list of transformations like this.

In [15]:
exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x) 
    if x in YNU_cols 
    else x 
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)

Take a sample to check that the values were converted correctly.

In [16]:
births_transformed.select(YNU_cols[-5:]).show(5)

+------------+-------------+------------+-------------+------------------+
|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+------------+-------------+------------+-------------+------------------+
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 1|
|           0|            0|           0|            0|                 0|
+------------+-------------+------------+-------------+------------------+
only showing top 5 rows



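### Getting to know the data
- To build a statistical model in a well-informed way, you need to understand the data well.
- It is possible to build a great model without understanding the data, but it takes much more effort.
- Roughly 80% of the work is data preprocessing and 15% is understanding the data (modeling is about 5%).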

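#### Descriptive statistics
Since our goal is to use MLlib, we use the MLlib `colStats` function rather than the DataFrame `describe` method. `colStats` computes column-wise summary statistics over an RDD of numeric vectors. (The variance it reports is the sample variance, so very small datasets can give odd-looking results.)

##### The colStats function
It returns a MultivariateStatisticalSummary object, which provides the following:
- count(): number of rows
- max(): maximum of each column
- mean(): mean of each column
- min(): minimum of each column
- normL1(): L1 norm of each column (sum of absolute values)
- normL2(): L2 norm of each column (Euclidean vector length: square root of the sum of squares)
- numNonzeros(): number of nonzero values in each column
- variance(): variance of each column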
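
To pin down what each of these quantities means, here is a small plain-Python sketch that computes them for a single column. The function name `column_summary` and the sample values are made up for illustration, and the variance uses the n - 1 denominator on the assumption that this matches the sample variance reported by `colStats`.

```python
import math

def column_summary(values):
    """Summary statistics for one numeric column, in plain Python."""
    n = len(values)
    mean = sum(values) / n
    # Sample variance (divides by n - 1); defined as 0.0 for a single value here.
    variance = sum((v - mean) ** 2 for v in values) / (n - 1) if n > 1 else 0.0
    return {
        "count": n,
        "max": max(values),
        "min": min(values),
        "mean": mean,
        "normL1": sum(abs(v) for v in values),            # sum of absolute values
        "normL2": math.sqrt(sum(v * v for v in values)),  # Euclidean length
        "numNonzeros": sum(1 for v in values if v != 0),
        "variance": variance,
    }

summary = column_summary([0, 3, -4, 0])  # hypothetical column values
print(summary)
```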

In [17]:
import pyspark.mllib.stat as st
import numpy as np

numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
                'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
               ]

numeric_rdd = births_transformed\
                       .select(numeric_cols)\
                       .rdd \
                       .map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

for col, m, v in zip(numeric_cols, 
                     mllib_stats.mean(), 
                     mllib_stats.variance()):
    print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))

MOTHER_AGE_YEARS: 	28.30 	 6.08
FATHER_COMBINED_AGE: 	44.55 	 27.55
CIG_BEFORE: 	1.43 	 5.18
CIG_1_TRI: 	0.91 	 3.83
CIG_2_TRI: 	0.70 	 3.31
CIG_3_TRI: 	0.58 	 3.11
MOTHER_HEIGHT_IN: 	65.12 	 6.45
MOTHER_PRE_WEIGHT: 	214.50 	 210.21
MOTHER_DELIVERY_WEIGHT: 	223.63 	 180.01
MOTHER_WEIGHT_GAIN: 	30.74 	 26.23


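#### What the data tells us
1. Mothers are younger than fathers on average.
2. The mean age is about 28 for mothers and about 44 for fathers. (The fathers' figure is probably inflated: the value 99 appears repeatedly in FATHER_COMBINED_AGE and looks like a placeholder for an unknown age.)
3. The mean number of cigarettes falls from before pregnancy through each trimester, so some mothers quit or cut down during pregnancy while others kept smoking.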



Next, let's compute the frequencies of the categorical (discrete) values.

In [18]:
categorical_cols = [e for e in births_transformed.columns 
                    if e not in numeric_cols]

categorical_rdd = births_transformed\
                       .select(categorical_cols)\
                       .rdd \
                       .map(lambda row: [e for e in row])
            
for i, col in enumerate(categorical_cols):
    agg = categorical_rdd \
        .groupBy(lambda row: row[i]) \
        .map(lambda row: (row[0], len(row[1])))
        
    print(col, sorted(agg.collect(), 
                      key=lambda el: el[1], 
                      reverse=True))


INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [(0, 44881), (1, 548)]
DIABETES_GEST [(0, 43451), (1, 1978)]
HYP_TENS_PRE [(0, 44348), (1, 1081)]
HYP_TENS_GEST [(0, 43302), (1, 2127)]
PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]


#### Finding correlations
Print only the pairs whose correlation is above 0.5.

In [19]:
corrs = st.Statistics.corr(numeric_rdd)

for i, el in enumerate(corrs > 0.5):
    # el is a boolean row: True where the correlation exceeds 0.5
    correlated = [
        (numeric_cols[j], corrs[i][j]) 
        for j, e in enumerate(el) 
        if e and j != i]
    
    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                  .format(numeric_cols[i], e[0], e[1]))

CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60


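#### Correlation analysis
- The CIG features are very highly correlated with each other, so we keep only CIG_1_TRI.
- The weight features are also highly correlated, so we keep only MOTHER_PRE_WEIGHT.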

In [20]:
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_1_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select([e for e in features_to_keep])


### Statistical testing
Correlations cannot be computed for categorical features, so we run a chi-square test instead.

In [21]:
import pyspark.mllib.linalg as ln

for cat in categorical_cols[1:]:
    agg = births_transformed \
        .groupby('INFANT_ALIVE_AT_REPORT') \
        .pivot(cat) \
        .count()    

    agg_rdd = agg \
        .rdd\
        .map(lambda row: (row[1:])) \
        .flatMap(lambda row: 
                 [0 if e is None else e for e in row]) \
        .collect()

    row_length = len(agg.collect()[0]) - 1
    # MLlib's chiSqTest expects a Matrix as input
    agg = ln.Matrices.dense(row_length, 2, agg_rdd)
    
    test = st.Statistics.chiSqTest(agg)
    print(cat, round(test.pValue, 4))

BIRTH_PLACE 0.0
DIABETES_PRE 0.0
DIABETES_GEST 0.0
HYP_TENS_PRE 0.0
HYP_TENS_GEST 0.0
PREV_BIRTH_PRETERM 0.0


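#### Chi-square test analysis
Every p-value rounds to 0.0, so each categorical feature is significantly associated with the outcome. All of them can be kept.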
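
For intuition about what the test measures, the sketch below computes Pearson's chi-square statistic and its p-value for a 2x2 contingency table in plain Python. It is limited to the 2x2 case (one degree of freedom), applies no continuity correction, and the function name and counts are hypothetical rather than taken from the births data.

```python
import math

def chi_square_2x2(table):
    """Pearson chi-square test of independence for a 2x2 table [[a, b], [c, d]].
    Returns (statistic, p_value)."""
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)
    statistic = 0.0
    for i in range(2):
        for j in range(2):
            # Expected count if rows and columns were independent
            expected = row_totals[i] * col_totals[j] / total
            statistic += (table[i][j] - expected) ** 2 / expected
    # With 1 degree of freedom, P(X > s) = erfc(sqrt(s / 2))
    p_value = math.erfc(math.sqrt(statistic / 2))
    return statistic, p_value

# Hypothetical counts: rows are outcome 0/1, columns are feature value 0/1.
print(chi_square_2x2([[10, 10], [10, 10]]))  # no association
print(chi_square_2x2([[20, 0], [0, 20]]))    # perfect association
```

A p-value near 0, as in the second table, means the feature's distribution differs between the two outcome groups.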

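### Building the final dataset
- Hash the string-typed feature (BIRTH_PLACE)
- Bring every value into the same shape so that each row flattens into one numeric vector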

In [22]:
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg

hashing = ft.HashingTF(7)

print(births_transformed.take(10))


births_hashed = births_transformed \
    .rdd \
    .map(lambda row: [
            list(hashing.transform(row[1]).toArray()) 
                if col == 'BIRTH_PLACE' 
                else row[i] 
            for i, col 
            in enumerate(features_to_keep)]) \
    .map(lambda row: [[e] if type(e) == int else e 
                      for e in row]) \
    .map(lambda row: [item for sublist in row 
                      for item in sublist]) \
    .map(lambda row: reg.LabeledPoint(
            row[0], 
            ln.Vectors.dense(row[1:]))
        )

births_hashed.take(10)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=29, FATHER_COMBINED_AGE=99, CIG_1_TRI=0, MOTHER_HEIGHT_IN=99, MOTHER_PRE_WEIGHT=999, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0), Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=22, FATHER_COMBINED_AGE=29, CIG_1_TRI=0, MOTHER_HEIGHT_IN=65, MOTHER_PRE_WEIGHT=180, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0), Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=38, FATHER_COMBINED_AGE=40, CIG_1_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=155, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0), Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=39, FATHER_COMBINED_AGE=42, CIG_1_TRI=0, MOTHER_HEIGHT_IN=60, MOTHER_PRE_WEIGHT=128, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=1), Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', 

[LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,29.0,99.0,0.0,99.0,999.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,22.0,29.0,0.0,65.0,180.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,38.0,40.0,0.0,63.0,155.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,39.0,42.0,0.0,60.0,128.0,0.0,0.0,0.0,0.0,1.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,18.0,99.0,4.0,61.0,110.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,32.0,37.0,0.0,66.0,150.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,22.0,25.0,0.0,68.0,155.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,25.0,26.0,0.0,64.0,136.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,26.0,32.0,0.0,64.0,140.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,39.0,66.0,0.0,65.0,140.0,0.0,0.0,0.0,0.0,0.0])]
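
The shape of these vectors (7 hashed slots followed by 10 numeric values) can be reproduced with a small plain-Python sketch. Everything here is an assumption for illustration: the helper names are made up, and `zlib.crc32` is used only as a deterministic stand-in, so the slot a given value lands in will generally differ from the one chosen by Spark's `HashingTF`.

```python
import zlib

def hash_one_hot(value, num_features=7):
    """Map a string to a vector with a single 1.0 in a hash-chosen slot."""
    vector = [0.0] * num_features
    # crc32 is a stand-in hash; Spark's HashingTF uses its own hash function.
    index = zlib.crc32(value.encode("utf-8")) % num_features
    vector[index] += 1.0
    return vector

def to_labeled_row(row, hashed_column=1):
    """Turn a row into (label, flat feature list), hashing one string column."""
    label = float(row[0])
    features = []
    for i, value in enumerate(row[1:], start=1):
        if i == hashed_column:
            features.extend(hash_one_hot(value))   # 7 slots for BIRTH_PLACE
        else:
            features.append(float(value))          # numeric values pass through
    return label, features

# First row of the output above, written as a plain tuple.
label, features = to_labeled_row((0, '1', 29, 99, 0, 99, 999, 0, 0, 0, 0, 0))
print(label, features)
```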

#### Splitting into training and test sets
Split the data 60:40.

In [23]:
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])

births_test.take(10)

[LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,29.0,99.0,0.0,99.0,999.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,39.0,42.0,0.0,60.0,128.0,0.0,0.0,0.0,0.0,1.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,18.0,99.0,4.0,61.0,110.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,32.0,37.0,0.0,66.0,150.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,25.0,26.0,0.0,64.0,136.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,33.0,99.0,0.0,65.0,145.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,29.0,99.0,0.0,60.0,115.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,31.0,41.0,0.0,59.0,106.0,0.0,0.0,0.0,0.0,1.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,27.0,99.0,0.0,66.0,213.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [1.0,0.0,0.0,0.0,0.0,0.0,0.0,34.0,31.0,0.0,70.0,130.0,0.0,0.0,0.0,0.0,0.0])]
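
A weighted random split assigns each record to a split independently, so the resulting sizes are only approximately 60:40. The plain-Python sketch below illustrates that idea. The `random_split` helper is hypothetical and is not how Spark implements `randomSplit`, though Spark's method also accepts a `seed` argument if you need a reproducible split.

```python
import random

def random_split(items, weights, seed=None):
    """Assign each item to one split at random, in proportion to the weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries, e.g. [0.6, 1.0] for weights [0.6, 0.4]
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for item in items:
        draw = rng.random()
        for k, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if draw < bound or k == len(bounds) - 1:
                splits[k].append(item)
                break
    return splits

train, test = random_split(range(1000), [0.6, 0.4], seed=42)
print(len(train), len(test))
```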

### Logistic Regression

In [24]:
from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS

LR_Model = LogisticRegressionWithLBFGS \
    .train(births_train, iterations=100)


Let's check the trained model against the test set.

In [53]:
truth = spark.sparkContext.parallelize(births_test.map(lambda row: row.label).map(lambda x: x * 1.0).collect())
prediction = spark.sparkContext.parallelize(LR_Model.predict(births_test.map(lambda row: row.features)).map(lambda x: x * 1.0).collect())
LR_results = truth.zip(prediction)
LR_results.collect()

[(0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),

In [33]:
import pyspark.mllib.evaluation as ev

LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()



Area under PR: 0.80
Area under ROC: 0.63
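
Because the model returns hard 0/1 predictions here, the ROC curve has a single operating point, and the area under it reduces to the average of the true positive rate and the true negative rate. The sketch below shows that calculation in plain Python. The function name and the sample pairs are hypothetical, both classes are assumed to be present, and the pairs are taken in the (label, prediction) order used above, whereas `BinaryClassificationMetrics` documents its input as (score, label), so keep the order in mind when comparing numbers.

```python
def roc_auc_single_threshold(pairs):
    """Area under the ROC curve for hard 0/1 predictions.
    pairs: iterable of (label, prediction), each 0.0 or 1.0."""
    pairs = list(pairs)
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    tpr = tp / (tp + fn)   # true positive rate
    fpr = fp / (fp + tn)   # false positive rate
    # ROC curve is (0,0) -> (fpr, tpr) -> (1,1); trapezoid area simplifies to:
    return (tpr + 1 - fpr) / 2

# Hypothetical results: 3 of 4 positives and 3 of 4 negatives predicted correctly.
sample = [(1.0, 1.0)] * 3 + [(1.0, 0.0)] + [(0.0, 1.0)] + [(0.0, 0.0)] * 3
print(roc_auc_single_threshold(sample))
```

A value of 0.5 corresponds to guessing, which puts the 0.63 reported above in context.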


Use a selector to pick out the best features.

In [40]:
selector = ft.ChiSqSelector(4).fit(births_train)


train_label = spark.sparkContext.parallelize(births_train.map(lambda row: row.label).collect())
selected_train_feature = spark.sparkContext.parallelize(selector.transform(births_train.map(lambda row: row.features)).collect())

topFeatures_train = (train_label.zip(selected_train_feature)).map(lambda row: reg.LabeledPoint(row[0], row[1]))



test_label = spark.sparkContext.parallelize(births_test.map(lambda row: row.label).collect()) 
selected_test_feature = spark.sparkContext.parallelize(selector.transform(births_test.map(lambda row: row.features)).collect())


topFeatures_test = test_label.zip(selected_test_feature).map(lambda row: reg.LabeledPoint(row[0], row[1]))

### Random forest

In [41]:
from pyspark.mllib.tree import RandomForest

RF_model = RandomForest \
    .trainClassifier(data=topFeatures_train, 
                     numClasses=2, 
                     categoricalFeaturesInfo={}, 
                     numTrees=6,  
                     featureSubsetStrategy='all',
                     seed=666)

In [49]:
top_feature_truth = spark.sparkContext.parallelize(topFeatures_test.map(lambda row: row.label).collect())
top_feature_prediction = spark.sparkContext.parallelize(RF_model.predict(topFeatures_test.map(lambda row: row.features)).collect())

RF_results = top_feature_truth.zip(top_feature_prediction)

RF_results.collect()

[(0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),
 (0.0, 0.0),
 (0.0, 1.0),
 (0.0, 0.0),

In [51]:
RF_evaluation = ev.BinaryClassificationMetrics(RF_results)

print('Area under PR: {0:.2f}' \
      .format(RF_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()

Area under PR: 0.79
Area under ROC: 0.62


In [58]:
LR_Model_2 = LogisticRegressionWithLBFGS \
    .train(topFeatures_train, iterations=10)

top_feature_truth = spark.sparkContext.parallelize(topFeatures_test.map(lambda row: row.label).map(lambda x: x * 1.0).collect())
top_feature_prediction = spark.sparkContext.parallelize(LR_Model_2.predict(topFeatures_test.map(lambda row: row.features)).map(lambda x: x * 1.0).collect())
LR_results_2 = top_feature_truth.zip(top_feature_prediction)

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()



Area under PR: 0.83
Area under ROC: 0.63
