In [1]:
import pandas as pd
# Colab-only helper. This import was commented out while `files.upload()` was
# still being called, which raises NameError on a fresh kernel.
from google.colab import files
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName('ml-income').getOrCreate()

# Upload income.csv through the Colab file picker so Spark can read it below.
uploaded = files.upload()

# Load the dataset; the first row is the header and column types are inferred.
df = spark.read.csv("income.csv", header=True, inferSchema=True)
df.show()

# Check schema and preview the first 5 rows (transposed so wide rows stay readable)
df.printSchema()
print(pd.DataFrame(df.take(5), columns=df.columns).transpose())

# Inspection & identification of numeric columns.
# Integer types are accepted as well as 'double': with inferSchema a column
# holding whole numbers (e.g. age) is not typed 'double' and would otherwise
# be silently left out of the numeric list.
print(df.dtypes)
NUMERIC_DTYPES = ('int', 'bigint', 'double')
numeric_features = [name for name, dtype in df.dtypes if dtype in NUMERIC_DTYPES]


Saving income.csv to income.csv
+---+-----------------+--------+-------------+---------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------------+
|age|        workclass|  weight|    education|education_years|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|income_class|
+---+-----------------+--------+-------------+---------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------------+
| 39|        State-gov| 77516.0|    Bachelors|           13.0|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States|       <=50K|
| 50| Self-emp-not-inc| 83311.0|    Bachelors|           13.0|  Married-civ-spouse|   Exec-managerial|      

In [2]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Columns that hold string categories vs. columns fed to the model as-is.
categorical_columns = [
    'workclass', 'education', 'marital_status', 'occupation',
    'relationship', 'race', 'sex', 'citizenship',
]
numeric_columns = [
    'age', 'weight', 'education_years',
    'capital_gain', 'capital_loss', 'hours_per_week',
]

# Name of the indexed companion column for every categorical column.
indexed_columns = [name + "_indexed" for name in categorical_columns]

# Fit one StringIndexer per categorical column and append its output to df.
for source_name, target_name in zip(categorical_columns, indexed_columns):
    indexer = StringIndexer(inputCol=source_name, outputCol=target_name)
    fitted_indexer = indexer.fit(df)
    df = fitted_indexer.transform(df)

# Pack the numeric and indexed categorical columns into a single "features" vector.
feature_columns = numeric_columns + indexed_columns
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df = assembler.transform(df)
df.show()

# Index the target column (income_class) into the "label" column.
label_indexer = StringIndexer(inputCol='income_class', outputCol='label')
label_model = label_indexer.fit(df)
df = label_model.transform(df)
df.show()

# Print the first 100 transformed rows as a pandas DataFrame.
sample_rows = df.take(100)
print(pd.DataFrame(sample_rows, columns=df.columns))


+---+-----------------+--------+-------------+---------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------------+-----------------+-----------------+----------------------+------------------+--------------------+------------+-----------+-------------------+--------------------+
|age|        workclass|  weight|    education|education_years|      marital_status|        occupation|  relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|income_class|workclass_indexed|education_indexed|marital_status_indexed|occupation_indexed|relationship_indexed|race_indexed|sex_indexed|citizenship_indexed|            features|
+---+-----------------+--------+-------------+---------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------------+-----------------+--------

In [3]:
# Train-test split (70% train / 30% test).
# A fixed seed keeps the split stable between runs over the same input;
# without it every run trains and evaluates on different rows, so the
# reported metrics cannot be reproduced.
SPLIT_SEED = 42
train, test = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
print("Train Size:", train.count())
print("Test Size:", test.count())


Train Size: 22767
Test Size: 9794


In [5]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

# Seed shared by both tree learners so repeated runs build the same models.
# Random forests sample rows and features at random, so without a seed the
# accuracy and confusion matrix reported later change from run to run.
MODEL_SEED = 42

# Random forest classifier: fit on the training split, predict on the test split.
rf = RandomForestClassifier(featuresCol='features', labelCol='label',
                            numTrees=100, maxBins=64, seed=MODEL_SEED)
rfModel = rf.fit(train)
rf_predictions = rfModel.transform(test)

# Decision tree classifier: same features/label and maxBins as the forest.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label',
                            maxBins=64, seed=MODEL_SEED)
dtModel = dt.fit(train)
dt_predictions = dtModel.transform(test)


In [8]:
# Evaluation: accuracy and error rate of both models on the test predictions.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

# Score the random forest first, then the decision tree.
rf_accuracy, dt_accuracy = (
    evaluator.evaluate(predictions)
    for predictions in (rf_predictions, dt_predictions)
)

# Error is the complement of accuracy.
rf_error = 1.0 - rf_accuracy
dt_error = 1.0 - dt_accuracy

print("Random Forest Accuracy: %s" % rf_accuracy)
print("Random Forest Error: %s" % rf_error)

print("Decision Tree Accuracy: %s" % dt_accuracy)
print("Decision Tree Error: %s" % dt_error)

Random Forest Accuracy: 0.8542985501327344
Random Forest Error: 0.14570144986726563
Decision Tree Accuracy: 0.8462323871758219
Decision Tree Error: 0.15376761282417806


In [10]:
# Confusion matrix for the random forest predictions.
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# Keep only the (prediction, label) pair and cast the label to float.
prediction_and_label = rf_predictions.select('prediction', 'label')
preds = prediction_and_label.withColumn('label', F.col('label').cast(FloatType()))

# MulticlassMetrics works on an RDD of tuples, so convert each Row to a tuple.
pair_rdd = preds.rdd.map(tuple)
metrics = MulticlassMetrics(pair_rdd)

print("Random Forest Confusion Matrix:")
rf_confusion = metrics.confusionMatrix().toArray()
print(rf_confusion)


Random Forest Confusion Matrix:
[[7160.  284.]
 [1143. 1207.]]
