In [2]:
import os
import sys
if sys.platform.startswith('win'):
    #Where you have the code
    os.chdir("C:\spark\sparksourcecode")
    #Where you installed spark.    
    os.environ['SPARK_HOME'] = 'C:\spark\spark-2.3.1-bin-hadoop2.7'
#other platforms - linux/mac
else:
    os.chdir("/spark/code")
    os.environ['SPARK_HOME'] = '/spark/spark-2.3.1-bin-hadoop2.7'

os.curdir

# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']

#Add the following paths to the system path. Please check your installation
#to make sure that these zip files actually exist. The names might change
#as versions change.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.7-src.zip"))

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row

spark = SparkSession.builder.appName('SparkDFExample').getOrCreate()
#............................................................................
sc = spark.sparkContext

In [4]:
ccRaw = sc.textFile("credit-card-default-1000.csv")

In [5]:
ccRaw.take(5)

['CUSTID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_1,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,DEFAULTED',
 '530,20000,2,2,2,21,-1,-1,2,2,-2,-2,0,0,0,0,0,0,0,0,0,0,162000,0,0',
 '38,60000,2,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1576,0',
 '43,10000,1,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1500,0',
 '47,20000,2,1,2,22,0,0,2,-1,0,-1,1131,291,582,291,0,291,291,582,0,0,130291,651,0']

In [6]:
dataLines = ccRaw.filter(lambda x: "EDUCATION" not in x)
dataLines.count()

1002

In [7]:
dataLines.take(1000)

