In [None]:
from pyspark.sql import Row

from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
spark=SparkSession.builder.getOrCreate()
sparkContext=SQLContext(sc)

ccRaw = SpContext.textFile("./ccdefault.csv")
ccRaw.take(5)

#Remove header row
dataLines = ccRaw.filter(lambda x: "EDUCATION" not in x)
dataLines.count()
dataLines.take(1000)

#Cleaning data. 
filteredLines = dataLines.filter(lambda x : x.find("aaaaaa") < 0 )
filteredLines.count()

cleanedLines = filteredLines.map(lambda x: x.replace("\"", ""))
cleanedLines.count()
cleanedLines.cache()

#Converting to SQL Dataframe. 
def convertToRow(instr) :
    attList = instr.split(",")
 
    ageRound = round(float(attList[5]) / 10.0) * 10
    
    sex = attList[2]
    if sex =="M":
        sex=1
    elif sex == "F":
        sex=2
    
    #Avg billed Amount. 
    avgBillAmt = (float(attList[12]) +  float(attList[13]) + float(attList[15]) + float(attList[16]) + float(attList[16]) + float(attList[17]) ) / 6.0
                    
    #Avg pay amount
    avgPayAmt = (float(attList[18]) +  float(attList[19]) + float(attList[20]) + float(attList[21]) + float(attList[22]) + float(attList[23]) ) / 6.0
                    
    #Avg pay duration.
    avgPayDuration = round((abs(float(attList[6])) + abs(float(attList[7])) + abs(float(attList[8])) + abs(float(attList[9])) + abs(float(attList[10])) + abs(float(attList[11]))) / 6)
    
    perPay = round((avgPayAmt/(avgBillAmt+1) * 100) / 25) * 25
                    
    values = Row (  CUSTID = attList[0], LIMIT_BAL = float(attList[1]), SEX = float(sex), EDUCATION = float(attList[3]), MARRIAGE = float(attList[4]),\
                    AGE = float(ageRound), AVG_PAY_DUR = float(avgPayDuration), AVG_BILL_AMT = abs(float(avgBillAmt)), AVG_PAY_AMT = float(avgPayAmt), \
                    PER_PAID= abs(float(perPay)), DEFAULTED = float(attList[24]))

    return values

#Cleanedup RDD    
ccRows = cleanedLines.map(convertToRow)
ccRows.take(60)
#Create a data frame.
ccDf = SpSession.createDataFrame(ccRows)
ccDf.cache()
ccDf.show(10)

#Enhance Data
import pandas as pd

genderDict = [{"SEX" : 1.0, "SEX_NAME" : "Male"}, {"SEX" : 2.0, "SEX_NAME" : "Female"}]                
genderDf = SpSession.createDataFrame(pd.DataFrame(genderDict, columns=['SEX', 'SEX_NAME']))
genderDf.collect()
ccDf1 = ccDf.join( genderDf, ccDf.SEX== genderDf.SEX ).drop(genderDf.SEX)
ccDf1.take(5)

eduDict = [{"EDUCATION" : 1.0, "ED_STR" : "Graduate"}, {"EDUCATION" : 2.0, "ED_STR" : "University"}, \
                {"EDUCATION" : 3.0, "ED_STR" : "High School" }, {"EDUCATION" : 4.0, "ED_STR" : "Others"}]                
eduDf = SpSession.createDataFrame(pd.DataFrame(eduDict, columns=['EDUCATION', 'ED_STR']))
eduDf.collect()
ccDf2 = ccDf1.join( eduDf, ccDf1.EDUCATION== eduDf.EDUCATION ).drop(eduDf.EDUCATION)
ccDf2.take(5)


marrDict = [{"MARRIAGE" : 1.0, "MARR_DESC" : "Single"}, {"MARRIAGE" : 2.0, "MARR_DESC" : "Married"}, {"MARRIAGE" : 3.0, "MARR_DESC" : "Others"}]                

marrDf = SpSession.createDataFrame(pd.DataFrame(marrDict, columns=['MARRIAGE', 'MARR_DESC']))
marrDf.collect()

ccFinalDf = ccDf2.join( marrDf, ccDf2.MARRIAGE== marrDf.MARRIAGE ).drop(marrDf.MARRIAGE)
ccFinalDf.cache()
ccFinalDf.take(5)

#Create a temp view
ccFinalDf.createOrReplaceTempView("CCDATA")


