In [1]:
!pip install pyspark
!pip install pandas

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fbef06efbb73a7498b6ad90ebdf988de743c718b710a51cc21b31f965597ad92
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [3]:
spark = SparkSession.builder.appName("IncomeClassifierModel").getOrCreate()

In [4]:
from google.colab import files
uploaded = files.upload()

Saving income(1).csv to income(1).csv


In [5]:
data = spark.read.csv('income(1).csv', header=True, inferSchema=True, nullValue='?', ignoreLeadingWhiteSpace=True,ignoreTrailingWhiteSpace=True)

In [6]:
from pyspark.sql.functions import col, isnull, count, when

In [7]:
from pyspark.sql.functions import regexp_replace

In [8]:
# Check for missing values before dropping
total_rows_before = data.count()
missing_values_before = data.select([count(when(isnull(c), c)).alias(c) for c in data.columns]).collect()

In [9]:
print(f"Total rows before dropping missing values: {total_rows_before}")
print("Missing values per column before dropping missing values:")
for row in missing_values_before:
    print(row.asDict())

Total rows before dropping missing values: 32561
Missing values per column before dropping missing values:
{'age': 0, 'workclass': 1836, 'weight': 0, 'education': 0, 'education_years': 0, 'marital_status': 0, 'occupation': 1843, 'relationship': 0, 'race': 0, 'sex': 0, 'capital_gain': 0, 'capital_loss': 0, 'hours_per_week': 0, 'citizenship': 583, 'income_class': 0}


In [10]:
data = data.na.drop()

In [11]:
# Check for missing values after dropping
total_rows_after = data.count()
missing_values_after = data.select([count(when(isnull(c), c)).alias(c) for c in data.columns]).collect()

In [12]:
print(f"Total rows after dropping missing values: {total_rows_after}")
print("Missing values per column after dropping missing values:")
for row in missing_values_after:
    print(row.asDict())

Total rows after dropping missing values: 30162
Missing values per column after dropping missing values:
{'age': 0, 'workclass': 0, 'weight': 0, 'education': 0, 'education_years': 0, 'marital_status': 0, 'occupation': 0, 'relationship': 0, 'race': 0, 'sex': 0, 'capital_gain': 0, 'capital_loss': 0, 'hours_per_week': 0, 'citizenship': 0, 'income_class': 0}


In [13]:
from pyspark.sql.types import StringType

In [14]:
data.printSchema()
data.show(5)

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- weight: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_years: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- income_class: string (nullable = true)

+---+----------------+------+---------+---------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+-------------+------------+
|age|       workclass|weight|education|education_years|    marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|  citizenship|

In [15]:
categorical_columns = [c for c, t in data.dtypes if t == 'string' and t != 'double' and c != 'income_class']
print("Categorical columns:", categorical_columns)

Categorical columns: ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'citizenship']


In [16]:
for column in categorical_columns:
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index")
    data = indexer.fit(data).transform(data)

In [17]:
data.printSchema()
data.show(5)

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- weight: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_years: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- citizenship: string (nullable = true)
 |-- income_class: string (nullable = true)
 |-- workclass_index: double (nullable = false)
 |-- education_index: double (nullable = false)
 |-- marital_status_index: double (nullable = false)
 |-- occupation_index: double (nullable = false)
 |-- relationship_index: double (nullable = false)
 |-- race_index: double (nullable = false)
 |-- sex_index: double (nullable = false)
 |-- citizenship_index: doubl

In [18]:
numerical_columns = [c for c, t in data.dtypes if t in ['int', 'double'] and c != 'income_class']
print("Numerical columns:", numerical_columns)

Numerical columns: ['age', 'weight', 'education_years', 'capital_gain', 'capital_loss', 'hours_per_week', 'workclass_index', 'education_index', 'marital_status_index', 'occupation_index', 'relationship_index', 'race_index', 'sex_index', 'citizenship_index']


In [19]:
assembler_numerical = VectorAssembler(inputCols=numerical_columns, outputCol="numerical_features")
data = assembler_numerical.transform(data)

In [20]:
data.select("numerical_features").show(5, truncate=False)

+-------------------------------------------------------------------+
|numerical_features                                                 |
+-------------------------------------------------------------------+
|[39.0,77516.0,13.0,2174.0,0.0,40.0,3.0,2.0,1.0,3.0,1.0,0.0,0.0,0.0]|
|(14,[0,1,2,5,6,7,9],[50.0,83311.0,13.0,13.0,1.0,2.0,2.0])          |
|(14,[0,1,2,5,8,9,10],[38.0,215646.0,9.0,40.0,2.0,8.0,1.0])         |
|(14,[0,1,2,5,7,9,11],[53.0,234721.0,7.0,40.0,5.0,8.0,1.0])         |
|[28.0,338409.0,13.0,0.0,0.0,40.0,0.0,2.0,0.0,0.0,4.0,1.0,1.0,8.0]  |
+-------------------------------------------------------------------+
only showing top 5 rows



In [21]:
scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features", withMean=True, withStd=True)
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

In [22]:
data.select("scaled_numerical_features").show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaled_numerical_features                                                                                                                                                                                                                                                                  |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.04279500190227507,-1.0627039855647082,1.1288996674464633,0.14608986264892962,-0.2185823515822073,-0.0777328219923888,1.810892080693077,-0.

In [23]:
feature_columns = ["scaled_numerical_features"] + [col + "_index" for col in categorical_columns]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

In [24]:
data.select("features").show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.04279500190227507,-1.0627039855647082,1.12

In [25]:
label_indexer = StringIndexer(inputCol="income_class", outputCol="label").fit(data)
data = label_indexer.transform(data)

In [26]:
data.select("label").show(5)

+-----+
|label|
+-----+
|  0.0|
|  0.0|
|  0.0|
|  0.0|
|  0.0|
+-----+
only showing top 5 rows



In [27]:
data = data.select("features", "label")

In [28]:
data.show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                                                                                                                                                                   |label|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[0.04279500190227507,-1.062

In [29]:
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

In [30]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxBins=120)
dt_model = dt.fit(train_data)

In [31]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", maxBins=120)
rf_model = rf.fit(train_data)

In [32]:
dt_predictions = dt_model.transform(test_data)
rf_predictions = rf_model.transform(test_data)

In [33]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(dt_predictions)
print(f"Decision Tree Accuracy: {dt_accuracy:.2f}")

Decision Tree Accuracy: 0.85


In [35]:
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy:.2f}")

Random Forest Accuracy: 0.85


In [36]:
def print_confusion_matrix(predictions, model_name):
    prediction_and_labels = predictions.select("prediction", "label").rdd
    metrics = MulticlassMetrics(prediction_and_labels)
    confusion_matrix = metrics.confusionMatrix().toArray()
    print(f"Confusion Matrix for {model_name}:\n{confusion_matrix}")

In [37]:
print_confusion_matrix(dt_predictions, "Decision Tree")



Confusion Matrix for Decision Tree:
[[6467.  304.]
 [1054. 1163.]]


In [38]:
print_confusion_matrix(rf_predictions, "Random Forest")

Confusion Matrix for Random Forest:
[[6468.  303.]
 [1060. 1157.]]


In [42]:

if rf_accuracy == dt_accuracy:
    print("Both models perform the same.")
elif rf_accuracy > dt_accuracy:
    print("Random Forest is the better model.")
else:
    print("Decision Tree is the better model.")

Decision Tree is the better model.


In [None]:
spark.stop()