# Introducing the MLlib package of PySpark

## Load and transform the data

Just like in the previous chapter, we first specify the schema of our dataset.

In [4]:
# Display the SparkContext. `sc` is never defined in this notebook - presumably provided by the runtime.
sc

In [5]:
# Show the current value of the Arrow execution setting for this session
# (`spark` is presumably provided by the notebook runtime).
spark.conf.get('spark.sql.execution.arrow.enabled')

In [6]:
import pyspark.sql.types as typ

# (column name, Spark SQL type) pairs, listed in the order the columns appear in the CSV.
_STR = typ.StringType()
_INT = typ.IntegerType()

labels = [
    ('INFANT_ALIVE_AT_REPORT', _STR),
    ('BIRTH_YEAR', _INT),
    ('BIRTH_MONTH', _INT),
    ('BIRTH_PLACE', _STR),
    ('MOTHER_AGE_YEARS', _INT),
    ('MOTHER_RACE_6CODE', _STR),
    ('MOTHER_EDUCATION', _STR),
    ('FATHER_COMBINED_AGE', _INT),
    ('FATHER_EDUCATION', _STR),
    ('MONTH_PRECARE_RECODE', _STR),
    ('CIG_BEFORE', _INT),
    ('CIG_1_TRI', _INT),
    ('CIG_2_TRI', _INT),
    ('CIG_3_TRI', _INT),
    ('MOTHER_HEIGHT_IN', _INT),
    ('MOTHER_BMI_RECODE', _INT),
    ('MOTHER_PRE_WEIGHT', _INT),
    ('MOTHER_DELIVERY_WEIGHT', _INT),
    ('MOTHER_WEIGHT_GAIN', _INT),
    ('DIABETES_PRE', _STR),
    ('DIABETES_GEST', _STR),
    ('HYP_TENS_PRE', _STR),
    ('HYP_TENS_GEST', _STR),
    ('PREV_BIRTH_PRETERM', _STR),
    ('NO_RISK', _STR),
    ('NO_INFECTIONS_REPORTED', _STR),
    ('LABOR_IND', _STR),
    ('LABOR_AUGM', _STR),
    ('STEROIDS', _STR),
    ('ANTIBIOTICS', _STR),
    ('ANESTHESIA', _STR),
    ('DELIV_METHOD_RECODE_COMB', _STR),
    ('ATTENDANT_BIRTH', _STR),
    ('APGAR_5', _INT),
    ('APGAR_5_RECODE', _STR),
    ('APGAR_10', _INT),
    ('APGAR_10_RECODE', _STR),
    ('INFANT_SEX', _STR),
    ('OBSTETRIC_GESTATION_WEEKS', _INT),
    ('INFANT_WEIGHT_GRAMS', _INT),
    ('INFANT_ASSIST_VENTI', _STR),
    ('INFANT_ASSIST_VENTI_6HRS', _STR),
    ('INFANT_NICU_ADMISSION', _STR),
    ('INFANT_SURFACANT', _STR),
    ('INFANT_ANTIBIOTICS', _STR),
    ('INFANT_SEIZURES', _STR),
    ('INFANT_NO_ABNORMALITIES', _STR),
    ('INFANT_ANCEPHALY', _STR),
    ('INFANT_MENINGOMYELOCELE', _STR),
    ('INFANT_LIMB_REDUCTION', _STR),
    ('INFANT_DOWN_SYNDROME', _STR),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', _STR),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', _STR),
    ('INFANT_BREASTFED', _STR),
]

# Every field is declared with nullable=False.
schema = typ.StructType(
    [typ.StructField(name, dtype, False) for name, dtype in labels]
)

Next, we load the data.

In [8]:
# Read the gzipped CSV using the explicit `schema` (column types are given, not inferred).
BIRTHS_CSV_PATH = "/FileStore/tables/births_train_csv.gz"
births = spark.read.csv(BIRTHS_CSV_PATH, header=True, schema=schema)

In [9]:
# Number of rows loaded.
births.count()

In [10]:
# Column names of the loaded DataFrame.
births.columns

In [11]:
# Peek at the raw (not yet recoded) label values.
births.select('INFANT_ALIVE_AT_REPORT').show(10)

Specify our recode dictionary.

In [13]:
# Yes / No / Unknown codes -> integers; "unknown" is folded into 0, same as "no".
recode_dictionary = {'YNU': {'Y': 1, 'N': 0, 'U': 0}}