SpSession.sql("SELECT SEX_NAME, count(*) as Total, " + " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY SEX_NAME"  ).show()


SpSession.sql("SELECT MARR_DESC, ED_STR, count(*) as Total," + \
                " SUM(DEFAULTED) as Defaults, " + " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY MARR_DESC,ED_STR " + "ORDER BY 1,2").show()

SpSession.sql("SELECT AVG_PAY_DUR, count(*) as Total, " + " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY AVG_PAY_DUR ORDER BY 1"  ).show()

#Perform first round Correlation analysis
for i in ccDf.columns:
    if not( isinstance(ccDf.select(i).take(1)[0][0], str)) :
        print( "Correlation to DEFAULTED for ", i,\
            ccDf.stat.corr('DEFAULTED',i))


import math
from pyspark.ml.linalg import Vectors

def transformToLabeledPoint(row) :
    lp = ( row["DEFAULTED"], Vectors.dense([row["AGE"], row["AVG_BILL_AMT"], row["AVG_PAY_AMT"], \
                row["AVG_PAY_DUR"], row["EDUCATION"], row["LIMIT_BAL"], row["MARRIAGE"], \
                row["PER_PAID"], row["SEX"]]))
    return lp
    
ccLp = ccFinalDf.rdd.repartition(2).map(transformToLabeledPoint)
ccLp.collect()
ccNormDf = SpSession.createDataFrame(ccLp,["label", "features"])
ccNormDf.select("label","features").show(10)
ccNormDf.cache()


from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(ccNormDf)
td = si_model.transform(ccNormDf)
td.collect()

#Split into training and testing data
(trainingData, testData) = td.randomSplit([0.7, 0.3])
trainingData.count()
testData.count()

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="indexed",metricName="accuracy")

#Decision Trees model
dtClassifer = DecisionTreeClassifier(labelCol="indexed", featuresCol="features")
dtModel = dtClassifer.fit(trainingData)

#Predict on the test data
predictions = dtModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Decision Trees : ",evaluator.evaluate(predictions))      


rmClassifer = RandomForestClassifier(labelCol="indexed", featuresCol="features")
rmModel = rmClassifer.fit(trainingData)

#Predictions
predictions = rmModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Random Forest : ",evaluator.evaluate(predictions)  )

#Create the NB model
nbClassifer = NaiveBayes(labelCol="indexed", featuresCol="features")
nbModel = nbClassifer.fit(trainingData)

#Predict on the test data
predictions = nbModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Naive Bayes : ",evaluator.evaluate(predictions)  )

#Filtering columns needed for clustering
ccClustDf = ccFinalDf.select("SEX","EDUCATION","MARRIAGE","AGE","CUSTID")

#Centering and scaling
summStats=ccClustDf.describe().toPandas()
meanValues=summStats.iloc[1,1:5].values.tolist()
stdValues=summStats.iloc[2,1:5].values.tolist()
bcMeans=SpContext.broadcast(meanValues)
bcStdDev=SpContext.broadcast(stdValues)

def centerAndScale(inRow) :
    global bcMeans
    global bcStdDev
    
    meanArray=bcMeans.value
    stdArray=bcStdDev.value

    retArray=[]
    for i in range(len(meanArray)):
        retArray.append( (float(inRow[i]) - float(meanArray[i])) /\
            float(stdArray[i]) )
    return Row(CUSTID=inRow[4], features=Vectors.dense(retArray))
    
ccMap = ccClustDf.rdd.repartition(2).map(centerAndScale)
ccMap.collect()

#Creating a Spark Data Frame
ccFClustDf = SpSession.createDataFrame(ccMap)
ccFClustDf.cache()
ccFClustDf.select("features").show(10)

# clustering
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=4, seed=1)
model = kmeans.fit(ccFClustDf)
predictions = model.transform(ccFClustDf)
predictions.select("*").show()


In [3]:
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType
import pyspark
 
from pyspark import SQLContext

In [30]:

 
sc = pyspark.SparkContext.getOrCreate()
ccRaw = sc.textFile('./ccdefault.csv')

In [31]:
ccRaw.take(5)

['CUSTID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULTED',
 '530,20000,2,2,2,21,-1,-1,2,2,-2,-2,0,0,0,0,0,0,0,0,0,0,162000,0,0',
 '38,60000,2,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1576,0',
 '43,10000,1,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1500,0',
 '47,20000,2,1,2,22,0,0,2,-1,0,-1,1131,291,582,291,0,291,291,582,0,0,130291,651,0']

