### Libraries

In [133]:
import os
import sys
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.mllib.stat import Statistics
from pyspark.ml.stat import Correlation
import math
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Normalizer

**Spark context and SQL context**

In [2]:
if sys.platform.startswith('win'):
    os.chdir(r"C:\Users\Thatoi\SparkPythonDoBigDataAnalytics-Resources\SparkPythonDoBigDataAnalytics-Resources")
    os.environ['SPARK_HOME'] = 'C:/Users/Thatoi/Downloads/spark-3.0.0-preview2-bin-hadoop2.7'
# create a variable for root path
SPARK_HOME = os.environ['SPARK_HOME'] 

sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))

In [3]:
#Create a Spark Session
SpSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("V2 Maestros") \
    .config("spark.executor.memory", "1g") \
    .config("spark.cores.max","2") \
    .config("spark.sql.warehouse.dir", "file:///c:/Users/temp/spark-warehouse")\
    .getOrCreate()

#Get the Spark Context from Spark Session    
SpContext = SpSession.sparkContext

In [6]:
#Starting a SQL context
sql = pyspark.SQLContext(SpContext)

In [7]:
print(SpContext.version)

3.0.0-preview2


**Data ingestion and preprocessing**

In [10]:
#Defining own schema
#This is done so that there would not be
#any conflict between datatypes
dataschema = [StructField('CustomerId', IntegerType(), True),
              StructField('Balance_limit', IntegerType(), True),
              StructField('Sex', StringType(), True),
              StructField('Education', StringType(), True),
              StructField('Marriage', StringType(), True),
              StructField('Age', IntegerType(), True),
              StructField('Pay1', IntegerType(), True),
              StructField('Pay2', IntegerType(), True),
              StructField('Pay3', IntegerType(), True),
              StructField('Pay4', IntegerType(), True),
              StructField('Pay5', IntegerType(), True),
              StructField('Pay6', IntegerType(), True),
              StructField('BillAmount1', IntegerType(), True),
              StructField('BillAmount2', IntegerType(), True),
              StructField('BillAmount3', IntegerType(), True),
              StructField('BillAmount4', IntegerType(), True),
              StructField('BillAmount5', IntegerType(), True),
              StructField('BillAmount6', IntegerType(), True),
              StructField('PayAmount1', IntegerType(), True),
              StructField('PayAmount2', IntegerType(), True),
              StructField('PayAmount3', IntegerType(), True),
              StructField('PayAmount4', IntegerType(), True),
              StructField('PayAmount5', IntegerType(), True),
              StructField('PayAmount6', IntegerType(), True),
              StructField('Defaulted', StringType(), True),
             ]
finalstructure = StructType(dataschema)

In [11]:
#Reading csv file into spark dataframe using our schema 
df = sql.read.csv('credit-card-default-1000.csv', header = True,schema = finalstructure,mode="DROPMALFORMED")
df.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- Balance_limit: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marriage: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Pay1: integer (nullable = true)
 |-- Pay2: integer (nullable = true)
 |-- Pay3: integer (nullable = true)
 |-- Pay4: integer (nullable = true)
 |-- Pay5: integer (nullable = true)
 |-- Pay6: integer (nullable = true)
 |-- BillAmount1: integer (nullable = true)
 |-- BillAmount2: integer (nullable = true)
 |-- BillAmount3: integer (nullable = true)
 |-- BillAmount4: integer (nullable = true)
 |-- BillAmount5: integer (nullable = true)
 |-- BillAmount6: integer (nullable = true)
 |-- PayAmount1: integer (nullable = true)
 |-- PayAmount2: integer (nullable = true)
 |-- PayAmount3: integer (nullable = true)
 |-- PayAmount4: integer (nullable = true)
 |-- PayAmount5: integer (nullable = true)
 |-- PayAmount6: integer (nullable = true)
 |-- Defau

In [19]:
#Analyzing categorical variables
df.select('CustomerId','Sex','Education','Marriage','Age','Defaulted').show(10)