Our goal is to predict whether `'INFANT_ALIVE_AT_REPORT'` is 1 or 0. Thus, we will drop all of the features that relate to the infant and keep only the label plus the features describing the place of birth, the parents, and the pregnancy.

In [15]:
# The label plus the non-infant features (birth place, parents, pregnancy);
# every other column describing the infant is dropped.
selected_features = [
    'INFANT_ALIVE_AT_REPORT',                         # label
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE',
    'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI',
    'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN',
    'DIABETES_PRE', 'DIABETES_GEST',
    'HYP_TENS_PRE', 'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM',
]

births_trimmed = births.select(selected_features)

In [16]:
# CIG_* value encoding:
#   0      none
#   1-97   the actual number
#   98     98 or more
#   99     unknown
births_trimmed.select("CIG_BEFORE").show(5)

Specify the recoding methods.

In [18]:
import pyspark.sql.functions as func

def recode(col, key):
    """Translate a coded value into its integer using ``recode_dictionary``.

    ``key`` picks the code book (e.g. ``'YNU'``) and ``col`` is the raw cell value
    (e.g. ``'Y'`` -> 1). A value missing from the code book raises KeyError.
    """
    code_book = recode_dictionary[key]
    return code_book[col]

def correct_cig(feat):
    """Return a Column expression for ``feat`` with 99 replaced by 0.

    99 means "unknown" for the cigarette counts and is assumed to be 0. Because a
    null compared with 99 is not true, nulls also fall through to 0.
    """
    column = func.col(feat)
    return func.when(column != 99, column).otherwise(0)

# Spark UDF wrapping recode(col, key); its result is an IntegerType column.
rec_integer = func.udf(recode, typ.IntegerType())

Correct the features related to the number of smoked cigarettes.

In [20]:
# Replace the "unknown" code 99 with 0 in every cigarette-count column.
# withColumn replaces an existing column that has the same name, keeping its position.
births_transformed = births_trimmed
for cig_col in ['CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI']:
    births_transformed = births_transformed.withColumn(cig_col, correct_cig(cig_col))

Figure out which features are Yes/No/Unknown features.

In [22]:
# Distinct values of the label column, collected to the driver as a Python list.
births.select('INFANT_ALIVE_AT_REPORT').distinct().rdd.map(lambda row: row[0]).collect()

In [23]:
# A string column whose distinct values include "Y" is treated as a Yes/No/Unknown column.
YNU_cols = []
for field in births_trimmed.schema:
    if not field.dataType == typ.StringType():
        continue
    # All distinct values of this column, collected to the driver.
    distinct_values = births_trimmed.select(field.name) \
        .distinct() \
        .rdd \
        .map(lambda row: row[0]) \
        .collect()
    if "Y" in distinct_values:
        YNU_cols.append(field.name)
YNU_cols  # columns coded with Y / N / U

DataFrames can transform the features *in bulk* while selecting features.

In [25]:
# Display the UDF object wrapping recode(col, key).
rec_integer

In [26]:
# Preview the YNU recode on a single column: the raw value next to its integer code.
# func.lit('YNU') passes the code-book key to the UDF as a literal Column.
nicu_recode_expr = rec_integer('INFANT_NICU_ADMISSION', func.lit('YNU')) \
    .alias('INFANT_NICU_ADMISSION_RECODE')
births.select(['INFANT_NICU_ADMISSION', nicu_recode_expr]).take(5)

Transform all the `YNU_cols` in one go using a list of transformations.

In [28]:
# One select expression per column: YNU columns go through the UDF (keeping their
# name via alias); every other column is selected unchanged by name.
exprs_YNU = []
for column_name in births_transformed.columns:
    if column_name not in YNU_cols:
        exprs_YNU.append(column_name)
        continue
    exprs_YNU.append(rec_integer(column_name, func.lit('YNU')).alias(column_name))
exprs_YNU

In [29]:
# Apply the recoding: YNU columns become integers, all other columns pass through unchanged.
births_transformed = births_transformed.select(exprs_YNU)

Let's check if we got it correctly.

In [31]:
# Spot-check the last five YNU columns after recoding.
births_transformed.select(YNU_cols[-5:]).show(5)

## Get to know your data

### Descriptive statistics

We will use the `colStats(...)` method.

```
class pyspark.mllib.stat.MultivariateStatisticalSummary(java_model)
Trait for multivariate statistical summary of a data matrix.

count()
max()
mean()
min()
normL1()
normL2()
numNonzeros()
variance()
```

