In [1]:
# Tell Jupyter where pyspark is: findspark.init() makes the local Spark installation
# importable, so this cell has to run before the pyspark imports in the next cell.
import findspark
findspark.init()

In [2]:
# import helper packages
import numpy as np
import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType

In [3]:
# Create this notebook's SparkSession (getOrCreate hands back an existing session
# if the kernel already has one).
spark = (
    SparkSession.builder
    .appName("Final Project")
    .getOrCreate()
)

This pipeline converts qualitative (free-text) features into numeric vectors, if they are needed.

In [4]:
# Text-feature pipeline: tokenize -> remove stop words -> hash term counts -> IDF-weight.
# NOTE(review): assumes the free-text descriptor column is named "text"; rename the
# column (or change inputCol) before fitting.

# HashingTF maps each term to a column with a plain modulo, so Spark recommends a
# power of two for numFeatures. The previous value (20) was not one and squeezed the
# whole vocabulary into very few buckets. Tune this to the descriptor vocabulary size.
NUM_TEXT_FEATURES = 1 << 10  # 1024 hash buckets

# Tokenization: lowercase, then treat every run of non-letters as a separator
# (RegexTokenizer's pattern describes the gaps between tokens by default).
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words",
                                pattern="[^A-Za-z]+", toLowercase=True)

# Stop word removal
stopWordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# Vectorization: hashed term-frequency vectors
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features",
                      numFeatures=NUM_TEXT_FEATURES)

# IDF re-weighting of the raw term frequencies (this is weighting, not normalization)
idf = IDF(inputCol="raw_features", outputCol="features")

# Distinct name so later cells that build their own `pipeline` do not overwrite it.
text_pipeline = Pipeline(stages=[regexTokenizer, stopWordsRemover, hashingTF, idf])

We first load the data and split it into 'string' and 'numeric' DataFrames. The 'string' DataFrame is further split into categorical features and descriptors; this split is determined by inspecting the LCDataDictionary file.

The training algorithms require every feature to be numeric. To achieve this, we apply string indexing (StringIndexer) to the categorical features and vectorization (the text pipeline above) to the descriptor features.

After all the features are numeric, we can merge them back with the numeric dataFrame for final steps in preprocessing.

In [5]:
# Load the dataset with pandas first: reading the CSV directly into Spark hit schema errors.
# low_memory=False makes pandas infer each column's dtype from the whole file instead of
# chunk by chunk, which avoids inconsistent mixed-type columns (pandas DtypeWarning).
DATA_PATH = 'data/loan.csv'
data = pd.read_csv(DATA_PATH, low_memory=False)

# Split by dtype: numeric columns vs. everything else (strings / objects).
numeric_pd = data.select_dtypes(include='number')
string_pd = data.select_dtypes(exclude='number')

  interactivity=interactivity, compiler=compiler, result=result)


In [10]:
# Convert to Spark DataFrames for easier indexing.
# SparkSession.createDataFrame replaces SQLContext, which is deprecated and expects a
# SparkContext (the SparkSession previously passed to it only worked by accident).

# Only a sample is converted while preprocessing is being developed.
SAMPLE_ROWS = 100  # set to None to convert every row

numeric_df = spark.createDataFrame(numeric_pd.iloc[:SAMPLE_ROWS])
# astype('str') gives every column one uniform string type (missing values become the
# literal string 'nan'), so Spark does not have to infer a schema for mixed-type columns.
string_df = spark.createDataFrame(string_pd.iloc[:SAMPLE_ROWS].astype('str'))

Next, we split the string DataFrame into categorical and descriptor features by inspection.

In [14]:
# Split string_df by inspection







In [15]:
# We can use StringIndexer for categorical features.
# By inspection, the following are considered categorical features:
#   'term', 'grade', 'sub_grade', 'home_ownership', 'verification_status', 'loan_status',
#   'pymnt_plan', 'purpose', ...
# TODO: once string_df is split into categorical and descriptor columns, index only the
# categorical ones; for now every string column is indexed.
#
# Note that in this version of Spark, labels are assigned by frequency
# (label 0 = most frequent).

# Pass *unfitted* indexers: Pipeline.fit then learns every index in one place, and the
# same pipeline can be refit on another frame (e.g. a training split) instead of
# carrying labels frozen from string_df.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in string_df.columns
]

pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(string_df).transform(string_df)


Gradient-boosted tree classifier

In [None]:
# Gradient-boosted tree classifier, adapted from the Spark ML GBTClassifier example.
#
# Precondition: `data` must be a *Spark* DataFrame with a "label" column and an
# assembled "features" vector column.
# FIXME: earlier in this notebook `data` is the pandas frame read from loan.csv, which
# has neither column (and no randomSplit); build the model input from the preprocessed
# features before running this cell.
# NOTE: GBTClassifier supports binary labels only, so a multi-valued target such as
# loan_status must be collapsed to two classes first.

SEED = 42  # fixes the train/test split so the reported test error is reproducible

if not hasattr(data, "randomSplit"):
    raise TypeError("GBT cell expects `data` to be a Spark DataFrame, got %s"
                    % type(data).__name__)
missing_columns = {"label", "features"}.difference(data.columns)
if missing_columns:
    raise ValueError("`data` is missing required column(s): %s"
                     % ", ".join(sorted(missing_columns)))

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing), reproducibly.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SEED)

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model. The indexer stages are already fitted, so this only fits the GBT.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only