In [2]:
# Low Birth Weight
# The data on 189 births were collected at Baystate Medical Center, Springfield, Mass. during 1986. 
# The dataset contains an indicator of low infant birth weight as a response and several risk factors associated with low birth weight. 
# The actual birth weight is also included in the dataset. 
# Reference: http://www.statlab.uni-heidelberg.de/data/linmod/birthweight.html

In [3]:
# Machine Learning Library of PySpark
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

import os
import sys
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Library for confusion matrix, precision, test error
from pyspark.mllib.evaluation import MulticlassMetrics
# Library For Area under ROC curve and Area under precision-recall curve
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Assign resources to the application
sqlContext = SQLContext(sc)

In [4]:
# The data will be loaded into an array.
# This is the summary of the data structure, including the column position and name.
# The first filed starts from position 0. 

# 0 ID  
# 1 LOW   -  1 if birth weight below 2.5kg; 0 if birth weight below 2.5 kg
# 2 AGE   -  age of mother in years
# 3 RACE  -  race of mother (white=1,black=2, other=3)
# 4 SMOKE -  1 if the mother smoked;  0 if the mother did not smoke
# 5 PTL   -  number of previous premature labours for mother
# 6 HT    -  1 if history of hypertension for mother; 0 if no history of hypertension for mother
# 7 UI    -  1 if presence of uterine irrability; 0 if no presence of uterine irrability
# 8 FTB   -  number of physicians visist during first trimester

# Label is a target variable.  PersonInfo is a list of independent variables besides unique identifier

LabeledDocument = Row("PersonID", "PersonInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument

def parseDocument(line):
    values = [str(x) for x in line.split(',')] 
    if (values[1]>'0'):
      low_birth = 1.0
    else:
     low_birth = 0.0
        
    textValue = str(values[2]) + " " + str(values[3])+" " + str(values[4])+" " + str(values[5])+" " + str(values[6])+" " + str(values[7])+" " + str(values[8])
    return LabeledDocument(values[0], textValue, low_birth)


# Load the raw lowbwt.csv file, parse it using the function above
import ibmos2spark

# @hidden_cell
credentials = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project_id': '7984a968f14449858d826f8ba838fbb3',
    'region': 'dallas',
    'user_id': '51e14119a08842f0a9311ea21a5debfc',
    'username': 'member_c8a299d593b1d03a3e131a4df7aec8a60c5f34ce',
    'password': 'DWy^2pn8K=WMrUIu'
}

configuration_name = 'os_85a89ae392d2473988f606b7870013f9_configs'
bmos = ibmos2spark.bluemix(sc, credentials, configuration_name)

data= sc.textFile(bmos.url('DefaultProjectagarner18studentumucedu', 'lowbwt.csv'))
data.take(5)


[u'ID,LOW,AGE,RACE,SMOKE,PTL,HT,UI,FTV',
 u'85,0,19,2,0,0,0,1,0',
 u'86,0,33,3,0,0,0,0,3',
 u'87,0,20,1,1,0,0,0,1',
 u'88,0,21,1,1,0,0,1,2']

In [5]:
#Load the data into a dataframe
documents = data.filter(lambda s: "Name" not in s).map(parseDocument)
training = documents.toDF() # ToDataFrame
training.take(5)

[Row(PersonID=u'ID', PersonInfo=u'AGE RACE SMOKE PTL HT UI FTV', label=1.0),
 Row(PersonID=u'85', PersonInfo=u'19 2 0 0 0 1 0', label=0.0),
 Row(PersonID=u'86', PersonInfo=u'33 3 0 0 0 0 3', label=0.0),
 Row(PersonID=u'87', PersonInfo=u'20 1 1 0 0 0 1', label=0.0),
 Row(PersonID=u'88', PersonInfo=u'21 1 1 0 0 1 2', label=0.0)]