In [35]:
import pyspark.mllib.stat as st  # 一些统计相关的
import numpy as np

# Numeric columns.
numeric_cols = [
    'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE',
    'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI',
    'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
    'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN',
]
# Each Row becomes a plain Python list of its values.
numeric_rdd = births_transformed.select(numeric_cols).rdd.map(list)

# Column-wise summary statistics for the RDD.
mllib_stats = st.Statistics.colStats(numeric_rdd)

# Report the mean and variance of each numeric column.
means = mllib_stats.mean()
variances = mllib_stats.variance()
print("{} \t {} \t {}".format("col_name", "mean", "variance"))
for name, mean_value, variance_value in zip(numeric_cols, means, variances):
    print("{} \t {:.2f} \t {:.2f}".format(name, mean_value, variance_value))


For the categorical variables we will calculate the frequencies of their values.

In [37]:
# Categorical (non-numeric) columns: print the frequency of each value, most common first.
categorical_cols = [e for e in births_transformed.columns if e not in numeric_cols]

# RDD of plain Python lists: [value_of_col_1, value_of_col_2, ...]
categorial_rdd = births_transformed.select(categorical_cols).rdd.map(list)

for i, col in enumerate(categorical_cols):
    # countByValue() counts within each partition and only sends {value: count} to the
    # driver, whereas groupBy() would shuffle and buffer every full row per distinct
    # value just to take its length.
    # `idx=i` binds the current index into the closure instead of relying on late binding.
    value_counts = categorial_rdd.map(lambda row, idx=i: row[idx]).countByValue()
    print(col, sorted(value_counts.items(),
                      key=lambda el: el[1],
                      reverse=True))
          

### Correlations

Correlations between our features.

In [40]:
# Pairwise correlation matrix of the numeric columns (Pearson, the default method),
# indexed as corrs[i, j] in the order of numeric_cols.
corrs = st.Statistics.corr(numeric_rdd)
corrs

In [41]:
# Element-wise boolean mask: which pairs have a correlation above 0.5.
corrs > 0.5

In [42]:
# Pairs of numeric features whose Pearson correlation is strong in either direction.
# `corrs` is the matrix already computed earlier from st.Statistics.corr(numeric_rdd);
# it is reused here rather than launching the same Spark job a second time.
CORR_THRESHOLD = 0.5
correlated = [
    (numeric_cols[i], numeric_cols[j], corrs[i, j])
    for i in range(len(corrs))
    for j in range(i + 1, len(corrs))  # upper triangle only: each pair once, no diagonal
    # abs(): a strong negative correlation is just as redundant as a positive one.
    if abs(corrs[i, j]) > CORR_THRESHOLD
]


In [43]:
# One line per correlated pair: "<feature A> -- <feature B> : <coefficient>".
for first, second, coefficient in correlated:
    print("{0} -- {1} : {2:.2f}".format(first, second, coefficient))

We can drop most of the highly correlated features.

In [45]:
# Final feature set: the label first (later cells treat the first entry as the label),
# then the features kept after dropping most of the highly correlated numeric columns.
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT',                         # label
    'BIRTH_PLACE',
    'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE',
    'CIG_1_TRI',
    'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT',
    'DIABETES_PRE', 'DIABETES_GEST',
    'HYP_TENS_PRE', 'HYP_TENS_GEST',
    'PREV_BIRTH_PRETERM',
]
births_transformed = births_transformed.select(*features_to_keep)

### Statistical testing

Run a Chi-square test to determine if there are significant differences for categorical variables.

In [48]:
# Categorical columns: every column not in numeric_cols, computed before the final feature selection.
categorical_cols

In [49]:
# Contingency counts: one row per label value, one column per BIRTH_PLACE value.
# Label/place combinations that never occur come back as null.
agg = births_transformed.groupby('INFANT_ALIVE_AT_REPORT').pivot("BIRTH_PLACE").count()
agg.show()

In [50]:
# First pivot row: the label value followed by the per-BIRTH_PLACE counts.
agg.collect()[0]

In [51]:
# Drop the label column from every pivot row and flatten all counts into one list;
# null counts (combinations that never occur) become 0.
agg_rdd = agg.rdd \
    .flatMap(lambda row: [0 if count is None else count for count in row[1:]]) \
    .collect()
agg_rdd

In [52]:
import pyspark.mllib.linalg as ln

# Matrices.dense(numRows, numCols, values) fills the values column by column (column-major).
ln.Matrices.dense(2, 3, np.arange(6)).toArray()