+----------+---+---------+--------+---+---------+
|CustomerId|Sex|Education|Marriage|Age|Defaulted|
+----------+---+---------+--------+---+---------+
|       530|  2|        2|       2| 21|        0|
|        38|  2|        2|       2| 22|        0|
|        43|  1|        2|       2| 22|        0|
|        47|  2|        1|       2| 22|        0|
|        70|  1|        4|       2| 22|        0|
|        79|  2|        2|       2| 22|        0|
|        99|  F|        3|       1| 22|        0|
|       104|  2|        3|       2| 22|        0|
|       135|  2|        2|       2| 22|        0|
|       170|  2|        2|       2| 22|        0|
+----------+---+---------+--------+---+---------+
only showing top 10 rows



In [28]:
#Number of sex,education,marriage categories
print(df.groupby('Sex').agg(func.count('Defaulted')).show(),
      df.groupby('Education').agg(func.count('Defaulted')).show(),
      df.groupby('Marriage').agg(func.count('Defaulted')).show())
      

+----+----------------+
| Sex|count(Defaulted)|
+----+----------------+
|   2|             338|
|   1|             252|
|   F|             253|
|   M|             157|
|null|               2|
+----+----------------+

+---------+----------------+
|Education|count(Defaulted)|
+---------+----------------+
|        2|             448|
|        1|             395|
|        4|               7|
|        3|             150|
|     null|               2|
+---------+----------------+

+--------+----------------+
|Marriage|count(Defaulted)|
+--------+----------------+
|       2|             570|
|       1|             411|
|       3|              19|
|    null|               2|
+--------+----------------+

None None None


In [89]:
#handelling null values
df1=df.na.fill('1','Sex').na.fill('4','Education').na.fill('3','Marriage')
print(df1.groupby('Sex').agg(func.count('Defaulted')).show(),
      df1.groupby('Education').agg(func.count('Defaulted')).show(),
      df1.groupby('Marriage').agg(func.count('Defaulted')).show())

+---+----------------+
|Sex|count(Defaulted)|
+---+----------------+
|  2|             338|
|  1|             254|
|  F|             253|
|  M|             157|
+---+----------------+

+---------+----------------+
|Education|count(Defaulted)|
+---------+----------------+
|        2|             448|
|        1|             395|
|        4|               9|
|        3|             150|
+---------+----------------+

+--------+----------------+
|Marriage|count(Defaulted)|
+--------+----------------+
|       2|             570|
|       1|             411|
|       3|              21|
+--------+----------------+

None None None


In [90]:
#Manipulating data using withColumn & spark sql functions
#df1.map()
df2=df1.withColumn("Sex", func.when(func.col("Sex")=='F', '1').otherwise('2'))
df3=df2.withColumn("Marriage", func.when(func.col("Marriage")=='3', '1').otherwise(func.col("Marriage")))
print(df3.groupby('Sex').agg(func.count('Defaulted')).show(),
      df3.groupby('Education').agg(func.count('Defaulted')).show(),
      df3.groupby('Marriage').agg(func.count('Defaulted')).show())

+---+----------------+
|Sex|count(Defaulted)|
+---+----------------+
|  2|             749|
|  1|             253|
+---+----------------+

+---------+----------------+
|Education|count(Defaulted)|
+---------+----------------+
|        2|             448|
|        1|             395|
|        4|               9|
|        3|             150|
+---------+----------------+

+--------+----------------+
|Marriage|count(Defaulted)|
+--------+----------------+
|       2|             570|
|       1|             432|
+--------+----------------+

None None None


In [92]:
df3.createOrReplaceTempView("events")
sql.sql("""SELECT CustomerId,
                  Sex,
                  Education,
                  Marriage
                  FROM events """).show(10)

