In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("mllib-introduction").master("local[4]").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/31 11:30:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import pyspark.sql.types as typ

In [4]:
labels = [
    ("INFANT_ALIVE_AT_REPORT", typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

In [5]:
!ls

births_train.csv.gz      mllib_introduction.ipynb


In [6]:
births = spark.read.csv("births_train.csv.gz", header=True, schema=schema)

In [7]:
births.toPandas().head(5)

22/01/31 11:30:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,INFANT_ALIVE_AT_REPORT,BIRTH_YEAR,BIRTH_MONTH,BIRTH_PLACE,MOTHER_AGE_YEARS,MOTHER_RACE_6CODE,MOTHER_EDUCATION,FATHER_COMBINED_AGE,FATHER_EDUCATION,MONTH_PRECARE_RECODE,...,INFANT_ANTIBIOTICS,INFANT_SEIZURES,INFANT_NO_ABNORMALITIES,INFANT_ANCEPHALY,INFANT_MENINGOMYELOCELE,INFANT_LIMB_REDUCTION,INFANT_DOWN_SYNDROME,INFANT_SUSPECTED_CHROMOSOMAL_DISORDER,INFANT_NO_CONGENITAL_ANOMALIES_CHECKED,INFANT_BREASTFED
0,N,2015,2,1,29,3,9,99,9,4,...,N,N,0,N,N,N,N,N,0,N
1,N,2015,2,1,22,1,3,29,4,1,...,N,N,0,N,N,N,N,C,0,N
2,N,2015,2,1,38,1,4,40,3,1,...,U,U,9,N,N,N,N,N,1,N
3,N,2015,4,1,39,2,7,42,6,1,...,N,Y,0,Y,N,N,N,N,0,N
4,N,2015,4,1,18,3,2,99,9,2,...,U,U,9,N,N,N,N,N,1,N


In [8]:
# specifying our recode dictionary
recode_dictionary = {
    "YNU": {
        "Y": 1,
        "N": 0,
        "U": 0
    }
}

In [9]:
# our aim is to predict whether `INFANT_ALIVE_AT_REPORT` is either 0 or 1.
# 
selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)

In [10]:
import pyspark.sql.functions as func

def recode(col, key):
    return recode_dictionary[key][col]

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat)) \
            .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType()) # creates a user defined functions

In [11]:
# correcting the features related to the number of smoked cigarettes
births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

In [12]:
# figuring out which ones are Yes/No/Unknown features
cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
                .rdd \
                    .map(lambda row: row[0]) \
                        .collect()
    
        if "Y" in dis:
            YNU_cols.append(s[0])

                                                                                

In [13]:
# dataframes can transform the features in bulk while selecing features
births.select([
        'INFANT_NICU_ADMISSION', 
        rec_integer(
            'INFANT_NICU_ADMISSION', func.lit('YNU')
        ) \
        .alias('INFANT_NICU_ADMISSION_RECODE')]
     ).take(5)

Traceback (most recent call last):
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]

In [14]:
# transforming all the `YNU_cols` in one using a list of transformations
exprs_YNU = [
    rec_integer(x, func.lit("YNU")).alias(x)
    if x in YNU_cols
    else x
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)

In [15]:
births_transformed.select(YNU_cols[-5:]).show(5)

+------------+-------------+------------+-------------+------------------+
|DIABETES_PRE|DIABETES_GEST|HYP_TENS_PRE|HYP_TENS_GEST|PREV_BIRTH_PRETERM|
+------------+-------------+------------+-------------+------------------+
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 0|
|           0|            0|           0|            0|                 1|
|           0|            0|           0|            0|                 0|
+------------+-------------+------------+-------------+------------------+
only showing top 5 rows



Traceback (most recent call last):
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [16]:
# Descriptive Statistics
# we will use the `colStats(...)` method.

import pyspark.mllib.stat as st
import numpy as np

In [25]:
#numeric_cols = [f.name for f in births_transformed.schema.fields if isinstance(f.dataType, typ.IntegerType)]
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
                'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
               ]

numeric_rdd = births_transformed\
    .select(numeric_cols)\
        .rdd\
            .map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

In [26]:
for col, m, v in zip(numeric_cols, mllib_stats.mean(), mllib_stats.variance()):
    print("{0}: \t{1:.2f} \t {2:.2f}".format(col, m, np.sqrt(v)))

MOTHER_AGE_YEARS: 	28.30 	 6.08
FATHER_COMBINED_AGE: 	44.55 	 27.55
CIG_BEFORE: 	1.43 	 5.18
CIG_1_TRI: 	0.91 	 3.83
CIG_2_TRI: 	0.70 	 3.31
CIG_3_TRI: 	0.58 	 3.11
MOTHER_HEIGHT_IN: 	65.12 	 6.45
MOTHER_PRE_WEIGHT: 	214.50 	 210.21
MOTHER_DELIVERY_WEIGHT: 	223.63 	 180.01
MOTHER_WEIGHT_GAIN: 	30.74 	 26.23


In [27]:
numeric_rdd.take(3)

[[29, 99, 0, 0, 0, 0, 99, 999, 999, 99],
 [22, 29, 0, 0, 0, 0, 65, 180, 198, 18],
 [38, 40, 0, 0, 0, 0, 63, 155, 167, 12]]

In [28]:
# for the categorical variable we will calculate the frequencies of their values.
categorical_cols = [e for e in births_transformed.columns if e not in numeric_cols]