In [6]:
# set up Logistic Regression using Pipeline of SparkML
tokenizer = Tokenizer(inputCol="PersonInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

In [7]:
# set up Logistic Regression Model
# the stages are executed in order
model = pipeline.fit(training)

In [8]:
# Verify the training document 
# to checkpoint your progress with the application
training.show()

+--------+--------------------+-----+
|PersonID|          PersonInfo|label|
+--------+--------------------+-----+
|      ID|AGE RACE SMOKE PT...|  1.0|
|      85|      19 2 0 0 0 1 0|  0.0|
|      86|      33 3 0 0 0 0 3|  0.0|
|      87|      20 1 1 0 0 0 1|  0.0|
|      88|      21 1 1 0 0 1 2|  0.0|
|      89|      18 1 1 0 0 1 0|  0.0|
|      91|      21 3 0 0 0 0 0|  0.0|
|      92|      22 1 0 0 0 0 1|  0.0|
|      93|      17 3 0 0 0 0 1|  0.0|
|      94|      29 1 1 0 0 0 1|  0.0|
|      95|      26 1 1 0 0 0 0|  0.0|
|      96|      19 3 0 0 0 0 0|  0.0|
|      97|      19 3 0 0 0 0 1|  0.0|
|      98|      22 3 0 0 1 0 0|  0.0|
|      99|      30 3 0 1 0 1 2|  0.0|
|     100|      18 1 1 0 0 0 0|  0.0|
|     101|      18 1 1 0 0 0 0|  0.0|
|     102|      15 2 0 0 0 0 0|  0.0|
|     103|      25 1 1 0 0 0 3|  0.0|
|     104|      20 3 0 0 0 1 0|  0.0|
+--------+--------------------+-----+
only showing top 20 rows



In [9]:
# PersonInfo here is a combination of age, race, smoke, ptl, ht, ut, and ftv.
# *x in Document(*x) is for header of Row("PersonID", "PersonInfo")
Document = Row("PersonID", "PersonInfo")
test = sc.parallelize([(227, "32 1 0 0 0 0 3 female"),
              (228, "23 3 0 0 1 0 0 female"),
              (229, "15 2 1 0 0 0 0 male"),
              (230, "32 1 0 0 0 1 0 male"),
              (231, "19 3 1 0 0 0 0 female"),
              (232, "39 1 0 1 0 1 1 male")]).map(lambda x: Document(*x)).toDF()

In [10]:
# Make predictions on test documents and print columns of interest
prediction = model.transform(test)
selected = prediction.select("PersonInfo", "prediction", "probability")
for row in selected.collect():
    print row



Row(PersonInfo=u'32 1 0 0 0 0 3 female', prediction=0.0, probability=DenseVector([0.7688, 0.2312]))
Row(PersonInfo=u'23 3 0 0 1 0 0 female', prediction=0.0, probability=DenseVector([0.5631, 0.4369]))
Row(PersonInfo=u'15 2 1 0 0 0 0 male', prediction=1.0, probability=DenseVector([0.2887, 0.7113]))
Row(PersonInfo=u'32 1 0 0 0 1 0 male', prediction=0.0, probability=DenseVector([0.8455, 0.1545]))
Row(PersonInfo=u'19 3 1 0 0 0 0 female', prediction=0.0, probability=DenseVector([0.7756, 0.2244]))
Row(PersonInfo=u'39 1 0 1 0 1 1 male', prediction=1.0, probability=DenseVector([0.4541, 0.5459]))


In [11]:
# Evaluate the Logistic Regression model
# Select (prediction, true label) and compute test error
pred_lr=model.transform(training).select("prediction", "label")
eval_lr=MulticlassClassificationEvaluator (
    labelCol="label", predictionCol="prediction", metricName="precision")
accuracy_lr=eval_lr.evaluate(pred_lr)
# create RDD
predictionAndLabels_lr=pred_lr.rdd
metrics_lr=MulticlassMetrics(predictionAndLabels_lr)
precision_lr=metrics_lr.precision(1)
recall_lr=metrics_lr.recall(1)
f1Measure_lr = metrics_lr.fMeasure(1.0, 1.0)
print("F1 Measure = %s" % f1Measure_lr)
print ("Test Accuracy = %s" %accuracy_lr)
print ("Test Error = %s" % (1-accuracy_lr))
print ("Precision = %s" %precision_lr)
print ("Recall = %s" %recall_lr)

F1 Measure = 0.530612244898
Test Accuracy = 0.757894736842
Test Error = 0.242105263158
Precision = 0.684210526316
Recall = 0.433333333333


In [12]:
#Print the confusion matrix
metrics_lr.confusionMatrix().toArray()

array([[ 118.,   12.],
       [  34.,   26.]])

In [13]:
bin_lr=BinaryClassificationMetrics(predictionAndLabels_lr)

# Area under precision-recall curve
print("Area under PR = %s" % bin_lr.areaUnderPR)
# Area under precision-recall curve
print("Area under ROC = %s" % bin_lr.areaUnderROC)

Area under PR = 0.648245614035
Area under ROC = 0.670512820513