+----------+---+---------+--------+
|CustomerId|Sex|Education|Marriage|
+----------+---+---------+--------+
|       530|  2|        2|       2|
|        38|  2|        2|       2|
|        43|  2|        2|       2|
|        47|  2|        1|       2|
|        70|  2|        4|       2|
|        79|  2|        2|       2|
|        99|  1|        3|       1|
|       104|  2|        3|       2|
|       135|  2|        2|       2|
|       170|  2|        2|       2|
+----------+---+---------+--------+
only showing top 10 rows



In [104]:
# Creating Average Pay, BillAmount and PayAmount
marksColumns1 = [col('Pay1'),col('Pay2'),col('Pay3'),col('Pay4'),col('Pay5'),col('Pay6')]
marksColumns2 = [col('BillAmount1'),col('BillAmount2'),col('BillAmount3'),col('BillAmount4'),col('BillAmount5'),col('BillAmount6')]
marksColumns3 = [col('PayAmount1'),col('PayAmount2'),col('PayAmount3'),col('PayAmount4'),col('PayAmount5'),col('PayAmount6')]
averageFunc1 = sum(x for x in marksColumns1)/len(marksColumns1)
averageFunc2 = sum(x for x in marksColumns2)/len(marksColumns2)
averageFunc3 = sum(x for x in marksColumns3)/len(marksColumns3)
df4 = df3.withColumn('Average_Pay', averageFunc1).withColumn('AverageBillAmount',averageFunc2).withColumn('AveragePayAmount',averageFunc3)
df4.select('CustomerId','Sex','Education','Marriage','Age','Average_Pay','AverageBillAmount','AveragePayAmount','Defaulted').show(10)

+----------+---+---------+--------+---+-------------------+------------------+------------------+---------+
|CustomerId|Sex|Education|Marriage|Age|        Average_Pay| AverageBillAmount|  AveragePayAmount|Defaulted|
+----------+---+---------+--------+---+-------------------+------------------+------------------+---------+
|       530|  2|        2|       2| 21|-0.3333333333333333|               0.0|           27000.0|        0|
|        38|  2|        2|       2| 22|-0.6666666666666666|               0.0| 262.6666666666667|        0|
|        43|  2|        2|       2| 22|-0.6666666666666666|               0.0|             250.0|        0|
|        47|  2|        1|       2| 22|                0.0|             431.0|21969.166666666668|        0|
|        70|  2|        4|       2| 22|                0.0|            3349.5|           28651.5|        0|
|        79|  2|        2|       2| 22|-0.6666666666666666|1025.3333333333333|            7358.0|        0|
|        99|  1|        3|  

In [124]:
df4.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- Balance_limit: integer (nullable = true)
 |-- Sex: string (nullable = false)
 |-- Education: string (nullable = false)
 |-- Marriage: string (nullable = false)
 |-- Age: integer (nullable = true)
 |-- Pay1: integer (nullable = true)
 |-- Pay2: integer (nullable = true)
 |-- Pay3: integer (nullable = true)
 |-- Pay4: integer (nullable = true)
 |-- Pay5: integer (nullable = true)
 |-- Pay6: integer (nullable = true)
 |-- BillAmount1: integer (nullable = true)
 |-- BillAmount2: integer (nullable = true)
 |-- BillAmount3: integer (nullable = true)
 |-- BillAmount4: integer (nullable = true)
 |-- BillAmount5: integer (nullable = true)
 |-- BillAmount6: integer (nullable = true)
 |-- PayAmount1: integer (nullable = true)
 |-- PayAmount2: integer (nullable = true)
 |-- PayAmount3: integer (nullable = true)
 |-- PayAmount4: integer (nullable = true)
 |-- PayAmount5: integer (nullable = true)
 |-- PayAmount6: integer (nullable = true)
 |-- De

**Analysis**

In [105]:
#How many defaulted in each of the gender type?
df4.createOrReplaceTempView("data")
sql.sql("SELECT Sex, count(*) as Total, " + \
                " SUM(Defaulted) as Defaults, " + \
                " ROUND(SUM(Defaulted) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM data GROUP BY Sex"  ).show()

