In [2]:
import os

from pyspark.sql import SQLContext

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

In [3]:
# @hidden_cell
# Connection settings for the dashDB (DB2) instance that holds MORTGAGE_REPORT.
#
# The password is deliberately NOT stored in the notebook any more: it is read
# from the DASHDB_PASSWORD environment variable so that it cannot leak through
# version control or a shared copy of this file. Host and user name keep their
# previous values as defaults and can be overridden the same way.
# NOTE(review): the password that used to be embedded here should be treated as
# compromised and rotated.
_dashdb_host = os.environ.get('DASHDB_HOST', 'dashdb-entry-yp-dal09-10.services.dal.bluemix.net')
_dashdb_user = os.environ.get('DASHDB_USERNAME', 'dash5845')
_dashdb_password = os.environ.get('DASHDB_PASSWORD')
if not _dashdb_password:
    raise RuntimeError("Set the DASHDB_PASSWORD environment variable before running this cell")

# Same keys as the generated credentials dict; every connection string is now
# derived from the three values above instead of repeating them literally.
_dashdb_parts = {'host': _dashdb_host, 'user': _dashdb_user, 'pwd': _dashdb_password}
dashDB = {
  'port': '50000',
  'db': 'BLUDB',
  'username': _dashdb_user,
  'ssljdbcurl': 'jdbc:db2://{host}:50001/BLUDB:sslConnection=true;'.format(**_dashdb_parts),
  'host': _dashdb_host,
  'https_url': 'https://{host}:8443'.format(**_dashdb_parts),
  'dsn': 'DATABASE=BLUDB;HOSTNAME={host};PORT=50000;PROTOCOL=TCPIP;UID={user};PWD={pwd};'.format(**_dashdb_parts),
  'hostname': _dashdb_host,
  'jdbcurl': 'jdbc:db2://{host}:50000/BLUDB'.format(**_dashdb_parts),
  'ssldsn': 'DATABASE=BLUDB;HOSTNAME={host};PORT=50001;PROTOCOL=TCPIP;UID={user};PWD={pwd};Security=SSL;'.format(**_dashdb_parts),
  'uri': 'db2://{user}:{pwd}@{host}:50000/BLUDB'.format(**_dashdb_parts),
  'password': _dashdb_password
}

# `sc` is not defined in this notebook; it is expected to be the SparkContext
# supplied by the notebook environment.
sqlContext = SQLContext(sc)

# JDBC connection properties passed to the reader below.
props = {
    'user': dashDB['username'],
    'password': dashDB['password'],
}

# Fully qualified table name: <schema>.<table>, where the schema is the user name.
tableName = dashDB['username'] + "." + "MORTGAGE_REPORT"

# NOTE(review): this connects through the plain 'jdbcurl' (port 50000); the dict
# also offers 'ssljdbcurl' -- consider switching so credentials travel over SSL.
rawDataSet = sqlContext.read.jdbc(dashDB['jdbcurl'], tableName, properties=props)

# Expose the table to SQL queries under the name MORTGAGE_REPORT.
rawDataSet.registerTempTable("MORTGAGE_REPORT")




In [4]:

# Project the raw table down to the prediction target (loan_purpose) and the
# candidate feature columns. applicant_income_000s is multiplied by 1000 and
# exposed as applicant_income.
applicant_income = (rawDataSet["applicant_income_000s"] * 1000).alias("applicant_income")
selected_columns = [
    rawDataSet["loan_purpose"],
    rawDataSet["as_of_year"],
    applicant_income,
    rawDataSet["hud_median_family_income"],
    rawDataSet["state"],
]
featureDataSet = rawDataSet.select(*selected_columns)

# Mark the projection for caching; the cells below reuse it several times.
featureDataSet.cache()
print(featureDataSet.take(5))

[Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=53000, hud_median_family_income=66100, state=u'Massachusetts'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=256000, hud_median_family_income=61900, state=u'California'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=44000, hud_median_family_income=65100, state=u'Michigan'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=63000, hud_median_family_income=51000, state=u'Florida'), Row(loan_purpose=u'Home purchase', as_of_year=2013, applicant_income=47000, hud_median_family_income=None, state=u'')]


In [5]:
# Keep only the rows where both income columns are populated.
has_hud_income = featureDataSet["hud_median_family_income"].isNotNull()
has_applicant_income = featureDataSet["applicant_income"].isNotNull()
filterData = featureDataSet.where(has_hud_income & has_applicant_income)

# Split by reporting year: 2013 rows are used for training, 2014 rows for testing.
as_of_year = featureDataSet["as_of_year"]

trainData = filterData.where(as_of_year == 2013)
print(trainData.take(5))
print(trainData.count())

testData = filterData.where(as_of_year == 2014)
print(testData.count())

[Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=53000, hud_median_family_income=66100, state=u'Massachusetts'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=256000, hud_median_family_income=61900, state=u'California'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=44000, hud_median_family_income=65100, state=u'Michigan'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=63000, hud_median_family_income=51000, state=u'Florida'), Row(loan_purpose=u'Refinancing', as_of_year=2013, applicant_income=55000, hud_median_family_income=73400, state=u'Illinois')]
14511287
10029084


In [6]:

# Turns the loan_purpose strings into a numeric 'label' column.
label_indexer = StringIndexer(inputCol='loan_purpose', outputCol='label')
# Indexer for the state column; it is defined here but is not one of the
# pipeline stages below.
state_indexer = StringIndexer(inputCol='state', outputCol='state_indexed')

# Only applicant_income is fed to the model as a feature.
reduced_numeric_cols = ["applicant_income"]

# Packs the numeric feature columns into the single 'features' vector column.
assembler = VectorAssembler(inputCols=reduced_numeric_cols, outputCol='features')

# Random forest settings; the seed is fixed at 5043.
classifier = RandomForestClassifier(
    labelCol='label',
    featuresCol='features',
    impurity="gini",
    maxDepth=3,
    numTrees=20,
    featureSubsetStrategy="auto",
    seed=5043,
)

# Label indexing -> feature assembly -> classification, fitted on the 2013 rows.
stages = [label_indexer, assembler, classifier]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(trainData)

print(model)



PipelineModel_448f96c17490ccade95c


In [7]:
# Run the fitted pipeline over the held-out rows and preview the first ten
# predictions.
predictions = model.transform(testData)
predictions.show(10)

# Compare the 'prediction' column against 'label' using the accuracy metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

print(accuracy)



+-------------+----------+----------------+------------------------+---------+-----+----------+--------------------+--------------------+----------+
| loan_purpose|as_of_year|applicant_income|hud_median_family_income|    state|label|  features|       rawPrediction|         probability|prediction|
+-------------+----------+----------------+------------------------+---------+-----+----------+--------------------+--------------------+----------+
|  Refinancing|      2014|           73000|                   75400|    Texas|  0.0| [73000.0]|[11.6805106615444...|[0.58402553307722...|       0.0|
|Home purchase|      2014|           49000|                   45900| Kentucky|  1.0| [49000.0]|[10.8388009743565...|[0.54194004871782...|       0.0|
|  Refinancing|      2014|           33000|                   48800| Missouri|  0.0| [33000.0]|[10.0155968454657...|[0.50077984227328...|       0.0|
|  Refinancing|      2014|          478000|                   58200| New York|  0.0|[478000.0]|[12.5167159