In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=7ddb9a0762cad92aca7d8dbe9a5b02ea916706759282041fa16c977c692df55a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType, NumericType


# Start a Spark session for this notebook (or reuse one already running in
# this kernel).
spark = (
    SparkSession.builder
    .appName('income-classifier')
    .getOrCreate()
)

# Load the income data. The first row supplies the column names, and
# inferSchema asks Spark to scan the file to choose column types instead of
# reading every column as a string.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('income.csv')
)

# Feature Engineering
# Every numeric column except the label becomes a model feature. Matching on
# NumericType (rather than the literal 'int'/'double' type strings) also picks
# up bigint/float/decimal columns that inferSchema can produce, and excluding
# the label column guarantees it can never leak into the feature vector.
# NOTE(review): string columns such as workclass, education and occupation are
# not used at all; index / one-hot encode them if they should inform the model.
LABEL_SOURCE_COL = 'income_class'
numeric_features = [
    field.name
    for field in df.schema.fields
    if isinstance(field.dataType, NumericType) and field.name != LABEL_SOURCE_COL
]
assert numeric_features, 'no numeric feature columns found in income.csv'

# Assemble the numeric features into the single vector column ('features')
# that the classifiers below read.
assembler = VectorAssembler(inputCols=numeric_features, outputCol='features')
df = assembler.transform(df)

# Encode the string label as a numeric index in 'label'. StringIndexer's
# default ordering ('frequencyDesc') gives the most frequent class index 0.0;
# the mapping is printed so the confusion matrices later can be interpreted.
labels = StringIndexer(inputCol=LABEL_SOURCE_COL, outputCol='label')
label_indexer_model = labels.fit(df)
df = label_indexer_model.transform(df)
print('Label index order: {}'.format(label_indexer_model.labelsArray[0]))

# Splitting the dataset into training and testing sets (70/30, fixed seed so
# the split is repeatable).
train, test = df.randomSplit([0.7, 0.3], seed=42)

# Each split is used by several actions below (count, two model fits, two
# transforms). Caching avoids re-reading income.csv and re-running the
# feature transforms for every one of them.
train = train.cache()
test = test.cache()

print('Train Size: {}'.format(train.count()))
print('Test Size: {}'.format(test.count()))

# Model training
# Both tree learners accept a seed. The random forest draws random bootstrap
# samples and feature subsets, so the seed is fixed explicitly (matching the
# split seed) to keep Restart & Run All reproducible instead of relying on the
# library default.

# Random Forest Classifier
rf = RandomForestClassifier(featuresCol='features', labelCol='label', seed=42)
rf_model = rf.fit(train)
rf_predictions = rf_model.transform(test)
# The full prediction frame is about 20 columns wide; show only what is
# needed to eyeball the model's output.
rf_predictions.select('label', 'prediction', 'probability').show(truncate=False)

# Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', seed=42)
dt_model = dt.fit(train)
dt_predictions = dt_model.transform(test)
dt_predictions.select('label', 'prediction', 'probability').show(truncate=False)

# Evaluation
# Accuracy is the headline metric. Weighted F1 is reported next to it because
# accuracy alone can look strong on an imbalanced label just by favouring the
# majority class (TODO: check the income_class distribution).
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# Random Forest evaluation. F1 is obtained through a per-call param override,
# which evaluates on a copy and leaves the evaluator's metricName as 'accuracy'.
rf_accuracy = evaluator.evaluate(rf_predictions)
print('Random Forest Accuracy: {}'.format(rf_accuracy))
rf_f1 = evaluator.evaluate(rf_predictions, {evaluator.metricName: 'f1'})
print('Random Forest F1 (weighted): {}'.format(rf_f1))

# Decision Tree evaluation and its accuracy / F1, for comparison.
dt_accuracy = evaluator.evaluate(dt_predictions)
print('Decision Tree Accuracy: {}'.format(dt_accuracy))
dt_f1 = evaluator.evaluate(dt_predictions, {evaluator.metricName: 'f1'})
print('Decision Tree F1 (weighted): {}'.format(dt_f1))

# Calculating the Confusion Matrix
def confusion_matrix(prediction_and_label):
    """Compute the confusion matrix of a two-column predictions DataFrame.

    Args:
        prediction_and_label: DataFrame whose columns are ``prediction`` then
            ``label`` (numeric class indices), in that order.

    Returns:
        The ``MulticlassMetrics.confusionMatrix()`` result converted with
        ``toArray()``: rows are the true label indices and columns the
        predicted indices, both in ascending index order.
    """
    # MulticlassMetrics is the RDD-based API and takes (prediction, label)
    # tuples; both columns already arrive as Python floats, so no cast is needed.
    pairs = prediction_and_label.rdd.map(tuple)
    return MulticlassMetrics(pairs).confusionMatrix().toArray()


rf_preds = rf_predictions.select('prediction', 'label')
rf_metrics = confusion_matrix(rf_preds)
print('Random Forest Confusion Matrix:')
print(rf_metrics)

dt_preds = dt_predictions.select('prediction', 'label')
dt_metrics = confusion_matrix(dt_preds)
print('Decision Tree Confusion Matrix:')
print(dt_metrics)


Train Size: 22875
Test Size: 9686
+---+---------+--------+-------------+---------------+--------------+----------+---------------+-------------------+-------+------------+------------+--------------+--------------+------------+--------------------+-----+--------------------+--------------------+----------+
|age|workclass|  weight|    education|education_years|marital_status|occupation|   relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|   citizenship|income_class|            features|label|       rawPrediction|         probability|prediction|
+---+---------+--------+-------------+---------------+--------------+----------+---------------+-------------------+-------+------------+------------+--------------+--------------+------------+--------------------+-----+--------------------+--------------------+----------+
| 17|        ?| 41643.0|         11th|            7.0| Never-married|         ?|      Own-child|              White| Female|         0.0|       