+---+-----+--------+-----------+
|Sex|Total|Defaults|PER_DEFAULT|
+---+-----+--------+-----------+
|  2|  749|   281.0|       38.0|
|  1|  253|   122.0|       48.0|
+---+-----+--------+-----------+



In [106]:
#For each marriage & Education type, 
#What is the % of default ?
sql.sql("SELECT Marriage, Education, count(*) as Total," + \
                " SUM(Defaulted) as Defaults, " + \
                " ROUND(SUM(Defaulted) * 100 / count(*)) as PER_DEFAULT " + \
                "FROM data GROUP BY Marriage,Education " + \
                "ORDER BY 1,2").show()

+--------+---------+-----+--------+-----------+
|Marriage|Education|Total|Defaults|PER_DEFAULT|
+--------+---------+-----+--------+-----------+
|       1|        1|  127|    75.0|       59.0|
|       1|        2|  205|   108.0|       53.0|
|       1|        3|   95|    58.0|       61.0|
|       1|        4|    5|     2.0|       40.0|
|       2|        1|  268|    69.0|       26.0|
|       2|        2|  243|    65.0|       27.0|
|       2|        3|   55|    24.0|       44.0|
|       2|        4|    4|     2.0|       50.0|
+--------+---------+-----+--------+-----------+



In [114]:
#For each Marriage, Education and Default type
#What is the Average Pay and Bill Amount?
SpSession.sql("SELECT Marriage, Education, Defaulted as If_Defaulted, count(*) as Total, " + \
                " AVG(AveragePayAmount) as AveragePayAmount, " + \
                " AVG(AverageBillAmount) as AverageBillAmount" +\
                " FROM data GROUP BY Marriage, Education, Defaulted ORDER BY 1"  ).show()

+--------+---------+------------+-----+------------------+------------------+
|Marriage|Education|If_Defaulted|Total|  AveragePayAmount| AverageBillAmount|
+--------+---------+------------+-----+------------------+------------------+
|       1|        2|           0|   97| 4754.037800687283|45614.134020618556|
|       1|        2|           1|  108| 2969.123456790124| 59826.90586419753|
|       1|        3|           1|   58|1912.8620689655168|  52528.4252873563|
|       1|        3|           0|   37|  4501.81981981982| 76919.13063063064|
|       1|        1|           1|   75| 3680.115555555555|          70117.02|
|       1|        1|           0|   52|3280.7916666666665| 53105.48717948718|
|       1|        4|           0|    3|2489.1666666666665|           63433.0|
|       1|        4|           1|    2|            1880.0|49100.583333333336|
|       2|        2|           0|  178| 7049.232209737827| 21216.78838951311|
|       2|        1|           0|  199| 6852.094639865995| 22938

**Vector Assembler**

In [131]:
# convert to vector column first
def transformToLabeledPoint(row) :
    lp = ( row["Defaulted"], \
            Vectors.dense([
                row["Age"], \
                row["AverageBillAmount"], \
                row["AveragePayAmount"], \
                row["Average_Pay"], \
                row["Education"], \
                row["Balance_limit"], \
                row["Marriage"], \
                row["Education"], \
                row["Sex"]
        ]))
    return lp