In [34]:
cleanedLines = filteredLines.map(lambda x: x.replace("\"", ""))
cleanedLines.count()
cleanedLines.cache()


PythonRDD[27] at RDD at PythonRDD.scala:48

In [36]:
def convertToRow(instr) :
    attList = instr.split(",")
 
    ageRound = round(float(attList[5]) / 10.0) * 10
    
    sex = attList[2]
    if sex =="M":
        sex=1
    elif sex == "F":
        sex=2
    
    #Avg billed Amount. 
    avgBillAmt = (float(attList[12]) +  float(attList[13]) + float(attList[15]) + float(attList[16]) + float(attList[16]) + float(attList[17]) ) / 6.0
                    
    #Avg pay amount
    avgPayAmt = (float(attList[18]) +  float(attList[19]) + float(attList[20]) + float(attList[21]) + float(attList[22]) + float(attList[23]) ) / 6.0
                    
    #Avg pay duration.
    avgPayDuration = round((abs(float(attList[6])) + abs(float(attList[7])) + abs(float(attList[8])) + abs(float(attList[9])) + abs(float(attList[10])) + abs(float(attList[11]))) / 6)
    
    perPay = round((avgPayAmt/(avgBillAmt+1) * 100) / 25) * 25
                    
    values = Row (  CUSTID = attList[0], LIMIT_BAL = float(attList[1]), SEX = float(sex), EDUCATION = float(attList[3]), MARRIAGE = float(attList[4]),\
                    AGE = float(ageRound), AVG_PAY_DUR = float(avgPayDuration), AVG_BILL_AMT = abs(float(avgBillAmt)), AVG_PAY_AMT = float(avgPayAmt), \
                    PER_PAID= abs(float(perPay)), DEFAULTED = float(attList[24]))

    return values


In [41]:
#Cleanedup RDD  
spark= SparkSession.builder.getOrCreate()
ccRows = cleanedLines.map(convertToRow)
ccRows.take(60)
#Create a data frame.
ccDf = spark.createDataFrame(ccRows)
ccDf.cache()
ccDf.show(10)

#Enhance Data
import pandas as pd

+----+------------------+------------------+-----------+------+---------+---------+---------+--------+---------+---+
| AGE|      AVG_BILL_AMT|       AVG_PAY_AMT|AVG_PAY_DUR|CUSTID|DEFAULTED|EDUCATION|LIMIT_BAL|MARRIAGE| PER_PAID|SEX|
+----+------------------+------------------+-----------+------+---------+---------+---------+--------+---------+---+
|20.0|               0.0|           27000.0|        2.0|   530|      0.0|      2.0|  20000.0|     2.0|2700000.0|2.0|
|20.0|               0.0| 262.6666666666667|        1.0|    38|      0.0|      2.0|  60000.0|     2.0|  26275.0|2.0|
|20.0|               0.0|             250.0|        1.0|    43|      0.0|      2.0|  10000.0|     2.0|  25000.0|1.0|
|20.0|             334.0|21969.166666666668|        1.0|    47|      0.0|      1.0|  20000.0|     2.0|   6550.0|2.0|
|20.0|3277.3333333333335|           28651.5|        1.0|    70|      0.0|      4.0|  20000.0|     2.0|    875.0|1.0|
|20.0|             960.0|            7358.0|        1.0|    79| 

In [43]:
genderDict = [{"SEX" : 1.0, "SEX_NAME" : "Male"}, {"SEX" : 2.0, "SEX_NAME" : "Female"}]                
genderDf = spark.createDataFrame(pd.DataFrame(genderDict, columns=['SEX', 'SEX_NAME']))
genderDf.collect()
ccDf1 = ccDf.join( genderDf, ccDf.SEX== genderDf.SEX ).drop(genderDf.SEX)
ccDf1.take(5)

