Installation of Spark and PySpark

In [1]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.
!pip install pyspark==3.1.1 #--default version is 3.5.1, but to make this work we must match the spark and pyspark version. Also installs py4j.

0% [Working]            Get:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [810 kB]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:9 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [1,695 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 http:

Setting the Java and Spark (Hadoop 3.2 build) environment variables

In [2]:
import os

import findspark

# Tell Spark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

# Make the PySpark installation importable from this kernel.
findspark.init()

In [3]:
# List the working directory to confirm the Spark archive was downloaded and extracted.
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


Initializing the Spark context and the SQL entry point

In [4]:
import pyspark
from pyspark import SparkContext

In [5]:
# Reuse the active SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()

In [6]:
from pyspark.sql import SparkSession

# Constructing SQLContext(sc) directly is deprecated since Spark 3.0.
# SparkSession is the supported entry point and provides the `read`, `sql`
# and `createDataFrame` members that the rest of the notebook calls on `sqc`.
# getOrCreate() attaches to the SparkContext that is already running.
sqc = SparkSession.builder.getOrCreate()

Reading the dataset from Google Drive

In [7]:
# Mount Google Drive at /content/drive so the dataset can be read by file path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
# Location of the adult.csv file on the mounted Drive.
adult_csv_path = '/content/drive/MyDrive/Colab Notebooks/bfsi_models_practice/Income_pred_pyspark_pipeline/adult.csv'

# Load the CSV: the first row holds the column names and the column types are inferred.
df = sqc.read.csv(adult_csv_path, header=True, inferSchema=True)

# Show the resulting column names and inferred types.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



Renaming some of the columns for better readability

In [9]:
# New column names, listed in the same order as the columns of the loaded frame.
cols = [
    'age', 'workclass', 'fnlwgt', 'education', 'education_num',
    'marital', 'occupation', 'relationship', 'race', 'sex',
    'capital_gain', 'capital_loss', 'hours_week', 'native_country', 'label',
]
# toDF assigns the names positionally, so the order above must match the schema.
df = df.toDF(*cols)


The numerical continuous columns were inferred as integers (see the schema above). Converting them to float.

In [10]:
# importing the sql types
from pyspark.sql.types import *

def castFields(df, names, newType):
  """Return `df` with every column listed in `names` cast to `newType`.

  Each column is replaced under its own name, one at a time; columns that
  are not listed are left untouched. With an empty `names` the input frame
  is returned as-is.
  """
  result = df
  for column_name in names:
    converted = result[column_name].cast(newType)
    result = result.withColumn(column_name, converted)
  return result

# Names of the continuous (numeric) feature columns; reused later when assembling features.
CONTF = [
    'age',
    'fnlwgt',
    'capital_gain',
    'education_num',
    'capital_loss',
    'hours_week',
]

# Cast all continuous columns to float and confirm the new types in the schema.
df = castFields(df=df, names=CONTF, newType=FloatType())
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [11]:
# Preview the first 5 rows without truncating long cell values.
df.show(5, truncate = False)

+----+---------+--------+------------+-------------+------------------+-----------------+------------+-----+------+------------+------------+----------+--------------+-----+
|age |workclass|fnlwgt  |education   |education_num|marital           |occupation       |relationship|race |sex   |capital_gain|capital_loss|hours_week|native_country|label|
+----+---------+--------+------------+-------------+------------------+-----------------+------------+-----+------+------------+------------+----------+--------------+-----+
|25.0|Private  |226802.0|11th        |7.0          |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0.0         |0.0         |40.0      |United-States |<=50K|
|38.0|Private  |89814.0 |HS-grad     |9.0          |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0.0         |0.0         |50.0      |United-States |<=50K|
|28.0|Local-gov|336951.0|Assoc-acdm  |12.0         |Married-civ-spouse|Protective-serv  |Husband     |White|Male  |0.0         |0.

Checking the summary statistics for any visible data discrepancies

In [12]:
# Print per-column summary statistics to spot obvious data discrepancies.
df.describe().show()

+-------+------------------+-----------+------------------+------------+------------------+--------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+-----+
|summary|               age|  workclass|            fnlwgt|   education|     education_num| marital|      occupation|relationship|              race|   sex|      capital_gain|     capital_loss|        hours_week|native_country|label|
+-------+------------------+-----------+------------------+------------+------------------+--------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+-----+
|  count|             48842|      48842|             48842|       48842|             48842|   48842|           48842|       48842|             48842| 48842|             48842|            48842|             48842|         48842|48842|
|   mean| 38.64358543876172|       null|189664.13459727284|     

SQL-style grouping: row counts per education category and average capital gain per marital status.

In [13]:
# Register the frame as the `adults` view so it can be queried with SQL.
df.createOrReplaceTempView('adults')

# Run the two summaries in order: rows per education level (ascending by
# count), then the average capital gain for each marital status.
for qry in (
    'select education, count(education) from adults group by education order by count(education)',
    'select marital, avg(capital_gain) from adults group by marital',
):
    sqc.sql(qry).show()

+------------+----------------+
|   education|count(education)|
+------------+----------------+
|   Preschool|              83|
|     1st-4th|             247|
|     5th-6th|             509|
|   Doctorate|             594|
|        12th|             657|
|         9th|             756|
| Prof-school|             834|
|     7th-8th|             955|
|        10th|            1389|
|  Assoc-acdm|            1601|
|        11th|            1812|
|   Assoc-voc|            2061|
|     Masters|            2657|
|   Bachelors|            8025|
|Some-college|           10878|
|     HS-grad|           15784|
+------------+----------------+

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2

The same grouping with the Spark DataFrame API: row counts per education category and average capital gain per marital status.

In [14]:
# Rows per education level, smallest group first.
education_counts = df.groupBy('education').count()
education_counts.sort('count', ascending = True).show()

# Mean capital gain for each marital status.
gain_by_marital = df.groupBy('marital').agg({'capital_gain':'mean'})
gain_by_marital.show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



Data preprocessing

In [15]:
# Drop every row that has a null in any column.
# NOTE(review): the describe() output above reports a count of 48842 for every
# column, so there appear to be no nulls to drop at this point — confirm
# whether missing values are encoded with some other marker in this dataset.
df = df.dropna()

In [16]:
# Add the square of `age` as a new `age_square` column.
# Only `col` is imported: a star import of pyspark.sql.functions would shadow
# Python builtins such as `sum`, `min`, `max`, `abs` and `round` for every
# later cell of the notebook.
from pyspark.sql.functions import col

df = df.withColumn("age_square", col('age') ** 2)

# Confirm the new column and its type.
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



Checking whether the `native_country` column has any category with fewer than 5 rows

In [17]:
# Re-register the view so that it points at the current `df`.
df.createOrReplaceTempView("adults")

# Countries that occur in fewer than 5 rows.
qry = (
    'select native_country, count(native_country) '
    'from adults '
    'group by native_country '
    'having count(native_country) < 5'
)
sqc.sql(qry).show()

+------------------+---------------------+
|    native_country|count(native_country)|
+------------------+---------------------+
|Holand-Netherlands|                    1|
+------------------+---------------------+



In [18]:
# Exclude the single-row 'Holand-Netherlands' category found by the query above.
rare_country = 'Holand-Netherlands'
df_remove = df.where(df['native_country'] != rare_country)

Building a pipeline with the following four steps

* Encoding the categorical data
* Indexing the label feature
* Adding continuous variables
* Assembly of the steps

In [19]:
# Importing the libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [20]:
# Categorical columns that are indexed and then one-hot encoded.
CAT_FEAT = [
    'workclass', 'education', 'marital', 'occupation',
    'relationship', 'race', 'sex', 'native_country',
]
stages = []

# For each categorical column add two stages: a StringIndexer writing
# "<name>Index", then a OneHotEncoder turning that index into "<name>classVec".
for feature in CAT_FEAT:
  indexed_col = feature + "Index"
  stringIndexer = StringIndexer(inputCol=feature, outputCol=indexed_col)
  encoder = OneHotEncoder(inputCols=[indexed_col], outputCols=[feature + "classVec"])
  stages.extend([stringIndexer, encoder])

In [21]:
# Index the string `label` column into a numeric `newlabel` column.
label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel")
stages.append(label_stringIdx)

# Assembler inputs: the one-hot vectors first, then the continuous columns.
# NOTE(review): `age_square` was added to the frame earlier but is not listed
# in CONTF, so it is not part of the assembled `features` — confirm whether
# that is intended.
encoded_cols = [name + "classVec" for name in CAT_FEAT]
assemblerInputs = encoded_cols + CONTF

# Combine all inputs into a single `features` vector column.
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [22]:
# Display the ordered list of pipeline stages built so far.
stages

[StringIndexer_ba3f4926d797,
 OneHotEncoder_83ea41b0c6dd,
 StringIndexer_36d7c234ac0a,
 OneHotEncoder_c243f8b04548,
 StringIndexer_9c5c16211ba7,
 OneHotEncoder_f3eead2626ca,
 StringIndexer_4ea9a98487b4,
 OneHotEncoder_1b10edbde25b,
 StringIndexer_e85c610e02be,
 OneHotEncoder_d1fe07a91449,
 StringIndexer_779b78669899,
 OneHotEncoder_6a0b971cf081,
 StringIndexer_3883d2f7f9f1,
 OneHotEncoder_a3ccfd5b21c6,
 StringIndexer_fca2cb57d535,
 OneHotEncoder_a414b17ab0ec,
 StringIndexer_e6ab4e642785,
 VectorAssembler_38ba9c376cda]

In [23]:
# Create the pipeline from the collected stages, fit it, and apply it.
# NOTE(review): the pipeline is fitted on the full `df_remove` frame, i.e.
# before the train/test split that happens further down.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
# Despite its name, `model` holds the transformed DataFrame, not an estimator.
model = pipelineModel.transform(df_remove)

In [24]:
# Display the transformed DataFrame's columns and types.
model

DataFrame[age: float, workclass: string, fnlwgt: float, education: string, education_num: float, marital: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: float, capital_loss: float, hours_week: float, native_country: string, label: string, age_square: double, workclassIndex: double, workclassclassVec: vector, educationIndex: double, educationclassVec: vector, maritalIndex: double, maritalclassVec: vector, occupationIndex: double, occupationclassVec: vector, relationshipIndex: double, relationshipclassVec: vector, raceIndex: double, raceclassVec: vector, sexIndex: double, sexclassVec: vector, native_countryIndex: double, native_countryclassVec: vector, newlabel: double, features: vector]

Building the classifier

In [25]:
# Reduce the transformed frame to the two columns the classifier needs:
# the indexed label and the feature vector (converted to a DenseVector).
# NOTE(review): this goes through the RDD API, so every row is processed by
# Python code; whether that is actually faster than selecting the two columns
# directly on the DataFrame has not been verified — confirm.
from pyspark.ml.linalg import DenseVector


def _to_label_and_features(row):
    """Map one transformed row to a (label, dense feature vector) pair."""
    return (row["newlabel"], DenseVector(row["features"]))


input_data = model.rdd.map(_to_label_and_features)
df_train = sqc.createDataFrame(input_data, ["label", "features"])

In [26]:
# Peek at the first two rows of the (label, features) frame.
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.0,0.0,0.0,0.0,...|
|  0.0|[1.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



Creating a train-test split in an 80:20 ratio

In [27]:
# Randomly split the rows with weights 0.8 (train) and 0.2 (test), using a fixed seed.
train_data, test_data = df_train.randomSplit([0.8, 0.2], seed = 1234)

In [28]:
# Count the rows per income label (below / above 50K) in each split.
# Each entry: the split, the view name it is registered under, and the query to run.
label_count_queries = (
    (train_data, 'train_set', 'select label as train_label, count(label) from train_set group by label'),
    (test_data, 'test_set', 'select label as test_label, count(label) from test_set group by label'),
)
for split_df, view_name, qry in label_count_queries:
    split_df.createOrReplaceTempView(view_name)
    sqc.sql(qry).show()

+-----------+------------+
|train_label|count(label)|
+-----------+------------+
|        0.0|       29713|
|        1.0|        9339|
+-----------+------------+

+----------+------------+
|test_label|count(label)|
+----------+------------+
|       0.0|        7441|
|       1.0|        2348|
+----------+------------+



Logistic regression using PySpark

In [29]:
from pyspark.ml.classification import LogisticRegression

# Estimator settings: which columns to read, the iteration cap and the
# regularization strength.
lr_params = dict(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
)
lr = LogisticRegression(**lr_params)

# Fit on the training split; `linearmodel` is the fitted logistic-regression model.
linearmodel = lr.fit(train_data)

In [30]:
# Show the fitted coefficients and the intercept.
print(f"Coefficients: {linearmodel.coefficients!s}")
print(f"Intercept: {linearmodel.intercept!s}")

Coefficients: [-0.06334192188326344,-0.15118609231652533,-0.07047760518818037,-0.17320037822175696,-0.14807202027642133,0.16076063965584378,0.18807356429265815,-0.25584383984243403,-0.19389298576668085,-0.06301836797190781,0.2195093912842339,0.3930416224477307,-0.029981745295448718,-0.3216569703623006,-0.006835425788088198,-0.3403449713366752,-0.44233055369541585,0.5429094490460042,-0.3789522654442827,-0.23480915681269823,0.6042443005032556,-0.35660463792970537,-0.41169403461605003,0.3253928098177621,-0.3498466195246758,-0.2045204256752059,-0.2164555023371736,-0.15713587141206908,-0.12704899355725732,0.19148981194441664,-0.06257851524111259,0.2855344876313434,-0.11803919789741188,0.03712987228129084,-0.2945714649034384,-0.21394169569948765,-0.17442435832791123,-0.11210816291326003,-0.28224249059480183,-0.3365895476557896,0.12176449729033574,0.13136806690713332,-0.2912868521189356,0.26852353087378006,-0.19530926033981041,-0.2986900409050975,-0.24143757090310092,0.4224038109356433,-0.067

Model Evaluation

In [33]:
# Score the test split with the fitted model.
pred = linearmodel.transform(test_data)

# Inspect the columns added by the model, then preview a few predictions.
pred.printSchema()
preview_cols = ["label", "prediction", "probability"]
pred.select(*preview_cols).show(5)

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.93153043967305...|
|  0.0|       0.0|[0.94841745120166...|
|  0.0|       0.0|[0.75520650338856...|
|  0.0|       0.0|[0.91460118761366...|
|  0.0|       0.0|[0.54773961062806...|
+-----+----------+--------------------+
only showing top 5 rows



In [35]:
# Accuracy: share of rows whose prediction equals the label, as a percentage.
def accuracy_m(model, data=None):
    """Print the accuracy of `model` on `data` as a percentage.

    Args:
        model: fitted model exposing `transform(data)`; the result must
            provide `label` and `prediction` columns.
        data: frame to evaluate on. Defaults to the notebook-level
            `test_data` split, which keeps existing calls working.

    Returns:
        None. The result is printed. If `data` has no rows, a notice is
        printed instead of raising ZeroDivisionError.
    """
    if data is None:
        data = test_data  # notebook-level hold-out split
    scored = model.transform(data).select("label", "prediction")
    total = scored.count()
    if total == 0:
        # Nothing to evaluate: avoid dividing by zero below.
        print("Model accuracy: n/a (no rows to evaluate)")
        return
    correct = scored.filter(scored.label == scored.prediction).count()
    print("Model accuracy: %.3f%%" % (correct / total * 100))

# Report the accuracy of the fitted logistic-regression model on the test split.
accuracy_m(model = linearmodel)

Model accuracy: 82.286%


Classifier metrics: area under the ROC curve (AUC)

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Model evaluation.
# The evaluator is bound to `evaluator` rather than `eval` so that the Python
# builtin `eval` is not shadowed for the rest of the session.
evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")

# Print the name of the metric being computed, then its value on the scored test split.
print(evaluator.getMetricName())
print(evaluator.evaluate(pred))

areaUnderROC
0.8925199359321156