In [132]:
ccLp = df4.rdd.repartition(2).map(transformToLabeledPoint)
ccLp.collect()
ccNormDf = SpSession.createDataFrame(ccLp,["label", "features"])
ccNormDf.select("label","features").show(10)
ccNormDf.cache()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[21.0,0.0,27000.0...|
|    0|[22.0,0.0,262.666...|
|    0|[22.0,0.0,250.0,-...|
|    0|[22.0,431.0,21969...|
|    0|[22.0,3349.5,2865...|
|    0|[22.0,1025.333333...|
|    0|[22.0,117.8333333...|
|    0|[22.0,473.3333333...|
|    0|[22.0,61.33333333...|
|    0|[22.0,1316.833333...|
+-----+--------------------+
only showing top 10 rows



DataFrame[label: string, features: vector]

In [140]:
#Normalizing features
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
l1NormData = normalizer.transform(ccNormDf)
l1NormData.show()

+-----+--------------------+--------------------+
|label|            features|       features_norm|
+-----+--------------------+--------------------+
|    0|[21.0,0.0,27000.0...|[4.46529825357223...|
|    0|[22.0,0.0,262.666...|[3.64882795223352...|
|    0|[22.0,0.0,250.0,-...|[0.00213993904416...|
|    0|[22.0,431.0,21969...|[5.18523465150902...|
|    0|[22.0,3349.5,2865...|[4.22792351302008...|
|    0|[22.0,1025.333333...|[5.72707866923517...|
|    0|[22.0,117.8333333...|[4.31561533481982...|
|    0|[22.0,473.3333333...|[4.08663669799754...|
|    0|[22.0,61.33333333...|[7.22452397502066...|
|    0|[22.0,1316.833333...|[3.77724552954406...|
|    0|[22.0,693.6666666...|[6.75672217075056...|
|    0|[22.0,1638.666666...|[6.15344453042691...|
|    0|[22.0,592.5,1259....|[0.00185148820377...|
|    0|[22.0,9743.333333...|[2.75434957704040...|
|    0|[22.0,42.83333333...|[0.00109140518417...|
|    0|[22.0,10246.66666...|[1.99520546081688...|
|    0|[22.0,34142.83333...|[1.47889548914468...|


**Partitioning and Modeling**

In [159]:
#Indexing needed as pre-req for Decision Trees
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(l1NormData)
td = si_model.transform(l1NormData)
td.collect()

#Split into training and testing data
(trainingData, testData) = td.randomSplit([0.7, 0.3])
print(trainingData.count(),
testData.count())

689 311


In [164]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [165]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", \
                    labelCol="indexed",metricName="accuracy")

#Create the Decision Trees model
dtClassifer = DecisionTreeClassifier(labelCol="indexed", \
                featuresCol="features_norm")
dtModel = dtClassifer.fit(trainingData)
#Predict on the test data
predictions = dtModel.transform(testData)
predictions.select("prediction","indexed","label","features_norm").collect()
print("Results of Decision Trees : ",evaluator.evaluate(predictions))      

#Create the Random Forest model
rmClassifer = RandomForestClassifier(labelCol="indexed", \
                featuresCol="features_norm")
rmModel = rmClassifer.fit(trainingData)
#Predict on the test data
predictions = rmModel.transform(testData)
predictions.select("prediction","indexed","label","features_norm").collect()
print("Results of Random Forest : ",evaluator.evaluate(predictions)  )

#Create the Gradient Boost model
gbt = GBTClassifier(labelCol="indexed", featuresCol="features_norm", maxIter=10)
gbmmodel=gbt.fit(trainingData)
#Predict on the test data
predictions = gbmmodel.transform(testData)
predictions.select("prediction","indexed","label","features_norm").collect()
print("Results of Gradient Boost : ",evaluator.evaluate(predictions)  )

#Create the Logistic model
lr = LogisticRegression(labelCol="indexed", featuresCol="features_norm")
lrmodel=lr.fit(trainingData)
#Predict on the test data
predictions = lrmodel.transform(testData)
predictions.select("prediction","indexed","label","features_norm").collect()
print("Results of Logistic regression : ",evaluator.evaluate(predictions)  )

#Create the Linear SVM model
lsvc = LinearSVC(labelCol="indexed", featuresCol="features_norm",maxIter=10, regParam=0.1)
lsvcmodel=lsvc.fit(trainingData)
#Predict on the test data
predictions = lsvcmodel.transform(testData)
predictions.select("prediction","indexed","label","features_norm").collect()
print("Results of Linear SVC : ",evaluator.evaluate(predictions)  )

Results of Decision Trees :  0.6945337620578779
Results of Random Forest :  0.7395498392282959
Results of Gradient Boost :  0.6913183279742765
Results of Logistic regression :  0.729903536977492
Results of Linear SVC :  0.6430868167202572