In [31]:
categorical_rdd = births_transformed.\
    select(categorical_cols)\
        .rdd\
            .map(lambda row: [e for e in row])

In [32]:
categorical_rdd.take(5)

[[0, '1', 0, 0, 0, 0, 0],
 [0, '1', 0, 0, 0, 0, 0],
 [0, '1', 0, 0, 0, 0, 0],
 [0, '1', 0, 0, 0, 0, 1],
 [0, '1', 0, 0, 0, 0, 0]]

In [39]:
for i, col in enumerate(categorical_cols):
    agg = categorical_rdd \
        .groupBy(lambda row: row[i])\
            .map(lambda row: (row[0], len(row[1])))
    
    print(col, sorted(agg.collect(), key=lambda el: el[1], reverse=True))

INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [(0, 44881), (1, 548)]
DIABETES_GEST [(0, 43451), (1, 1978)]
HYP_TENS_PRE [(0, 44348), (1, 1081)]
HYP_TENS_GEST [(0, 43302), (1, 2127)]
PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]


In [40]:
# Correlations
corrs = st.Statistics.corr(numeric_rdd)

# for i, el in enumerate(corr > 0.5):
#     correlated = [
#         (numeric_cols[j], corrs[i][j])

#     ]

22/01/31 11:48:17 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/31 11:48:17 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

In [42]:
corrs.shape

(10, 10)

In [50]:
sum(corrs > 0.5)

array([1, 1, 4, 4, 4, 4, 1, 3, 3, 3])

In [53]:
for i in [True, True, False, False]:
    print(i)

True
True
False
False


In [78]:
pd.DataFrame(corrs)#.iloc[0].sum()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,1.0,-0.035203,-0.064101,-0.045254,-0.033569,-0.02732,0.041911,0.02852,0.022333,0.014624
1,-0.035203,1.0,0.087993,0.094362,0.091438,0.076141,0.086203,0.1279,0.097506,0.035482
2,-0.064101,0.087993,1.0,0.825531,0.722135,0.623034,-0.010871,-0.026424,-0.004672,-0.011881
3,-0.045254,0.094362,0.825531,1.0,0.865457,0.75992,-0.006381,-0.012328,-0.001128,-0.014818
4,-0.033569,0.091438,0.722135,0.865457,1.0,0.893076,-0.002765,-0.006062,0.00146,-0.014359
5,-0.02732,0.076141,0.623034,0.75992,0.893076,1.0,-0.000938,-0.003776,0.004836,-0.006379
6,0.041911,0.086203,-0.010871,-0.006381,-0.002765,-0.000938,1.0,0.45257,0.474217,0.331764
7,0.02852,0.1279,-0.026424,-0.012328,-0.006062,-0.003776,0.45257,1.0,0.53597,0.649941
8,0.022333,0.097506,-0.004672,-0.001128,0.00146,0.004836,0.474217,0.53597,1.0,0.596929
9,0.014624,0.035482,-0.011881,-0.014818,-0.014359,-0.006379,0.331764,0.649941,0.596929,1.0


In [80]:
corrs = st.Statistics.corr(numeric_rdd)

for i, el in enumerate(corrs > 0.5):
    correlated = [
        (numeric_cols[j], corrs[i][j]) 
        for j, e in enumerate(el) 
        if e == 1.0 and j != i]
    
    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                  .format(numeric_cols[i], e[0], e[1]))

Traceback (most recent call last):
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 663, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/Users/prashant/.local/share/virtualenvs/spark_test-8u_MymxW/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


CIG_BEFORE-to-CIG_1_TRI: 0.83
CIG_BEFORE-to-CIG_2_TRI: 0.72
CIG_BEFORE-to-CIG_3_TRI: 0.62
CIG_1_TRI-to-CIG_BEFORE: 0.83
CIG_1_TRI-to-CIG_2_TRI: 0.87
CIG_1_TRI-to-CIG_3_TRI: 0.76
CIG_2_TRI-to-CIG_BEFORE: 0.72
CIG_2_TRI-to-CIG_1_TRI: 0.87
CIG_2_TRI-to-CIG_3_TRI: 0.89
CIG_3_TRI-to-CIG_BEFORE: 0.62
CIG_3_TRI-to-CIG_1_TRI: 0.76
CIG_3_TRI-to-CIG_2_TRI: 0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT: 0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT: 0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN: 0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT: 0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT: 0.60


In [81]:
# we can drop most of highly correlated features.
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_1_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

births_transformed = births_transformed.select([e for e in features_to_keep])

In [82]:
# statistical testing
# running achi-square test to determin if there are significant differences for categorical variables
import pyspark.mllib.linalg as ln

for cat in categorical_cols[1:]:
    agg = births_transformed \
        .groupby("INFANT_ALIVE_AT_REPORT")\
            .pivot(cat)\
                .count()

    agg_rdd = agg\
        .rdd\
            .map(lambda row: (row[1:]))\
                .flatMap(lambda row:
                [0 if e == None else e for e in row]) \
                    .collect()

In [83]:
agg_rdd

[22685, 664, 20403, 1677]

In [89]:
row_length = len(agg.collect()[0]) - 1

In [90]:
st.Statistics.chiSqTest(ln.Matrices.dense(row_length, 2, agg_rdd))

DenseMatrix(2, 2, [22685.0, 664.0, 20403.0, 1677.0], False)

In [92]:
st.Statistics.chiSqTest(ln.Matrices.dense(row_length, 2, agg_rdd)).pValue

0.0