In [53]:
import pyspark.mllib.linalg as ln

# Chi-square test of independence between the label and each categorical feature.
label_col = 'INFANT_ALIVE_AT_REPORT'
# Filter the label out by name instead of assuming it is categorical_cols[0].
for cat in [c for c in categorical_cols if c != label_col]:
    # Contingency counts: one row per label value, one column per value of `cat`.
    pivoted = births_transformed \
        .groupby(label_col) \
        .pivot(cat) \
        .count()

    # Collect once and derive everything from that result, so the pivot job runs once.
    rows = pivoted.collect()
    if not rows:
        continue
    # Drop the label column; combinations that never occur come back as null -> 0.
    agg_rdd = [0 if e is None else e for row in rows for e in row[1:]]

    num_categories = len(rows[0]) - 1  # pivot columns = distinct values of `cat`
    num_labels = len(rows)             # distinct label values actually present

    # Matrices.dense is column-major, so each collected row (one label value) becomes
    # one column: a (num_categories x num_labels) contingency matrix.
    agg = ln.Matrices.dense(num_categories, num_labels, agg_rdd)
    # For a matrix input, chiSqTest runs Pearson's independence test on the contingency matrix.
    test = st.Statistics.chiSqTest(agg)
    print(cat, round(test.pValue, 4))

## Create the final dataset

### Create an RDD of `LabeledPoint`s

We will use a hashing trick to encode the `'BIRTH_PLACE'` feature.

In [56]:
import pyspark.mllib.feature as ft

# Demo of the hashing trick: HashingTF(7) maps a sequence of terms to their term
# frequencies in a 7-element vector.
htf = ft.HashingTF(7)
doc = "a a b b c d".split(" ")
htf.transform(doc)

In [57]:
# SparseVector(size, indices, values): a length-4 vector holding 3.0 at index 1 and
# 4.0 at index 3; every other entry is 0.
a = ln.SparseVector(4, [1, 3], [3.0, 4.0])
a

In [58]:
# Dot product with itself: 3.0**2 + 4.0**2 = 25.0.
a.dot(a)

In [59]:
# BIRTH_PLACE is the string feature that is hashed when building the LabeledPoints.
births_transformed.select("BIRTH_PLACE").show(5)

In [60]:
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg

# BIRTH_PLACE is hashed into 7 slots; every other kept column is used as-is.
hashing = ft.HashingTF(7)


def _to_labeled_point(row):
    """Build a LabeledPoint from one Row of ``births_transformed``.

    The first entry of ``features_to_keep`` is the label. The remaining columns form
    the feature vector, with ``BIRTH_PLACE`` expanded into the 7 hashed
    term-frequency slots produced by ``hashing``.

    Raises:
        ValueError: if a feature column holds a null, instead of failing later with
            an unrelated error while the vector is being built.
    """
    features = []
    for col_name in features_to_keep[1:]:
        value = row[col_name]  # look the value up by name, not by a hard-coded position
        if value is None:
            raise ValueError(
                'Null value in column {!r}; cannot build a LabeledPoint'.format(col_name))
        if col_name == 'BIRTH_PLACE':
            # Hash the whole code as ONE term. A bare string would be iterated, so
            # each character of a multi-character code would be hashed separately.
            features.extend(hashing.transform([value]).toArray())
        else:
            features.append(value)
    return reg.LabeledPoint(row[features_to_keep[0]], ln.Vectors.dense(features))


births_hashed = births_transformed.rdd.map(_to_labeled_point)

### Split into training and testing

Before we move to the modeling stage, we need to split our dataset into two sets: one we'll use for training and another one for testing.

In [63]:

# 60/40 train/test split with a fixed seed, so the split (and every metric computed
# from it) is reproducible across runs; 666 matches the random forest seed.
# Both splits are reused many times (iterative L-BFGS training, chi-square selection,
# evaluation), so they are cached instead of recomputing the full lineage each time.
births_train, births_test = births_hashed.randomSplit([0.6, 0.4], seed=666)
births_train, births_test = births_train.cache(), births_test.cache()

## Predicting infant survival

### Logistic regression in Spark

MLlib used to provide a logistic regression model estimated using a stochastic gradient descent (SGD) algorithm. This model was deprecated in Spark 2.0 in favor of the `LogisticRegressionWithLBFGS` model.

In [66]:
from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS

# Logistic regression fitted with L-BFGS on the training split, limited to 10 iterations.
LR_Model = LogisticRegressionWithLBFGS.train(births_train, iterations=10)

