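To run any individual cell, click on it and press Shift+Enter. The cell below imports PySpark, trains a decision-tree income classifier on `income.csv`, and prints its evaluation metrics. It does not install PySpark itself, so if PySpark is not yet available in this environment, install it first (e.g. with `pip install pyspark`).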

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (StringIndexer, OneHotEncoder, VectorAssembler,
                               StandardScaler)
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
# from pyspark.mllib.evaluation import MulticlassMetrics # Removed

# Initialize Spark: getOrCreate() returns the existing session if one is
# already active, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName('IncomePrediction')
    .getOrCreate()
)

# Load data. The first CSV row supplies the column names, and Spark infers each
# column's type from the values, so numeric columns arrive as numbers rather
# than strings (the feature steps below rely on these inferred dtypes).
csv_options = {"header": True, "inferSchema": True}
df = spark.read.csv('income.csv', **csv_options)

# Step 1: Preprocess target
# The target uses handleInvalid="skip": rows whose income_class is null (or a
# value not seen when the indexer was fit) are dropped. With "keep", such rows
# would be mapped to an extra "__unknown" index, and that extra value is added
# to the label metadata, so the classifier would be built as a (k+1)-class
# model and would train on missing targets as if they were a real class.
# NOTE(review): skipping silently removes unexpected label spellings from the
# evaluation as well; check row counts if the label format is in doubt.
target_col = "income_class"
label_indexer = StringIndexer(inputCol=target_col, outputCol="label", handleInvalid="skip")

# Step 2: Preprocess categorical features
# Every string column except the target is treated as categorical. For features
# "keep" is appropriate: unseen/null categories get their own index instead of
# failing at transform time.
categorical_cols = [name for name, dtype in df.dtypes
                    if dtype == 'string' and name != target_col]
indexers = [StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
            for name in categorical_cols]
encoder = OneHotEncoder(inputCols=[name + "_index" for name in categorical_cols],
                        outputCols=[name + "_encoded" for name in categorical_cols])

# Step 3: Preprocess numerical features
# df.dtypes reports Spark "simpleString" type names. CSV schema inference can
# produce 'bigint' (values beyond the int range) or 'decimal(p,0)' (values beyond
# the long range) in addition to 'int'/'double'; all of these are numeric and are
# accepted by VectorAssembler, so include them rather than silently dropping them.
numeric_simple_types = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
numeric_cols = [name for name, dtype in df.dtypes
                if (dtype in numeric_simple_types or dtype.startswith('decimal'))
                and name != target_col]
numeric_assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features")
# StandardScaler defaults: withStd=True, withMean=False (no centering).
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_features")

# Step 4: Assemble all features
# Final model input: scaled numeric vector followed by each one-hot vector.
assembler = VectorAssembler(
    inputCols=["scaled_features"] + [name + "_encoded" for name in categorical_cols],
    outputCol="features"
)

# Step 5: Define models.
# Both tree models read the assembled "features" vector and the indexed "label"
# column, and share the same depth limit.
# NOTE(review): only `dt` is placed in the pipeline below; `rf` is constructed
# but never trained in this cell.
shared_tree_params = {"featuresCol": "features", "labelCol": "label", "maxDepth": 5}
rf = RandomForestClassifier(numTrees=100, **shared_tree_params)
dt = DecisionTreeClassifier(**shared_tree_params)

# Step 6: Build pipeline. Stage order matters: index the target, index and
# one-hot encode categoricals, assemble and scale numerics, assemble the final
# feature vector, then fit the classifier.
preprocessing_stages = [label_indexer, *indexers, encoder, numeric_assembler, scaler, assembler]
pipeline = Pipeline(stages=preprocessing_stages + [dt])

# Step 7: Train and evaluate.
# 70/30 split with a fixed seed so the split is repeatable for the same input.
train, test = df.randomSplit([0.7, 0.3], seed=42)
model = pipeline.fit(train)          # fit every stage, in order, on the training split
predictions = model.transform(test)  # apply the fitted stages to the held-out split

# Metrics
# `predictions` is a lazy DataFrame. Each evaluate() call below launches its own
# Spark job, which would otherwise re-run the CSV read, the random split and the
# whole fitted pipeline on the test rows three times. Caching materializes it on
# the first evaluation and reuses it for the rest. This also guarantees all
# three metrics are computed on the same rows.
predictions.cache()

evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy")
print(f"Accuracy: {evaluator.evaluate(predictions):.4f}")

# Calculate Precision and Recall using MulticlassClassificationEvaluator.
# "Weighted" metrics average the per-class scores, weighting each class by its
# share of the true labels. Weighted recall reduces to sum(TP_c) / N, i.e. it is
# mathematically identical to accuracy, which is why those two printed values
# always coincide.
precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", metricName="weightedRecall")

print(f"Weighted Precision: {precision_evaluator.evaluate(predictions):.4f}")
print(f"Weighted Recall: {recall_evaluator.evaluate(predictions):.4f}")

Accuracy: 0.8355
Weighted Precision: 0.8277
Weighted Recall: 0.8355
