<a href="https://colab.research.google.com/github/Shilpa393/Data-for-End-Term/blob/main/DAS_End_Term.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Problem Statement:
The task is to predict whether an individual's income exceeds $50,000 per year from demographic and employment attributes. This is a binary classification problem whose target is the 'income' column, with two classes: <=50K and >50K.

Predicting Income Level:

Objective: Predict whether an individual earns more than $50,000 per year (>50K) or at most $50,000 per year (<=50K).

Target Variable: income (categorical with two classes: <=50K and >50K).

Features: A mix of demographic and employment-related attributes: Age, workclass, education, marital_status, occupation, relationship, race, sex, capital-gain, capital-loss, hours-per-week, and native_country.

# Install Spark (current version)

In [None]:
#import findspark
#findspark.init()
!pip3 install -q pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Income_Prediction').master("local[*]").getOrCreate()
sc = spark.sparkContext
sc



# Load Dataset

In [None]:
# Assumes Google Drive is already mounted at /content/drive
inc_df = spark.read.csv('/content/drive/MyDrive/Adult Income Dataset.csv', inferSchema=True, header=True)
inc_df.show(10)

+---+-----------------+------+----------+-------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|Age|        workclass|fnlwgt| education|education-num|      marital_status|        occupation|  relationship|  race|    sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+-----------------+------+----------+-------------+--------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516| Bachelors|           13|       Never-married|      Adm-clerical| Not-in-family| White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311| Bachelors|           13|  Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|           0|           0|            13| United-States| <=50K|
| 38|          Private|215646|   HS-grad|            9|

# Data Preprocessing

## Dropping Columns

In [None]:
# Drop fnlwgt and education-num columns
inc_df = inc_df.drop("fnlwgt", "education-num")

## Handling missing values, empty strings, and whitespace

In [None]:
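# Note: this `sum` is Spark's aggregate function and shadows Python's built-in sum for the rest of the notebook
from pyspark.sql.functions import col, sum, when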

In [None]:
# Check for missing values in each column
missing_values = inc_df.select([(sum(col(c).isNull().cast("int")).alias(c)) for c in inc_df.columns])

In [None]:
# Show columns with missing values
missing_values.show()

+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|workclass|education|marital_status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|        0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [None]:
# Count empty strings in each column
empty_string_counts = inc_df.select([(sum(when(col(c) == '', 1).otherwise(0)).alias(c)) for c in inc_df.columns])

In [None]:
empty_string_counts.show()

+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|workclass|education|marital_status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|        0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [None]:
# Count rows in each column whose value contains at least one space character
whitespace_counts = inc_df.select([(sum(when(col(c).contains(' '), 1).otherwise(0)).alias(c)) for c in inc_df.columns])

In [None]:
whitespace_counts.show()

+---+---------+---------+--------------+----------+------------+-----+-----+------------+------------+--------------+--------------+------+
|Age|workclass|education|marital_status|occupation|relationship| race|  sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+---------+---------+--------------+----------+------------+-----+-----+------------+------------+--------------+--------------+------+
|  0|    32561|    32561|         32561|     32561|       32561|32561|32561|           0|           0|             0|         32561| 32561|
+---+---------+---------+--------------+----------+------------+-----+-----+------------+------------+--------------+--------------+------+
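Every string column reports 32561, which matches a file where each text field carries a leading space after the comma (compare ` Self-emp-not-inc` in the first table with the trimmed output further down). The plain-Python sketch below mimics the check on one hypothetical row; the row values and the helper name `has_space` are illustrative assumptions, not part of the notebook.

```python
# Hypothetical raw row, assuming each text field keeps the space that follows the comma.
raw_row = {"Age": 39, "workclass": " State-gov", "education": " Bachelors"}


def has_space(value):
    """Mimic col(c).contains(' '): true if a space occurs anywhere in the text."""
    return " " in str(value)


before = {name: has_space(value) for name, value in raw_row.items()}

# str.strip() removes all surrounding whitespace; Spark's trim() removes spaces.
# For values that only have a leading space the result is the same.
trimmed = {
    name: value.strip() if isinstance(value, str) else value
    for name, value in raw_row.items()
}
after = {name: has_space(value) for name, value in trimmed.items()}

print(before)
print(after)
```

Because the check looks for a space anywhere in the value, it would also flag a legitimate multi-word category, so a count of zero after trimming only shows that no value contains a space at all.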



In [None]:
# Import the trim function
from pyspark.sql.functions import trim

In [None]:
# Trim leading and trailing spaces from string columns; leave numeric columns unchanged
inc_df_trim = inc_df.select([trim(col(c)).alias(c) if dict(inc_df.dtypes)[c] == 'string' else col(c) for c in inc_df.columns])

In [None]:
# Show trimmed data
inc_df_trim.show(5)

+---+----------------+---------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|Age|       workclass|education|    marital_status|       occupation| relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+----------------+---------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov|Bachelors|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc|Bachelors|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|  HS-grad|          Divorced|Handlers-cleaners|Not-in-family|White|  Male|           0|           0|            40| United-States| <=50K|
| 53|         Private|     11th|Married-

In [None]:
# Replace whitespace-only strings with NULL
inc_df_final = inc_df_trim.select([when(trim(col(c)) == '', None).otherwise(col(c)).alias(c) if dict(inc_df_trim.dtypes)[c] == 'string' else col(c) for c in inc_df_trim.columns])

In [None]:
# Show cleaned data
inc_df_final.show(5)

+---+----------------+---------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|Age|       workclass|education|    marital_status|       occupation| relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+----------------+---------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov|Bachelors|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc|Bachelors|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|  HS-grad|          Divorced|Handlers-cleaners|Not-in-family|White|  Male|           0|           0|            40| United-States| <=50K|
| 53|         Private|     11th|Married-

### Verifying Changes

In [None]:
# Check for missing values in each column
missing_values = inc_df_final.select([(sum(col(c).isNull().cast("int")).alias(c)) for c in inc_df_final.columns])
missing_values.show()

+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|workclass|education|marital_status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|        0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [None]:
# Count empty strings in each column
empty_string_counts = inc_df_final.select([(sum(when(col(c) == '', 1).otherwise(0)).alias(c)) for c in inc_df_final.columns])
empty_string_counts.show()

+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|workclass|education|marital_status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|        0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [None]:
# Count rows in each column whose value still contains a space character
whitespace_counts = inc_df_final.select([(sum(when(col(c).contains(' '), 1).otherwise(0)).alias(c)) for c in inc_df_final.columns])
whitespace_counts.show()

+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|workclass|education|marital_status|occupation|relationship|race|sex|capital-gain|capital-loss|hours-per-week|native_country|income|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|        0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+---------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



## Encoding Categorical Variables

In [None]:
# col is already imported above; ChiSqSelector was imported but never used, so it is dropped
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

In [None]:
# Define categorical and numerical columns
categorical_columns = ['workclass', 'education', 'marital_status', 'occupation', 'relationship', 'race', 'sex', 'native_country']
numerical_cols = ['Age', 'capital-gain', 'capital-loss', 'hours-per-week']
label_col = 'income'

In [None]:
# Define the label indexer
label_indexer = StringIndexer(inputCol=label_col, outputCol="label_index")

In [None]:
label_indexer

StringIndexer_3b310bd71ac3

In [None]:
# Indexers
indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in categorical_columns]

In [None]:
indexers

[StringIndexer_d8cb09ac274c,
 StringIndexer_3eb269f8eb8d,
 StringIndexer_29807b604de9,
 StringIndexer_76a044f3cf8c,
 StringIndexer_4a82d341c53f,
 StringIndexer_15b0803a4cbf,
 StringIndexer_76fe1d6824bb,
 StringIndexer_a236ab621bca]
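With its default `stringOrderType` of `frequencyDesc`, a fitted StringIndexer gives index 0 to the most frequent label, index 1 to the next most frequent, and so on. The sketch below reproduces that ordering in plain Python to show what the `_index` columns will contain. It is an illustration, not Spark's implementation; the sample values and the helper name `fit_string_indexer` are assumptions, and the alphabetical tie-break is included only to keep the sketch deterministic.

```python
from collections import Counter


def fit_string_indexer(values):
    """Return {label: index} with the most frequent label at index 0.0."""
    counts = Counter(values)
    # Descending frequency first, then alphabetical order for equal counts.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Hypothetical workclass values, not taken from the dataset.
sample = [
    "Private", "Private", "State-gov",
    "Private", "Self-emp-not-inc", "State-gov",
]
mapping = fit_string_indexer(sample)
print(mapping)
indexed = [mapping[value] for value in sample]
print(indexed)
```

The same rule applies to the label indexer: whichever income class is more frequent becomes `label_index` 0.0.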

In [None]:
# Encoders
encoders = [OneHotEncoder(inputCol=c+"_index", outputCol=c+"_ohe") for c in categorical_columns]

In [None]:
encoders

[OneHotEncoder_85dcdafa2891,
 OneHotEncoder_11de27cd7472,
 OneHotEncoder_7f59b22ad9ac,
 OneHotEncoder_4e6ecd8a6f14,
 OneHotEncoder_ab89509a9104,
 OneHotEncoder_c6dca7ed7139,
 OneHotEncoder_c4e0e50807a6,
 OneHotEncoder_1389d5615efc]

In [None]:
# Assemble features
assembler = VectorAssembler(inputCols=[c+"_ohe" for c in categorical_columns] + numerical_cols, outputCol="features")

In [None]:
assembler

VectorAssembler_389c9ad357d7
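OneHotEncoder uses `dropLast=True` by default, so a column with k categories becomes a vector of length k - 1 and the last category is encoded as all zeros. The assembler then concatenates these vectors with the four numeric columns. The sketch below shows the encoding rule in plain Python and how the widths add up. The per-column category counts are an assumption based on the standard UCI Adult file (including its `?` placeholder value) and have not been checked against this dataset; they are chosen because they reproduce the 98-element vectors printed in the pipeline output.

```python
def one_hot_drop_last(index, num_categories):
    """Encode a category index as a list of length num_categories - 1.

    The last index maps to all zeros, mirroring dropLast=True.
    """
    size = num_categories - 1
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


first = one_hot_drop_last(0, 3)
last = one_hot_drop_last(2, 3)
print(first, last)

# Assumed distinct-value counts per categorical column (not verified here).
assumed_category_counts = {
    "workclass": 9, "education": 16, "marital_status": 7, "occupation": 15,
    "relationship": 6, "race": 5, "sex": 2, "native_country": 42,
}
numeric_column_count = 4  # Age, capital-gain, capital-loss, hours-per-week

width = numeric_column_count
for count in assumed_category_counts.values():
    width += count - 1  # each encoded column contributes k - 1 slots
print(width)
```

Since the numeric columns are appended last, they occupy the final four positions of the assembled vector.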

## Feature Scaling

In [None]:
# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [None]:
scaler

StandardScaler_afe4fc947f6f
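With no extra arguments, StandardScaler uses `withStd=True` and `withMean=False`: each feature is divided by its sample standard deviation but is not centered, so zeros stay zero and sparse vectors stay sparse. The plain-Python sketch below shows the same arithmetic on a hypothetical column; the helper name `scale_unit_std` and the values are illustrative assumptions.

```python
import statistics


def scale_unit_std(column):
    """Divide each value by the sample standard deviation, without centering."""
    std = statistics.stdev(column)  # sample standard deviation (n - 1 denominator)
    return [value / std for value in column]


# Hypothetical numeric column, not taken from the dataset.
column = [0.0, 2.0, 4.0]
scaled = scale_unit_std(column)
print(scaled)
print(statistics.stdev(scaled))
```

This is consistent with the untruncated output later in the notebook, where an Age of 39 appears as roughly 2.859, implying a standard deviation of about 13.64 for Age.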

## Creating Pipeline

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Create a pipeline with all stages
pipeline = Pipeline(stages=[label_indexer] + indexers + encoders + [assembler, scaler])

In [None]:
# Fit and transform data
pipeline_model = pipeline.fit(inc_df_final)
processed_data = pipeline_model.transform(inc_df_final)

In [None]:
# Show the final data with features
processed_data.select("scaled_features", "income").show(5)

+--------------------+------+
|     scaled_features|income|
+--------------------+------+
|(98,[4,10,24,32,4...| <=50K|
|(98,[1,10,23,31,4...| <=50K|
|(98,[0,8,25,38,44...| <=50K|
|(98,[0,13,23,38,4...| <=50K|
|(98,[0,10,23,29,4...| <=50K|
+--------------------+------+
only showing top 5 rows



## Train and Test Split

In [None]:
# Split data into training and test sets
train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=1234)
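The pipeline is fitted on all rows before this split, so the scaler's standard deviations and the indexers' category orderings are computed with the test rows included. A common alternative is to split first, fit the preprocessing on the training rows only, and reuse those fitted statistics on the test rows. The sketch below shows that pattern in plain Python for a single scaling step; the feature values and the 70/30 cut are illustrative assumptions, not the notebook's data.

```python
import random
import statistics

# Hypothetical numeric feature with 20 rows.
feature = [float(value) for value in range(1, 21)]

# Shuffle with a fixed seed, then cut 70/30.
rng = random.Random(1234)
shuffled = feature[:]
rng.shuffle(shuffled)
cut = len(shuffled) * 7 // 10
train, test = shuffled[:cut], shuffled[cut:]

# "Fit" on the training rows only, then apply the same statistic to both parts.
train_std = statistics.stdev(train)
train_scaled = [value / train_std for value in train]
test_scaled = [value / train_std for value in test]

print(len(train), len(test))
print(round(statistics.stdev(train_scaled), 6))
```

In Spark terms this would mean calling `randomSplit` on the cleaned DataFrame, fitting the pipeline on the training part, and calling `transform` on both parts with the fitted model.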

In [None]:
# Show the scaled feature vectors alongside the indexed label
processed_data.select("scaled_features", "label_index").show(5, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|scaled_features                                                                                                                                                                                                                                        |label_index|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+
|(98,[4,10,24,32,44,48,52,53,94,95,97],[5.1113826411478325,2.697608869138031,2.1298089222493997,3.1253021836524555,2.2940963550568068,2.8341681816681885,2.125369466818238,3.273851346433711,2.8591468669928934,0.2943

# Train and Evaluate Models

In [None]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Logistic Regression

In [None]:
lr = LogisticRegression(labelCol="label_index", featuresCol="scaled_features")
lr_model = lr.fit(train_data)
lr_predictions = lr_model.transform(test_data)
lr_evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="accuracy")
lr_accuracy = lr_evaluator.evaluate(lr_predictions)
print(f"Logistic Regression Accuracy: {lr_accuracy}")

Logistic Regression Accuracy: 0.8496125611745514
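Accuracy alone can look strong when one class dominates, because a model that always predicts the majority class already scores the majority share. The plain-Python sketch below computes accuracy, precision, recall, and F1 for the positive class on hypothetical labels to show the gap. The data and the helper name `binary_metrics` are illustrative assumptions and say nothing about the model above.

```python
def binary_metrics(labels, predictions, positive=1.0):
    """Return accuracy, precision, recall and F1 for the positive class."""
    tp = fp = fn = tn = 0
    for label, prediction in zip(labels, predictions):
        if prediction == positive:
            if label == positive:
                tp += 1
            else:
                fp += 1
        else:
            if label == positive:
                fn += 1
            else:
                tn += 1
    total = tp + fp + fn + tn
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {
        "accuracy": (tp + tn) / total,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Hypothetical test set: 8 negatives, 2 positives, and a model that always predicts 0.0.
labels = [0.0] * 8 + [1.0] * 2
always_majority = [0.0] * 10
metrics = binary_metrics(labels, always_majority)
print(metrics)
```

In the notebook itself, the same evaluator class accepts other `metricName` values such as `"f1"`, `"weightedPrecision"`, and `"weightedRecall"`, which may be worth reporting alongside accuracy.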


In [None]:
lr_predictions.show(5)

+---+---------+---------+--------------+----------+--------------+-----+------+------------+------------+--------------+--------------+------+-----------+---------------+---------------+--------------------+----------------+------------------+----------+---------+--------------------+-------------+--------------+------------------+--------------+----------------+-------------+-------------+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Age|workclass|education|marital_status|occupation|  relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native_country|income|label_index|workclass_index|education_index|marital_status_index|occupation_index|relationship_index|race_index|sex_index|native_country_index|workclass_ohe| education_ohe|marital_status_ohe|occupation_ohe|relationship_ohe|     race_ohe|      sex_ohe|native_country_ohe|            features|     scaled_features|       rawPrediction|         probabil

### Decision Tree Classifier

In [None]:
dt = DecisionTreeClassifier(labelCol="label_index", featuresCol="scaled_features")
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
print(f"Decision Tree Accuracy: {dt_accuracy}")

Decision Tree Accuracy: 0.8363580750407831


In [None]:
dt_predictions.show(5)

+---+---------+---------+--------------+----------+--------------+-----+------+------------+------------+--------------+--------------+------+-----------+---------------+---------------+--------------------+----------------+------------------+----------+---------+--------------------+-------------+--------------+------------------+--------------+----------------+-------------+-------------+------------------+--------------------+--------------------+--------------+--------------------+----------+
|Age|workclass|education|marital_status|occupation|  relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native_country|income|label_index|workclass_index|education_index|marital_status_index|occupation_index|relationship_index|race_index|sex_index|native_country_index|workclass_ohe| education_ohe|marital_status_ohe|occupation_ohe|relationship_ohe|     race_ohe|      sex_ohe|native_country_ohe|            features|     scaled_features| rawPrediction|         probability|predicti

### Random Forest Classifier

In [None]:
rf = RandomForestClassifier(labelCol="label_index", featuresCol="scaled_features")
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)
rf_evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="accuracy")
rf_accuracy = rf_evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy}")

Random Forest Accuracy: 0.8027120717781403
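The three accuracies printed in this section can be put side by side with a few lines of plain Python. The values below are copied from the outputs above. They come from a single 70/30 split with default hyperparameters, so the ranking is a rough indication, not a definitive comparison.

```python
# Accuracies as printed by the three evaluation cells above.
reported = {
    "Logistic Regression": 0.8496125611745514,
    "Decision Tree": 0.8363580750407831,
    "Random Forest": 0.8027120717781403,
}

# Rank models from highest to lowest accuracy.
ranking = sorted(reported.items(), key=lambda item: item[1], reverse=True)
for name, accuracy in ranking:
    print(f"{name:<20} {accuracy:.4f}")

best_model = ranking[0][0]
gap = ranking[0][1] - ranking[-1][1]
print(best_model, round(gap, 4))
```

The random forest trailing the single decision tree is unusual. One plausible factor is that Spark's default forest is small and shallow (`numTrees=20`, `maxDepth=5`), so tuning those parameters may change the ordering.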


In [None]:
rf_predictions.show(5)

+---+---------+---------+--------------+----------+--------------+-----+------+------------+------------+--------------+--------------+------+-----------+---------------+---------------+--------------------+----------------+------------------+----------+---------+--------------------+-------------+--------------+------------------+--------------+----------------+-------------+-------------+------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Age|workclass|education|marital_status|occupation|  relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native_country|income|label_index|workclass_index|education_index|marital_status_index|occupation_index|relationship_index|race_index|sex_index|native_country_index|workclass_ohe| education_ohe|marital_status_ohe|occupation_ohe|relationship_ohe|     race_ohe|      sex_ohe|native_country_ohe|            features|     scaled_features|       rawPrediction|         probabil