Let's now use the model to predict the classes for our testing set.

In [68]:
# (prediction, label) pairs. BinaryClassificationMetrics expects (score, label) and
# MulticlassMetrics expects (prediction, label), so the prediction must come first;
# with (label, prediction) the AUCs are computed with the roles swapped and
# precision() actually reports recall.
# The mllib logistic regression model can be applied per vector inside map(), so no
# zip() is needed; predict() returns the class as an int, which is cast to float.
LR_results = births_test.map(
    lambda lp: (float(LR_Model.predict(lp.features)), lp.label)
)

Let's check how well (or how badly) our model performed.

In [70]:
import pyspark.mllib.evaluation as ev
# Area under the precision-recall and ROC curves for the logistic regression results.
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

area_under_pr = LR_evaluation.areaUnderPR
print('Area under PR: {0:.2f}'.format(area_under_pr))

area_under_roc = LR_evaluation.areaUnderROC
print('Area under ROC: {0:.2f}'.format(area_under_roc))

# Release the intermediate data the metrics object persisted.
LR_evaluation.unpersist()

In [71]:
# Multiclass view of the same logistic regression results; accuracy is the fraction
# of pairs whose two entries match.
# NOTE(review): MulticlassMetrics documents its input as (prediction, label) pairs -
# confirm LR_results is built in that order, otherwise precision() and recall() swap roles.
metrics = ev.MulticlassMetrics(LR_results)
metrics.accuracy

In [72]:
# Precision for the positive class (label 1.0).
metrics.precision(1.0)

In [73]:
# Precision for the negative class; labels are floats, so 0 is written as 0.0.
metrics.precision(0.0)

### Selecting only the most predictive features

MLlib allows us to select the most predictive features using a chi-square selector.

In [76]:
# Pick the 4 most predictive features with a chi-square selector, fitted on the
# training split only.
selector = ft.ChiSqSelector(4).fit(births_train)


def _project_top_features(labeled_rdd):
    """Return ``labeled_rdd`` as LabeledPoints whose features are reduced by ``selector``."""
    labels = labeled_rdd.map(lambda row: row.label)
    reduced = selector.transform(labeled_rdd.map(lambda row: row.features))
    return labels.zip(reduced).map(lambda row: reg.LabeledPoint(row[0], row[1]))


topFeatures_train = _project_top_features(births_train)
topFeatures_test = _project_top_features(births_test)

In [77]:
# Inspect the first two training points after feature selection.
topFeatures_train.take(2)

### Random Forest in Spark

We are now ready to build the random forest model.

In [79]:
from pyspark.mllib.tree import RandomForest

# Random forest of 6 trees trained on the chi-square-selected features.
RF_model = RandomForest.trainClassifier(
    data=topFeatures_train,
    numClasses=2,                    # binary label
    numTrees=6,
    featureSubsetStrategy='all',     # consider every feature at every split
    categoricalFeaturesInfo={},      # no feature is declared categorical
    seed=666,                        # fixed seed for a reproducible forest
)

Let's see how well our model did.

In [81]:
# (prediction, label) pairs - BinaryClassificationMetrics expects (score, label), so the
# prediction must come first; the reverse order evaluates the metric with roles swapped.
# The mllib tree models cannot be called inside an RDD transformation in Python, so
# predict() is run on the whole feature RDD and zipped back with the labels.
RF_results = (
    RF_model.predict(topFeatures_test.map(lambda row: row.features))
    .zip(topFeatures_test.map(lambda row: row.label))
)

RF_evaluation = ev.BinaryClassificationMetrics(RF_results)

print('Area under PR: {0:.2f}'.format(RF_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}'.format(RF_evaluation.areaUnderROC))
# Release the intermediate data the metrics object persisted.
RF_evaluation.unpersist()

Let's see how logistic regression performs with a reduced number of features.

In [83]:
# Retrain logistic regression on the chi-square-selected features only.
LR_Model_2 = LogisticRegressionWithLBFGS.train(topFeatures_train, iterations=10)

# (prediction, label) pairs - BinaryClassificationMetrics expects (score, label), so the
# prediction must come first. predict() returns the class as an int, cast to float.
LR_results_2 = topFeatures_test.map(
    lambda lp: (float(LR_Model_2.predict(lp.features)), lp.label)
)

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)

print('Area under PR: {0:.2f}'.format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}'.format(LR_evaluation_2.areaUnderROC))
# Release the intermediate data the metrics object persisted.
LR_evaluation_2.unpersist()