[Row(AGE=70.0, AVG_BILL_AMT=87.66666666666667, AVG_PAY_AMT=416.6666666666667, AVG_PAY_DUR=1.0, CUSTID='388', DEFAULTED=1.0, EDUCATION=3.0, LIMIT_BAL=80000.0, MARRIAGE=1.0, PER_PAID=475.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=56043.166666666664, AVG_PAY_AMT=57956.5, AVG_PAY_DUR=1.0, CUSTID='103', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=480000.0, MARRIAGE=1.0, PER_PAID=100.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='932', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=320000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.0, CUSTID='948', DEFAULTED=1.0, EDUCATION=2.0, LIMIT_BAL=50000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=25828.333333333332, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.0, CUSTID='602', DEFAULTED=1.0, EDUCATION=3.0, LIMIT_BAL=30000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male')]

In [45]:
eduDict = [{"EDUCATION" : 1.0, "ED_STR" : "Graduate"}, {"EDUCATION" : 2.0, "ED_STR" : "University"}, \
                {"EDUCATION" : 3.0, "ED_STR" : "High School" }, {"EDUCATION" : 4.0, "ED_STR" : "Others"}]                
eduDf = spark.createDataFrame(pd.DataFrame(eduDict, columns=['EDUCATION', 'ED_STR']))
eduDf.collect()
ccDf2 = ccDf1.join( eduDf, ccDf1.EDUCATION== eduDf.EDUCATION ).drop(eduDf.EDUCATION)
ccDf2.take(5)


[Row(AGE=60.0, AVG_BILL_AMT=56043.166666666664, AVG_PAY_AMT=57956.5, AVG_PAY_DUR=1.0, CUSTID='103', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=480000.0, MARRIAGE=1.0, PER_PAID=100.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='932', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=320000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='466', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=230000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=19518.166666666668, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='35', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=500000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=87.5, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.0, CUSTID='66', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=200000.0, MARRIAGE=1.0, 

In [46]:
marrDict = [{"MARRIAGE" : 1.0, "MARR_DESC" : "Single"}, {"MARRIAGE" : 2.0, "MARR_DESC" : "Married"}, {"MARRIAGE" : 3.0, "MARR_DESC" : "Others"}]                

marrDf = spark.createDataFrame(pd.DataFrame(marrDict, columns=['MARRIAGE', 'MARR_DESC']))
marrDf.collect()

ccFinalDf = ccDf2.join( marrDf, ccDf2.MARRIAGE== marrDf.MARRIAGE ).drop(marrDf.MARRIAGE)
ccFinalDf.cache()
ccFinalDf.take(5)

[Row(AGE=60.0, AVG_BILL_AMT=56043.166666666664, AVG_PAY_AMT=57956.5, AVG_PAY_DUR=1.0, CUSTID='103', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=480000.0, MARRIAGE=1.0, PER_PAID=100.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='932', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=320000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='466', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=230000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=19518.166666666668, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='35', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=500000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=87.5, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.

In [48]:
ccFinalDf.createOrReplaceTempView("CCDATA")


spark.sql("SELECT SEX_NAME, count(*) as Total, " + " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY SEX_NAME"  ).show()

+--------+-----+--------+-----------+
|SEX_NAME|Total|Defaults|PER_DEFAULT|
+--------+-----+--------+-----------+
|  Female|  591|   218.0|       37.0|
|    Male|  409|   185.0|       45.0|
+--------+-----+--------+-----------+



In [49]:
spark.sql("SELECT MARR_DESC, ED_STR, count(*) as Total," + \
                " SUM(DEFAULTED) as Defaults, " + " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY MARR_DESC,ED_STR " + "ORDER BY 1,2").show()

+---------+-----------+-----+--------+-----------+
|MARR_DESC|     ED_STR|Total|Defaults|PER_DEFAULT|
+---------+-----------+-----+--------+-----------+
|  Married|   Graduate|  268|    69.0|       26.0|
|  Married|High School|   55|    24.0|       44.0|
|  Married|     Others|    4|     2.0|       50.0|
|  Married| University|  243|    65.0|       27.0|
|   Others|   Graduate|    4|     4.0|      100.0|
|   Others|High School|    8|     6.0|       75.0|
|   Others| University|    7|     3.0|       43.0|
|   Single|   Graduate|  123|    71.0|       58.0|
|   Single|High School|   87|    52.0|       60.0|
|   Single|     Others|    3|     2.0|       67.0|
|   Single| University|  198|   105.0|       53.0|
+---------+-----------+-----+--------+-----------+



In [50]:
spark.sql("SELECT AVG_PAY_DUR, count(*) as Total, " + " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY AVG_PAY_DUR ORDER BY 1"  ).show()

+-----------+-----+--------+-----------+
|AVG_PAY_DUR|Total|Defaults|PER_DEFAULT|
+-----------+-----+--------+-----------+
|        0.0|  356|   141.0|       40.0|
|        1.0|  552|   218.0|       39.0|
|        2.0|   85|    41.0|       48.0|
|        3.0|    4|     2.0|       50.0|
|        4.0|    3|     1.0|       33.0|
+-----------+-----+--------+-----------+



In [51]:
for i in ccDf.columns:
    if not( isinstance(ccDf.select(i).take(1)[0][0], str)) :
        print( "Correlation to DEFAULTED for ", i,\
            ccDf.stat.corr('DEFAULTED',i))

Correlation to DEFAULTED for  AGE 0.5249553884579066
Correlation to DEFAULTED for  AVG_BILL_AMT 0.18782747215913168
Correlation to DEFAULTED for  AVG_PAY_AMT -0.1635960889097275
Correlation to DEFAULTED for  AVG_PAY_DUR 0.02946939689271058
Correlation to DEFAULTED for  DEFAULTED 1.0
Correlation to DEFAULTED for  EDUCATION 0.11056265057032824
Correlation to DEFAULTED for  LIMIT_BAL 0.10722031324020788
Correlation to DEFAULTED for  MARRIAGE -0.2289128728735936
Correlation to DEFAULTED for  PER_PAID -0.027644049670592894
Correlation to DEFAULTED for  SEX -0.08365182215019182


In [52]:
import math
from pyspark.ml.linalg import Vectors

def transformToLabeledPoint(row) :
    lp = ( row["DEFAULTED"], Vectors.dense([row["AGE"], row["AVG_BILL_AMT"], row["AVG_PAY_AMT"], \
                row["AVG_PAY_DUR"], row["EDUCATION"], row["LIMIT_BAL"], row["MARRIAGE"], \
                row["PER_PAID"], row["SEX"]]))
    return lp

In [53]:
ccLp = ccFinalDf.rdd.repartition(2).map(transformToLabeledPoint)
ccLp.collect()
ccNormDf = spark.createDataFrame(ccLp,["label", "features"])
ccNormDf.select("label","features").show(10)
ccNormDf.cache()


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[60.0,56043.16666...|
|  1.0|[60.0,0.0,0.0,2.0...|
|  1.0|[60.0,0.0,0.0,2.0...|
|  1.0|[60.0,19518.16666...|
|  1.0|[60.0,87.5,0.0,1....|
|  1.0|[60.0,125.0,0.0,1...|
|  1.0|[50.0,0.0,0.0,1.0...|
|  1.0|[50.0,1270.666666...|
|  1.0|[50.0,0.0,0.0,2.0...|
|  1.0|[50.0,81.83333333...|
+-----+--------------------+
only showing top 10 rows



DataFrame[label: double, features: vector]

In [54]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(ccNormDf)
td = si_model.transform(ccNormDf)
td.collect()

[Row(label=1.0, features=DenseVector([60.0, 56043.1667, 57956.5, 1.0, 1.0, 480000.0, 1.0, 100.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 0.0, 0.0, 2.0, 1.0, 320000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 0.0, 0.0, 2.0, 1.0, 230000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 19518.1667, 0.0, 2.0, 1.0, 500000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 87.5, 0.0, 1.0, 1.0, 200000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 125.0, 0.0, 1.0, 1.0, 50000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([50.0, 0.0, 0.0, 1.0, 1.0, 170000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([50.0, 1270.6667, 0.0, 1.0, 1.0, 290000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([50.0, 0.0, 0.0, 2.0, 1.0, 600000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=De

In [55]:
#Split into training and testing data
(trainingData, testData) = td.randomSplit([0.7, 0.3])
trainingData.count()
testData.count()

275

In [56]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [57]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="indexed",metricName="accuracy")


In [58]:
#Decision Trees model
dtClassifer = DecisionTreeClassifier(labelCol="indexed", featuresCol="features")
dtModel = dtClassifer.fit(trainingData)

In [59]:
#Predict on the test data
predictions = dtModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Decision Trees : ",evaluator.evaluate(predictions))  

Results of Decision Trees :  0.6981818181818182


In [60]:
rmClassifer = RandomForestClassifier(labelCol="indexed", featuresCol="features")
rmModel = rmClassifer.fit(trainingData)

#Predictions
predictions = rmModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Random Forest : ",evaluator.evaluate(predictions)  )

Results of Random Forest :  0.7236363636363636


In [62]:
#Create the NB model
nbClassifer = NaiveBayes(labelCol="indexed", featuresCol="features")
nbModel = nbClassifer.fit(trainingData)

#Predict on the test data
predictions = nbModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Naive Bayes : ",evaluator.evaluate(predictions)  )

Results of Naive Bayes :  0.6254545454545455


In [66]:
#Filtering columns needed for clustering
ccClustDf = ccFinalDf.select("SEX","EDUCATION","MARRIAGE","AGE","CUSTID")

#Centering and scaling
summStats=ccClustDf.describe().toPandas()
meanValues=summStats.iloc[1,1:5].values.tolist()
stdValues=summStats.iloc[2,1:5].values.tolist()
bcMeans=sc.broadcast(meanValues)
bcStdDev=sc.broadcast(stdValues)

In [67]:
def centerAndScale(inRow) :
    global bcMeans
    global bcStdDev
    
    meanArray=bcMeans.value
    stdArray=bcStdDev.value

    retArray=[]
    for i in range(len(meanArray)):
        retArray.append( (float(inRow[i]) - float(meanArray[i])) /\
            float(stdArray[i]) )
    return Row(CUSTID=inRow[4], features=Vectors.dense(retArray))

In [68]:
ccMap = ccClustDf.rdd.repartition(2).map(centerAndScale)
ccMap.collect()

[Row(CUSTID='103', features=DenseVector([-1.2015, -1.0662, -1.156, 2.489])),
 Row(CUSTID='932', features=DenseVector([-1.2015, -1.0662, -1.156, 2.489])),
 Row(CUSTID='466', features=DenseVector([-1.2015, -1.0662, -1.156, 2.489])),
 Row(CUSTID='35', features=DenseVector([-1.2015, -1.0662, -1.156, 2.489])),
 Row(CUSTID='66', features=DenseVector([-1.2015, -1.0662, -1.156, 2.489])),
 Row(CUSTID='553', features=DenseVector([-1.2015, -1.0662, -1.156, 2.489])),
 Row(CUSTID='603', features=DenseVector([-1.2015, -1.0662, -1.156, 1.4974])),
 Row(CUSTID='576', features=DenseVector([-1.2015, -1.0662, -1.156, 1.4974])),
 Row(CUSTID='452', features=DenseVector([-1.2015, -1.0662, -1.156, 1.4974])),
 Row(CUSTID='91', features=DenseVector([-1.2015, -1.0662, -1.156, 1.4974])),
 Row(CUSTID='375', features=DenseVector([-1.2015, -1.0662, -1.156, 1.4974])),
 Row(CUSTID='997', features=DenseVector([-1.2015, -1.0662, -1.156, 0.5057])),
 Row(CUSTID='874', features=DenseVector([-1.2015, -1.0662, -1.156, 0.5057

In [70]:
#Creating a Spark Data Frame
ccFClustDf = spark.createDataFrame(ccMap)
ccFClustDf.cache()
ccFClustDf.select("features").show(10)

+--------------------+
|            features|
+--------------------+
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
|[-1.2014752545458...|
+--------------------+
only showing top 10 rows



In [71]:
# clustering
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=4, seed=1)
model = kmeans.fit(ccFClustDf)
predictions = model.transform(ccFClustDf)
predictions.select("*").show()


+------+--------------------+----------+
|CUSTID|            features|prediction|
+------+--------------------+----------+
|   103|[-1.2014752545458...|         3|
|   932|[-1.2014752545458...|         3|
|   466|[-1.2014752545458...|         3|
|    35|[-1.2014752545458...|         3|
|    66|[-1.2014752545458...|         3|
|   553|[-1.2014752545458...|         3|
|   603|[-1.2014752545458...|         3|
|   576|[-1.2014752545458...|         3|
|   452|[-1.2014752545458...|         3|
|    91|[-1.2014752545458...|         3|
|   375|[-1.2014752545458...|         3|
|   997|[-1.2014752545458...|         3|
|   874|[-1.2014752545458...|         3|
|   840|[-1.2014752545458...|         3|
|   596|[-1.2014752545458...|         3|
|   587|[-1.2014752545458...|         3|
|    97|[-1.2014752545458...|         3|
|   971|[-1.2014752545458...|         3|
|   956|[-1.2014752545458...|         3|
|   862|[-1.2014752545458...|         3|
+------+--------------------+----------+
only showing top