In [None]:
# Install PySpark into the running kernel's environment (%pip, unlike !pip, targets the kernel);
# pinned to the version this notebook was last run with (3.4.0) so re-runs are reproducible.
%pip install -q pyspark==3.4.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting PySpark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: PySpark
  Building wheel for PySpark (setup.py) ... [?25l[?25hdone
  Created wheel for PySpark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=1bef64a001c60c76e3a2f79faf9e71ae6f004e7ccba865ee51e2670c05b0e9a2
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built PySpark
Installing collected packages: PySpark
Successfully installed PySpark-3.4.0


In [None]:
# Import necessary libraries
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# Create (or reuse, if the kernel already has one) a Spark session.
spark = SparkSession.builder.appName("Logistic Regression").getOrCreate()

# Load the raw train/test files. Neither has a header row and no schema is inferred,
# so every column is read as a string under a generic default name.
# NOTE(review): the UCI adult.test file is commonly distributed with a leading
# "|1x3 Cross validator" line; treating "|" as a comment character skips such a line
# (no-op if absent) instead of loading it as a row of mostly-null values.
train_data_load = (
    spark.read.format("csv")
    .option("header", "false")
    .option("comment", "|")
    .load("adult.data")
)
test_data_load = (
    spark.read.format("csv")
    .option("header", "false")
    .option("comment", "|")
    .load("adult.test")
)

# Add column names to the DataFrame
columns = ["age", "workclass", "fnlwgt", "education", "education_num", "marital_status",
           "occupation", "relationship", "race", "sex", "capital_gain", "capital_loss",
           "hours_per_week", "native_country", "income"]
train_data_df = train_data_load.toDF(*columns)
test_data_df = test_data_load.toDF(*columns)

# Remove leading and trailing whitespace from the categorical columns.
# Each iteration must build on the previous result: restarting from the raw *_df frame
# on every pass (as before) left only the last column in the list trimmed.
categorical_columns = ["workclass", "education", "marital_status", "occupation",
                       "relationship", "race", "sex", "native_country", "income"]
train_data = train_data_df
test_data = test_data_df
for column in categorical_columns:
    train_data = train_data.withColumn(column, F.trim(F.col(column)))
    test_data = test_data.withColumn(column, F.trim(F.col(column)))




In [None]:
    numerical_columns = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
    for column in numerical_columns:
      train_data = train_data.withColumn(column, train_data[column].cast("integer"))
      test_data = test_data.withColumn(column, test_data[column].cast("integer"))

train_data_df.show(5)
test_data_df.show(5)

+---+-----------------+-------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|age|        workclass| fnlwgt| education|education_num|     marital_status|        occupation|  relationship|  race|    sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+-----------------+-------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov|  77516| Bachelors|           13|      Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc|  83311| Bachelors|           13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|            13| United-States| <=50K|
| 38|          Private| 215646|   HS-grad|            9

In [None]:
# Normalize the income strings so both splits share one label vocabulary.
# NOTE(review): income values in the UCI adult.test file typically carry a trailing "."
# ("<=50K." / ">50K.") that adult.data lacks; the regex strips it and is a no-op otherwise
# — confirm against your copy of the data.
income_clean = F.regexp_replace(F.trim(F.col("income")), r"\.$", "")
train_prepared = train_data.withColumn("income", income_clean)
test_prepared = test_data.withColumn("income", income_clean)

# Add a label column with a StringIndexer fit on the TRAINING split only, then apply that same
# fitted mapping to the test split. Fitting a separate indexer on the test data (as before) orders
# labels by that split's own frequencies and can assign a different index to the same class.
indexer = StringIndexer(inputCol="income", outputCol="label")
label_model = indexer.fit(train_prepared)
train_labeled = label_model.transform(train_prepared)
test_labeled = label_model.transform(test_prepared)

# Vectorize the features
categorical_features = ["workclass", "education", "marital_status", "occupation",
                        "relationship", "race", "sex", "native_country"]
numeric_features = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assembler_inputs = [c + "_encoded" for c in categorical_features] + numeric_features
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

# Convert categorical columns to indices and one-hot encode them. handleInvalid="keep" maps a
# category seen only in the test split to an extra bucket instead of failing at scoring time.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep")
            for c in categorical_features]
encoders = [OneHotEncoder(inputCol=c + "_index", outputCol=c + "_encoded") for c in categorical_features]

# Build the logistic regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Chain indexers, encoders, assembler and the model in a Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

# Fit the pipeline to the training data
model = pipeline.fit(train_labeled)

# Make predictions on the test data
predictions = model.transform(test_labeled)

# BinaryClassificationEvaluator measures area under the ROC curve (its default metric), which is a
# ranking metric, not accuracy. Name it explicitly and compute accuracy separately.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)

n_test = predictions.count()
assert n_test > 0, "test set is empty; cannot compute accuracy"
accuracy = predictions.filter(F.col("label") == F.col("prediction")).count() / n_test

print("Logistic Regression Classifier Census Income Model AUC (areaUnderROC): ", auc)
print("Logistic Regression Classifier Census Income Model Accuracy: ", accuracy)

Logistic Regression Classifier Census Income Model Accuracy:  0.9043771867481067
