In [66]:
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=2bd96e2ec760c862240d8177bdd91ce84d5f7de83f9edd555390c5e3c292fc44
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.appName('Income').getOrCreate()
# Check Spark Session Information
spark

In [122]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pandas as pd

In [123]:
# Load the Iris dataset
path="/content/drive/MyDrive/Spark/income.csv"
df = spark.read.csv(path, header=True, inferSchema=True)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- weight: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_years: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- income_class: string (nullable = true)



In [124]:
display(df)

DataFrame[age: int, workclass: string, weight: double, education: string, education_years: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, citizenship: string, income_class: string]

In [125]:
df.show(10)

+---+-----------------+--------+----------+---------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------------+
|age|        workclass|  weight| education|education_years|      marital_status|        occupation|  relationship|  race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|income_class|
+---+-----------------+--------+----------+---------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------------+
| 39|        State-gov| 77516.0| Bachelors|           13.0|       Never-married|      Adm-clerical| Not-in-family| White|   Male|      2174.0|         0.0|          40.0| United-States|       <=50K|
| 50| Self-emp-not-inc| 83311.0| Bachelors|           13.0|  Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|         0.0|         0.0|          13.0| United-States|       <=50K|
| 38|

In [126]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+-----------+------------+
|summary|               age|   workclass|            weight|    education|  education_years|marital_status|       occupation|relationship|               race|    sex|      capital_gain|    capital_loss|    hours_per_week|citizenship|income_class|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+----------------+------------------+-----------+------------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|       32561|              32561|  32561|             32561|           32561|             32561|      32561|       32561|
|   mean| 38

In [127]:
# Drop rows with missing values
df = df.dropna()
df.show(10)

+---+-----------------+--------+----------+---------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------------+
|age|        workclass|  weight| education|education_years|      marital_status|        occupation|  relationship|  race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|income_class|
+---+-----------------+--------+----------+---------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------------+
| 39|        State-gov| 77516.0| Bachelors|           13.0|       Never-married|      Adm-clerical| Not-in-family| White|   Male|      2174.0|         0.0|          40.0| United-States|       <=50K|
| 50| Self-emp-not-inc| 83311.0| Bachelors|           13.0|  Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|         0.0|         0.0|          13.0| United-States|       <=50K|
| 38|

In [128]:
# Import pyspark functions
from pyspark.sql import functions as F
# Create add new column to the dataset
df = df.withColumn('labels', F.when(df.income_class == '<=50K', 0).otherwise(1))
# Drop the Income label
df = df.drop('income_class')
# Show dataset's columns
df.columns

['age',
 'workclass',
 'weight',
 'education',
 'education_years',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'citizenship',
 'labels']

In [129]:
# Selecting categorical features
categorical_columns = [
 'workclass',
 'education',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'citizenship',
 ]

In [131]:
# The index of string values multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns]
# The encode of indexed values multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers]

In [132]:
# Vectorizing encoded values
categorical_encoded = [encoder.getOutputCol() for encoder in encoders]
numerical_columns = ['age', 'weight', 'education_years', 'capital_gain', 'capital_loss', 'hours_per_week']
inputcols = categorical_encoded + numerical_columns
assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

In [133]:
pipeline = Pipeline(stages=indexers + encoders+[assembler])
model = pipeline.fit(df)
# Transform data
transformed = model.transform(df)
display(transformed)

DataFrame[age: int, workclass: string, weight: double, education: string, education_years: double, marital_status: string, occupation: string, relationship: string, race: string, sex: string, capital_gain: double, capital_loss: double, hours_per_week: double, citizenship: string, labels: int, workclass_indexed: double, education_indexed: double, marital_status_indexed: double, occupation_indexed: double, relationship_indexed: double, race_indexed: double, sex_indexed: double, citizenship_indexed: double, workclass_indexed_encoded: vector, education_indexed_encoded: vector, marital_status_indexed_encoded: vector, occupation_indexed_encoded: vector, relationship_indexed_encoded: vector, race_indexed_encoded: vector, sex_indexed_encoded: vector, citizenship_indexed_encoded: vector, features: vector]

In [134]:
transformed.show(5)

+---+-----------------+--------+----------+---------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+-------------------+-------------------------+-------------------------+------------------------------+--------------------------+----------------------------+--------------------+-------------------+---------------------------+--------------------+
|age|        workclass|  weight| education|education_years|     marital_status|        occupation|  relationship|  race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|labels|workclass_indexed|education_indexed|marital_status_indexed|occupation_indexed|relationship_indexed|race_indexed|sex_indexed|citizenship_indexed|workclass_indexed_encoded|education_indexed_encoded|marital_status_indexed_encoded|occupation_indexe

In [135]:
# Transform data (select a dataset only with the relevant features.)
final_data = transformed.select('features', 'labels')
final_data.show(10)

+--------------------+------+
|            features|labels|
+--------------------+------+
|(108,[4,11,26,35,...|     1|
|(108,[1,11,25,34,...|     1|
|(108,[0,9,27,41,4...|     1|
|(108,[0,14,25,41,...|     1|
|(108,[0,11,25,32,...|     1|
|(108,[0,12,25,34,...|     1|
|(108,[0,19,30,37,...|     1|
|(108,[1,9,25,34,4...|     1|
|(108,[0,12,26,32,...|     1|
|(108,[0,11,25,34,...|     1|
+--------------------+------+
only showing top 10 rows



In [136]:
# Split the data into training and testing sets
train_data, test_data = final_data.randomSplit([0.7, 0.3])
print("Train Size: "+str(train_data.count()))
print("Test Size: "+str(test_data.count()))

Train Size: 22811
Test Size: 9750


In [137]:
# Initialize the classification models

In [146]:
dt = DecisionTreeClassifier(featuresCol='features', labelCol='labels')
dtModel = dt.fit(train_data)
dt_predictions = dtModel.transform(test_data)
dt_predictions.select('features', 'labels', 'rawPrediction', 'prediction', 'probability').show(25)


+--------------------+------+-------------+----------+-----------+
|            features|labels|rawPrediction|prediction|probability|
+--------------------+------+-------------+----------+-----------+
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|[0.0,22811.0]|       1.0|  [0.0,1

In [139]:
dt_predictions.select("labels", "prediction").show(10)

+------+----------+
|labels|prediction|
+------+----------+
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
+------+----------+
only showing top 10 rows



In [140]:
evaluator = BinaryClassificationEvaluator(labelCol="labels", rawPredictionCol="rawPrediction")
accuracy = evaluator.evaluate(rf_predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 1.0
Test Error = 0.0


In [141]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'labels')
rfModel = rf.fit(train_data)
rf_predictions = rfModel.transform(test_data)
rf_predictions.select('features', 'labels', 'rawPrediction', 'prediction', 'probability').show(30)

+--------------------+------+-------------+----------+-----------+
|            features|labels|rawPrediction|prediction|probability|
+--------------------+------+-------------+----------+-----------+
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1.0]|
|(108,[0,9,25,32,4...|     1|   [0.0,20.0]|       1.0|  [0.0,1

In [142]:
dt_predictions.select("labels", "prediction").show(10)

+------+----------+
|labels|prediction|
+------+----------+
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
+------+----------+
only showing top 10 rows



In [143]:
evaluator = MulticlassClassificationEvaluator(labelCol="labels", predictionCol="prediction")
accuracy = evaluator.evaluate(dt_predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 1.0
Test Error = 0.0


In [144]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

preds_and_labels = rf_predictions.select(['prediction','labels']).withColumn('labels', F.col('labels').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','labels'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[9750.]]


In [147]:
# Stop the Spark session
spark.stop()