## CSE545-SDG3-Matrix


In [1]:
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import scipy
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
import os

In [2]:
sc = pyspark.SparkContext(appName="SDG3")
sqlCtx = SQLContext(sc)
sc.version

'2.2.0'

In [3]:
data_path = r'..\..\..\LinkPE14US'
df = sqlCtx.read.text(os.path.join(data_path, "VS14LINK.USNUMPUB"))  #350 mb
#df = sqlCtx.read.text("/VS14LINK.USDENPUB") # 5gb
#df = sqlCtx.read.text("/*.*PUB") # both
df

DataFrame[value: string]

In [4]:
def blank_as_null(x):
    return when(col(x) != ' ', col(x)).otherwise(np.nan)

In [36]:
# pandas data frame after adding column metadata
pndf = df.select(
    df.value.substr(9,4).cast(IntegerType()).alias('Birth_Year'),
    df.value.substr(13,2).cast(IntegerType()).alias('Birth_Month'),
    df.value.substr(75,2).cast(IntegerType()).alias('Mothers_Age'),
    #df.value.substr(299,3).cast(IntegerType()).alias('Delivery_Weight_lbs'),
    df.value.substr(492,2).cast(IntegerType()).alias('Gestational_Age_weeks'),
    df.value.substr(332,2).cast(IntegerType()).alias('Num_Prev_Cesareans'),
    df.value.substr(454,1).cast(IntegerType()).alias('Plurality'),
    df.value.substr(568,1).cast(StringType()).alias('Infant_Living')   
    
).withColumn("id", monotonically_increasing_id())

pndf.printSchema
pndf = pndf.na.fill({'Num_Prev_Cesareans': 0.0})
#pndf = pndf.withColumn("Infant_Living", blank_as_null("Infant_Living"))

In [37]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="Infant_Living", outputCol='Infant_Living_Index')
pndf = stringIndexer.fit(pndf).transform(pndf)
pndf.head(2)

[Row(Birth_Year=2013, Birth_Month=7, Mothers_Age=24, Gestational_Age_weeks=10, Num_Prev_Cesareans=0, Plurality=1, Infant_Living='Y', id=0, Infant_Living_Index=0.0),
 Row(Birth_Year=2013, Birth_Month=9, Mothers_Age=27, Gestational_Age_weeks=8, Num_Prev_Cesareans=1, Plurality=1, Infant_Living='Y', id=1, Infant_Living_Index=0.0)]

In [38]:
from pyspark.ml.feature import VectorAssembler
pca_cols = pndf.columns
pca_cols.remove('Infant_Living')
assembler = VectorAssembler(inputCols=pca_cols, outputCol='features')
vector_df = assembler.transform(pndf)
vector_df.head()

Row(Birth_Year=2013, Birth_Month=7, Mothers_Age=24, Gestational_Age_weeks=10, Num_Prev_Cesareans=0, Plurality=1, Infant_Living='Y', id=0, Infant_Living_Index=0.0, features=DenseVector([2013.0, 7.0, 24.0, 10.0, 0.0, 1.0, 0.0, 0.0]))

### Scaling and Normalization

In [39]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', 
                        outputCol='scaledFeatures', withMean=False, withStd=True) # TODO: should withMean be True??
scaled_df = scaler.fit(vector_df).transform(vector_df)
scaled_df.head()

Row(Birth_Year=2013, Birth_Month=7, Mothers_Age=24, Gestational_Age_weeks=10, Num_Prev_Cesareans=0, Plurality=1, Infant_Living='Y', id=0, Infant_Living_Index=0.0, features=DenseVector([2013.0, 7.0, 24.0, 10.0, 0.0, 1.0, 0.0, 0.0]), scaledFeatures=DenseVector([6292.0563, 2.0564, 3.7862, 1.0951, 0.0, 2.3585, 0.0, 0.0]))

In [40]:
from pyspark.ml.feature import Normalizer
nrmlzer = Normalizer(inputCol='scaledFeatures', outputCol='normalizeFeatures', p=1.0)
l1Normalized = nrmlzer.transform(scaled_df)
l1Normalized.head()

Row(Birth_Year=2013, Birth_Month=7, Mothers_Age=24, Gestational_Age_weeks=10, Num_Prev_Cesareans=0, Plurality=1, Infant_Living='Y', id=0, Infant_Living_Index=0.0, features=DenseVector([2013.0, 7.0, 24.0, 10.0, 0.0, 1.0, 0.0, 0.0]), scaledFeatures=DenseVector([6292.0563, 2.0564, 3.7862, 1.0951, 0.0, 2.3585, 0.0, 0.0]), normalizeFeatures=DenseVector([0.9985, 0.0003, 0.0006, 0.0002, 0.0, 0.0004, 0.0, 0.0]))

### Dimnesionality Reduction: PCA

In [41]:
from pyspark.ml.feature import PCA
num_principal_comp = 4
pca = PCA(k=num_principal_comp, inputCol='normalizeFeatures', outputCol='features_pca')
pca_model = pca.fit(l1Normalized)
pca_feat = pca_model.transform(l1Normalized).select('features_pca')
pca_feat.show(truncate=False)

+--------------------------------------------------------------------------------------+
|features_pca                                                                          |
+--------------------------------------------------------------------------------------+
|[-0.9330562507431365,0.023164943780604293,-0.008915042901885535,0.0028879196561173424]|
|[-0.9328930191839417,0.023144873923885843,-0.008988075307957161,0.0028732940407907126]|
|[-0.9329377333898223,0.023129934209772694,-0.009020190428083955,0.0028982097299626165]|
|[-0.9329831027945481,0.02313465819558465,-0.00902360010932929,0.00292039268416863]    |
|[-0.9327297017248617,0.023151478480475444,-0.009040170681808157,0.002788953083242807] |
|[-0.9327623302599205,0.023116118200052542,-0.00905490911348384,0.0028330927845812842] |
|[-0.9330517371405233,0.02308142638488892,-0.009023234205438425,0.0029859167109833303] |
|[-0.9328137213676434,0.023125917049374483,-0.009106676829105194,0.002878866779627327] |
|[-0.9329216611863506