['530,20000,2,2,2,21,-1,-1,2,2,-2,-2,0,0,0,0,0,0,0,0,0,0,162000,0,0',
 '38,60000,2,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1576,0',
 '43,10000,1,2,2,22,0,0,0,0,-2,-2,0,0,0,0,0,0,0,0,0,0,0,1500,0',
 '47,20000,2,1,2,22,0,0,2,-1,0,-1,1131,291,582,291,0,291,291,582,0,0,130291,651,0',
 '70,20000,1,4,2,22,2,0,0,0,-1,-1,1692,13250,433,1831,0,2891,13250,433,1831,0,2891,153504,0',
 '79,30000,2,2,2,22,0,0,0,0,-2,-2,1544,1648,1284,1088,892,-304,1300,1000,1000,1000,304,39544,0',
 '99,50000,F,3,1,22,0,0,0,0,-1,-1,0,0,0,0,166,541,0,0,0,166,543,4268,0',
 '104,50000,2,3,2,22,0,0,0,0,-1,-1,780,0,780,390,390,500,0,780,0,390,500,18300,0',
 '135,30000,2,2,2,22,0,0,0,0,-2,-2,92,92,92,0,92,0,92,92,0,92,0,1883,0',
 '170,50000,2,2,2,22,0,0,0,0,2,-1,-190,-9850,-9850,10311,10161,7319,0,0,20161,0,7319,13899,0',
 '173,50000,2,2,2,22,-1,0,0,0,-1,-1,0,0,250,123,789,1222,0,250,0,789,1222,9616,0',
 '179,20000,2,2,2,22,0,0,0,0,-1,-1,0,0,0,0,595,0,0,0,0,595,0,2370,0',
 '198,20000,2,1,2,22,0,0,0,0,-2,-1,-54,273,19,-2

In [8]:
# Cleanup data. Remove lines that are not "CSV
filteredLines = dataLines.filter(lambda x : x.find("aaaaaa") < 0 )
filteredLines.count()

1000

In [9]:
cleanedLines = filteredLines.map(lambda x: x.replace("\"", ""))
cleanedLines.count()
cleanedLines.cache()

PythonRDD[8] at RDD at PythonRDD.scala:49

In [10]:
from pyspark.sql import Row

In [11]:
def convertToRow(instr) :
    attList = instr.split(",")
 
    # PR#06 Round of age to range of 10s.    
    ageRound = round(float(attList[5]) / 10.0) * 10
    
    #Normalize sex to only 1 and 2.
    sex = attList[2]
    if sex =="M":
        sex=1
    elif sex == "F":
        sex=2
    
    #Find average billed Amount. You can either continue with individual
    #amounts or use the average for future predictions
    avgBillAmt = (float(attList[12]) +  \
                    float(attList[13]) + \
                    float(attList[15]) + \
                    float(attList[16]) + \
                    float(attList[16]) + \
                    float(attList[17]) ) / 6.0
                    
    #Find average pay amount
    avgPayAmt = (float(attList[18]) +  \
                    float(attList[19]) + \
                    float(attList[20]) + \
                    float(attList[21]) + \
                    float(attList[22]) + \
                    float(attList[23]) ) / 6.0
                    
    #Find average pay duration. Required for PR#04
    #Make sure numbers are rounded and negative values are eliminated
    avgPayDuration = round((abs(float(attList[6])) + \
                        abs(float(attList[7])) + \
                        abs(float(attList[8])) +\
                        abs(float(attList[9])) +\
                        abs(float(attList[10])) +\
                        abs(float(attList[11]))) / 6)
    
    #Average percentage paid. add this as an additional field to see
    #if this field has any predictive capabilities. This is 
    #additional creative work that you do to see possibilities.                    
    perPay = round((avgPayAmt/(avgBillAmt+1) * 100) / 25) * 25
                    
    values = Row (  CUSTID = attList[0], \
                    LIMIT_BAL = float(attList[1]), \
                    SEX = float(sex),\
                    EDUCATION = float(attList[3]),\
                    MARRIAGE = float(attList[4]),\
                    AGE = float(ageRound), \
                    AVG_PAY_DUR = float(avgPayDuration),\
                    AVG_BILL_AMT = abs(float(avgBillAmt)), \
                    AVG_PAY_AMT = float(avgPayAmt), \
                    PER_PAID= abs(float(perPay)), \
                    DEFAULTED = float(attList[24]) 
                    )
    return values


In [12]:
ccRows = cleanedLines.map(convertToRow)
ccRows.take(60)

[Row(AGE=20.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=27000.0, AVG_PAY_DUR=2.0, CUSTID='530', DEFAULTED=0.0, EDUCATION=2.0, LIMIT_BAL=20000.0, MARRIAGE=2.0, PER_PAID=2700000.0, SEX=2.0),
 Row(AGE=20.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=262.6666666666667, AVG_PAY_DUR=1.0, CUSTID='38', DEFAULTED=0.0, EDUCATION=2.0, LIMIT_BAL=60000.0, MARRIAGE=2.0, PER_PAID=26275.0, SEX=2.0),
 Row(AGE=20.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=250.0, AVG_PAY_DUR=1.0, CUSTID='43', DEFAULTED=0.0, EDUCATION=2.0, LIMIT_BAL=10000.0, MARRIAGE=2.0, PER_PAID=25000.0, SEX=1.0),
 Row(AGE=20.0, AVG_BILL_AMT=334.0, AVG_PAY_AMT=21969.166666666668, AVG_PAY_DUR=1.0, CUSTID='47', DEFAULTED=0.0, EDUCATION=1.0, LIMIT_BAL=20000.0, MARRIAGE=2.0, PER_PAID=6550.0, SEX=2.0),
 Row(AGE=20.0, AVG_BILL_AMT=3277.3333333333335, AVG_PAY_AMT=28651.5, AVG_PAY_DUR=1.0, CUSTID='70', DEFAULTED=0.0, EDUCATION=4.0, LIMIT_BAL=20000.0, MARRIAGE=2.0, PER_PAID=875.0, SEX=1.0),
 Row(AGE=20.0, AVG_BILL_AMT=960.0, AVG_PAY_AMT=7358.0, AVG_PAY_DUR=1.0, CUSTID='79', DEFA

In [13]:
ccDf = spark.createDataFrame(ccRows)

In [14]:
ccDf.cache()
ccDf.show(10)

+----+------------------+------------------+-----------+------+---------+---------+---------+--------+---------+---+
| AGE|      AVG_BILL_AMT|       AVG_PAY_AMT|AVG_PAY_DUR|CUSTID|DEFAULTED|EDUCATION|LIMIT_BAL|MARRIAGE| PER_PAID|SEX|
+----+------------------+------------------+-----------+------+---------+---------+---------+--------+---------+---+
|20.0|               0.0|           27000.0|        2.0|   530|      0.0|      2.0|  20000.0|     2.0|2700000.0|2.0|
|20.0|               0.0| 262.6666666666667|        1.0|    38|      0.0|      2.0|  60000.0|     2.0|  26275.0|2.0|
|20.0|               0.0|             250.0|        1.0|    43|      0.0|      2.0|  10000.0|     2.0|  25000.0|1.0|
|20.0|             334.0|21969.166666666668|        1.0|    47|      0.0|      1.0|  20000.0|     2.0|   6550.0|2.0|
|20.0|3277.3333333333335|           28651.5|        1.0|    70|      0.0|      4.0|  20000.0|     2.0|    875.0|1.0|
|20.0|             960.0|            7358.0|        1.0|    79| 

In [15]:
# Enhance data
import pandas as pd

In [16]:
genderDict = [{"SEX" : 1.0, "SEX_NAME" : "Male"}, \
                {"SEX" : 2.0, "SEX_NAME" : "Female"}]                
genderDf = spark.createDataFrame(pd.DataFrame(genderDict, \
            columns=['SEX', 'SEX_NAME']))

In [17]:
genderDf.collect()

[Row(SEX=1.0, SEX_NAME='Male'), Row(SEX=2.0, SEX_NAME='Female')]

In [18]:
ccDf1 = ccDf.join( genderDf, ccDf.SEX== genderDf.SEX ).drop(genderDf.SEX)
ccDf1.take(5)

[Row(AGE=70.0, AVG_BILL_AMT=87.66666666666667, AVG_PAY_AMT=416.6666666666667, AVG_PAY_DUR=1.0, CUSTID='388', DEFAULTED=1.0, EDUCATION=3.0, LIMIT_BAL=80000.0, MARRIAGE=1.0, PER_PAID=475.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=56043.166666666664, AVG_PAY_AMT=57956.5, AVG_PAY_DUR=1.0, CUSTID='103', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=480000.0, MARRIAGE=1.0, PER_PAID=100.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='932', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=320000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.0, CUSTID='948', DEFAULTED=1.0, EDUCATION=2.0, LIMIT_BAL=50000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male'),
 Row(AGE=60.0, AVG_BILL_AMT=25828.333333333332, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.0, CUSTID='602', DEFAULTED=1.0, EDUCATION=3.0, LIMIT_BAL=30000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male')]

In [19]:
#Add ED_STR to the data with SQL joins. Required for PR#03

In [20]:
eduDict = [{"EDUCATION" : 1.0, "ED_STR" : "Graduate"}, \
                {"EDUCATION" : 2.0, "ED_STR" : "University"}, \
                {"EDUCATION" : 3.0, "ED_STR" : "High School" }, \
                {"EDUCATION" : 4.0, "ED_STR" : "Others"}]                
eduDf = spark.createDataFrame(pd.DataFrame(eduDict, \
            columns=['EDUCATION', 'ED_STR']))
eduDf.collect()
ccDf2 = ccDf1.join( eduDf, ccDf1.EDUCATION== eduDf.EDUCATION ).drop(eduDf.EDUCATION)
ccDf2.take(5)

[Row(AGE=60.0, AVG_BILL_AMT=56043.166666666664, AVG_PAY_AMT=57956.5, AVG_PAY_DUR=1.0, CUSTID='103', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=480000.0, MARRIAGE=1.0, PER_PAID=100.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='932', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=320000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='466', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=230000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=19518.166666666668, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='35', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=500000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate'),
 Row(AGE=60.0, AVG_BILL_AMT=87.5, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.0, CUSTID='66', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=200000.0, MARRIAGE=1.0, 

In [21]:
marrDict = [{"MARRIAGE" : 1.0, "MARR_DESC" : "Single"}, \
                {"MARRIAGE" : 2.0, "MARR_DESC" : "Married"}, \
                {"MARRIAGE" : 3.0, "MARR_DESC" : "Others"}]                
marrDf = spark.createDataFrame(pd.DataFrame(marrDict, \
            columns=['MARRIAGE', 'MARR_DESC']))
marrDf.collect()
ccFinalDf = ccDf2.join( marrDf, ccDf2.MARRIAGE== marrDf.MARRIAGE ).drop(marrDf.MARRIAGE)
ccFinalDf.cache()
ccFinalDf.take(5)

[Row(AGE=60.0, AVG_BILL_AMT=56043.166666666664, AVG_PAY_AMT=57956.5, AVG_PAY_DUR=1.0, CUSTID='103', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=480000.0, MARRIAGE=1.0, PER_PAID=100.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='932', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=320000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=0.0, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='466', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=230000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=19518.166666666668, AVG_PAY_AMT=0.0, AVG_PAY_DUR=2.0, CUSTID='35', DEFAULTED=1.0, EDUCATION=1.0, LIMIT_BAL=500000.0, MARRIAGE=1.0, PER_PAID=0.0, SEX=1.0, SEX_NAME='Male', ED_STR='Graduate', MARR_DESC='Single'),
 Row(AGE=60.0, AVG_BILL_AMT=87.5, AVG_PAY_AMT=0.0, AVG_PAY_DUR=1.

In [22]:
ccFinalDf.createOrReplaceTempView("CCDATA")


In [23]:
spark.sql("SELECT SEX_NAME, count(*) as Total, " + \
                " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY SEX_NAME"  ).show()

+--------+-----+--------+-----------+
|SEX_NAME|Total|Defaults|PER_DEFAULT|
+--------+-----+--------+-----------+
|  Female|  591|   218.0|       37.0|
|    Male|  409|   185.0|       45.0|
+--------+-----+--------+-----------+



In [24]:
spark.sql("SELECT MARR_DESC, ED_STR, count(*) as Total," + \
                " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY MARR_DESC,ED_STR " + \
                "ORDER BY 1,2").show()

+---------+-----------+-----+--------+-----------+
|MARR_DESC|     ED_STR|Total|Defaults|PER_DEFAULT|
+---------+-----------+-----+--------+-----------+
|  Married|   Graduate|  268|    69.0|       26.0|
|  Married|High School|   55|    24.0|       44.0|
|  Married|     Others|    4|     2.0|       50.0|
|  Married| University|  243|    65.0|       27.0|
|   Others|   Graduate|    4|     4.0|      100.0|
|   Others|High School|    8|     6.0|       75.0|
|   Others| University|    7|     3.0|       43.0|
|   Single|   Graduate|  123|    71.0|       58.0|
|   Single|High School|   87|    52.0|       60.0|
|   Single|     Others|    3|     2.0|       67.0|
|   Single| University|  198|   105.0|       53.0|
+---------+-----------+-----+--------+-----------+



In [25]:
spark.sql("SELECT AVG_PAY_DUR, count(*) as Total, " + \
                " SUM(DEFAULTED) as Defaults, " + \
                " ROUND(SUM(DEFAULTED) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM CCDATA GROUP BY AVG_PAY_DUR ORDER BY 1"  ).show()

+-----------+-----+--------+-----------+
|AVG_PAY_DUR|Total|Defaults|PER_DEFAULT|
+-----------+-----+--------+-----------+
|        0.0|  356|   141.0|       40.0|
|        1.0|  552|   218.0|       39.0|
|        2.0|   85|    41.0|       48.0|
|        3.0|    4|     2.0|       50.0|
|        4.0|    3|     1.0|       33.0|
+-----------+-----+--------+-----------+



In [26]:
for i in ccDf.columns:
    if not( isinstance(ccDf.select(i).take(1)[0][0], str)) :
        print( "Correlation to DEFAULTED for ", i,\
            ccDf.stat.corr('DEFAULTED',i))

Correlation to DEFAULTED for  AGE 0.5249553884579066
Correlation to DEFAULTED for  AVG_BILL_AMT 0.18782747215913168
Correlation to DEFAULTED for  AVG_PAY_AMT -0.16359608890972746
Correlation to DEFAULTED for  AVG_PAY_DUR 0.02946939689271058
Correlation to DEFAULTED for  DEFAULTED 1.0
Correlation to DEFAULTED for  EDUCATION 0.11056265057032824
Correlation to DEFAULTED for  LIMIT_BAL 0.10722031324020788
Correlation to DEFAULTED for  MARRIAGE -0.22891287287359358
Correlation to DEFAULTED for  PER_PAID -0.027644049670592894
Correlation to DEFAULTED for  SEX -0.08365182215019182


In [27]:
import math
from pyspark.ml.linalg import Vectors

In [28]:
def transformToLabeledPoint(row) :
    lp = ( row["DEFAULTED"], \
            Vectors.dense([
                row["AGE"], \
                row["AVG_BILL_AMT"], \
                row["AVG_PAY_AMT"], \
                row["AVG_PAY_DUR"], \
                row["EDUCATION"], \
                row["LIMIT_BAL"], \
                row["MARRIAGE"], \
                row["PER_PAID"], \
                row["SEX"]
        ]))
    return lp

In [29]:
ccLp = ccFinalDf.rdd.map(transformToLabeledPoint)

In [30]:
ccLp.collect()

[(1.0,
  DenseVector([60.0, 56043.1667, 57956.5, 1.0, 1.0, 480000.0, 1.0, 100.0, 1.0])),
 (1.0, DenseVector([60.0, 0.0, 0.0, 2.0, 1.0, 320000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([60.0, 0.0, 0.0, 2.0, 1.0, 230000.0, 1.0, 0.0, 1.0])),
 (1.0,
  DenseVector([60.0, 19518.1667, 0.0, 2.0, 1.0, 500000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([60.0, 87.5, 0.0, 1.0, 1.0, 200000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([60.0, 125.0, 0.0, 1.0, 1.0, 50000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([50.0, 0.0, 0.0, 1.0, 1.0, 170000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([50.0, 1270.6667, 0.0, 1.0, 1.0, 290000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([50.0, 0.0, 0.0, 2.0, 1.0, 600000.0, 1.0, 0.0, 1.0])),
 (1.0, DenseVector([50.0, 81.8333, 0.0, 2.0, 1.0, 200000.0, 1.0, 0.0, 1.0])),
 (1.0,
  DenseVector([50.0, 11781.5, 171.1667, 1.0, 1.0, 200000.0, 1.0, 0.0, 1.0])),
 (1.0,
  DenseVector([50.0, 111226.1667, 2461.8333, 0.0, 1.0, 380000.0, 1.0, 0.0, 1.0])),
 (1.0,
  DenseVector([50.0, 33623.6667, 933.3333,

In [31]:
ccNormDf = spark.createDataFrame(ccLp,["label", "features"])
ccNormDf.select("label","features").show(10)
ccNormDf.cache()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[60.0,56043.16666...|
|  1.0|[60.0,0.0,0.0,2.0...|
|  1.0|[60.0,0.0,0.0,2.0...|
|  1.0|[60.0,19518.16666...|
|  1.0|[60.0,87.5,0.0,1....|
|  1.0|[60.0,125.0,0.0,1...|
|  1.0|[50.0,0.0,0.0,1.0...|
|  1.0|[50.0,1270.666666...|
|  1.0|[50.0,0.0,0.0,2.0...|
|  1.0|[50.0,81.83333333...|
+-----+--------------------+
only showing top 10 rows



DataFrame[label: double, features: vector]

In [32]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(ccNormDf)
td = si_model.transform(ccNormDf)
td.collect()

[Row(label=1.0, features=DenseVector([60.0, 56043.1667, 57956.5, 1.0, 1.0, 480000.0, 1.0, 100.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 0.0, 0.0, 2.0, 1.0, 320000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 0.0, 0.0, 2.0, 1.0, 230000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 19518.1667, 0.0, 2.0, 1.0, 500000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 87.5, 0.0, 1.0, 1.0, 200000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([60.0, 125.0, 0.0, 1.0, 1.0, 50000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([50.0, 0.0, 0.0, 1.0, 1.0, 170000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([50.0, 1270.6667, 0.0, 1.0, 1.0, 290000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=DenseVector([50.0, 0.0, 0.0, 2.0, 1.0, 600000.0, 1.0, 0.0, 1.0]), indexed=1.0),
 Row(label=1.0, features=De

In [33]:
(trainingData, testData) = td.randomSplit([0.7, 0.3])
trainingData.count()
testData.count()

316

In [34]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [35]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="indexed",metricName="accuracy")

In [36]:
dtClassifer = DecisionTreeClassifier(labelCol="indexed", \
                featuresCol="features")
dtModel = dtClassifer.fit(trainingData)

In [37]:
predictions = dtModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Decision Trees : ",evaluator.evaluate(predictions))      


Results of Decision Trees :  0.7151898734177216


In [38]:
rmClassifer = RandomForestClassifier(labelCol="indexed", \
                featuresCol="features")
rmModel = rmClassifer.fit(trainingData)

In [39]:
predictions = rmModel.transform(testData)
predictions.select("prediction","indexed","label","features").collect()
print("Results of Random Forest : ",evaluator.evaluate(predictions)  )


Results of Random Forest :  0.7215189873417721
