Importing the libraries and Creating SparkSession

In [1]:
import pandas as pd
from pyspark.sql.functions import col, sum
import pyspark.sql.functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Income Prediction").config("spark.memory.offHeap.enabled", "true").config("spark.memory.offHeap.size", "10g").getOrCreate()

Reading my data and putting it into a dataframe

In [2]:
df1 = spark.read.csv(
    "adult test.csv",
    header=True,
    inferSchema=True,
    sep=",",
    quote="\"",
    escape="\""
)
df1.show(5, truncate=False)
df1.printSchema()

+---+----------+------+-------------+-------------+-------------------+------------------+------------+------+-------+------------+-------------+--------------+--------------+-------+
|Age|workclass |fnlwgt|education    |education-num|marital-status     |occupation        |relationship|race  |sex    |capital-gain|capital-loss |hours-per-week|native-country|label  |
+---+----------+------+-------------+-------------+-------------------+------------------+------------+------+-------+------------+-------------+--------------+--------------+-------+
|25 | Private  |226802| 11th        |7            | Never-married     | Machine-op-inspct| Own-child  | Black| Male  |0           |0            |40            | United-States| <=50K.|
|38 | Private  |89814 | HS-grad     |9            | Married-civ-spouse| Farming-fishing  | Husband    | White| Male  |0           |0            |50            | United-States| <=50K.|
|28 | Local-gov|336951| Assoc-acdm  |12           | Married-civ-spouse| Protecti

In [3]:
#from pyspark.sql.types import StructType, StructField, IntegerType, StringType

#schema = StructType([
 #   StructField("age", IntegerType(), True),
 #   StructField("workclass", StringType(), True),
 #   StructField("fnlwgt", IntegerType(), True),
 #   StructField("education", StringType(), True),
 #   StructField("education_num", IntegerType(), True),
 #   StructField("marital_status", StringType(), True),
 #   StructField("occupation", StringType(), True),
 #   StructField("relationship", StringType(), True),
 #   StructField("race", StringType(), True),
 #   StructField("sex", StringType(), True),
 #   StructField("capital_gain", IntegerType(), True),
 #   StructField("capital_loss", IntegerType(), True),
 #   StructField("hours_per_week", IntegerType(), True),
 #   StructField("native_country", StringType(), True),
 #   StructField("income", StringType(), True)
#])

#df = spark.read.csv(
 #   "adult test.csv",
  #  header=True,
 #   #schema=schema,
  #  sep=",",
  #  quote="\"",
  #  escape="\""
#)
#pandas_df = df.toPandas()
#pandas_df

In [4]:
df2 = spark.read.csv(
    "adult train.csv",
    header=True,
    inferSchema=True,
    sep=",",
    quote="\"",
    escape="\""
)
df2.show(5, truncate=False)
df2.printSchema()

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|Age|workclass        |fnlwgt|education |education-num|marital-status     |occupation        |relationship  |race  |sex    |capital-gain|capital-loss|hours-per-week|native-country|label |
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40            | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13            | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9            | Divo

EDA

In [5]:
df1.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+--------------+-------+
|summary|               Age|   workclass|            fnlwgt|    education|    education-num|marital-status|       occupation|relationship|               race|    sex|      capital-gain|     capital-loss |    hours-per-week|native-country|  label|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+------------+-------------------+-------+------------------+------------------+------------------+--------------+-------+
|  count|             16281|       16281|             16281|        16281|            16281|         16281|            16281|       16281|              16281|  16281|             16281|             16281|             16281|         16281|  16281|
|   mean| 38

In [6]:
df2.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+-------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|summary|               Age|   workclass|            fnlwgt|    education|    education-num|marital-status|       occupation|relationship |               race|    sex|      capital-gain|    capital-loss|    hours-per-week|native-country| label|
+-------+------------------+------------+------------------+-------------+-----------------+--------------+-----------------+-------------+-------------------+-------+------------------+----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|         32561|            32561|        32561|              32561|  32561|             32561|           32561|             32561|         32561| 32561|
|   mean| 38.5816467

Data Preprocessing

Checking for missing values in dataframe 1 (adult test) & dataframe 2 (adult train)

In [7]:
df1.select([(F.sum(F.col(c).isNull().cast("int")).alias(c)) for c in df1.columns]).show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+-------------+--------------+--------------+-----+
|Age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship|race|sex|capital-gain|capital-loss |hours-per-week|native-country|label|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+-------------+--------------+--------------+-----+
|  0|        0|     0|        0|            0|             0|         0|           0|   0|  0|           0|            0|             0|             0|    0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+-------------+--------------+--------------+-----+



We can see from here that there are no missing values in df1 

In [8]:
df2.select([(F.sum(F.col(c).isNull().cast("int")).alias(c)) for c in df2.columns]).show()

+---+---------+------+---------+-------------+--------------+----------+-------------+----+---+------------+------------+--------------+--------------+-----+
|Age|workclass|fnlwgt|education|education-num|marital-status|occupation|relationship |race|sex|capital-gain|capital-loss|hours-per-week|native-country|label|
+---+---------+------+---------+-------------+--------------+----------+-------------+----+---+------------+------------+--------------+--------------+-----+
|  0|        0|     0|        0|            0|             0|         0|            0|   0|  0|           0|           0|             0|             0|    0|
+---+---------+------+---------+-------------+--------------+----------+-------------+----+---+------------+------------+--------------+--------------+-----+



We can see from here that there are no missing values in df2 as well

Encoding categorical variables in both data frames

In [9]:
data = df1

#list of categorical columns to encode
categorical_columns = ["workclass", "education", "marital-status", "occupation", "relationship", "race", "sex", "native-country", "label"]

#Creating StringIndexers for each categorical column
indexers  = [
    StringIndexer(inputCol = col, outputCol = f"{col}_index")
    for col in categorical_columns
]

#Creating OneHotEncoders for each indexed columns
encoders = [
    OneHotEncoder(inputCol = f"{col}_index", outputCol = f"{col}_encoded")
    for col in categorical_columns
]

#Combining indexers and encoders to a pipeline
pipeline = Pipeline(stages = indexers + encoders)

#Fiting the pipeline to the data and transform it 
pipeline_model = pipeline.fit(data)
transformed_data = pipeline_model.transform(data)

#Showing the transformed dataframe
transformed_data.show(5, 0)

+---+----------+------+-------------+-------------+-------------------+------------------+------------+------+-------+------------+-------------+--------------+--------------+-------+---------------+---------------+--------------------+----------------+------------------+----------+---------+--------------------+-----------+-----------------+-----------------+----------------------+------------------+--------------------+-------------+-------------+----------------------+-------------+
|Age|workclass |fnlwgt|education    |education-num|marital-status     |occupation        |relationship|race  |sex    |capital-gain|capital-loss |hours-per-week|native-country|label  |workclass_index|education_index|marital-status_index|occupation_index|relationship_index|race_index|sex_index|native-country_index|label_index|workclass_encoded|education_encoded|marital-status_encoded|occupation_encoded|relationship_encoded|race_encoded |sex_encoded  |native-country_encoded|label_encoded|
+---+----------+--

In [10]:
data1 = df2

#list of categorical columns to encode
categorical_columns = ["workclass", "education", "marital-status", "occupation", "relationship ", "race", "sex", "native-country", "label"]

#Creating StringIndexers for each categorical column
indexers  = [
    StringIndexer(inputCol = col, outputCol = f"{col}_index")
    for col in categorical_columns
]

#Creating OneHotEncoders for each indexed columns
encoders = [
    OneHotEncoder(inputCol = f"{col}_index", outputCol = f"{col}_encoded")
    for col in categorical_columns
]

#Combining indexers and encoders to a pipeline
pipeline = Pipeline(stages = indexers + encoders)

#Fiting the pipeline to the data and transform it 
pipeline_model = pipeline.fit(data1)
#transformed_data = pipeline_model.transform(data1)

#Showing the transformed dataframe
transformed_data.show(5, 0)

+---+----------+------+-------------+-------------+-------------------+------------------+------------+------+-------+------------+-------------+--------------+--------------+-------+---------------+---------------+--------------------+----------------+------------------+----------+---------+--------------------+-----------+-----------------+-----------------+----------------------+------------------+--------------------+-------------+-------------+----------------------+-------------+
|Age|workclass |fnlwgt|education    |education-num|marital-status     |occupation        |relationship|race  |sex    |capital-gain|capital-loss |hours-per-week|native-country|label  |workclass_index|education_index|marital-status_index|occupation_index|relationship_index|race_index|sex_index|native-country_index|label_index|workclass_encoded|education_encoded|marital-status_encoded|occupation_encoded|relationship_encoded|race_encoded |sex_encoded  |native-country_encoded|label_encoded|
+---+----------+--

Scaling our numerical features in both dataframes to a common scale 

In [11]:
#list of numerical columns to scale
numerical_columns = ["Age", "fnlwgt", "education-num", "capital-gain","hours-per-week"]

#Assembling numerical columns into a feature vector
assembler = VectorAssembler(inputCols = numerical_columns, outputCol = "numerical_features")
df_vector = assembler.transform(data)

#Initialize the StandardScaler
scaler = StandardScaler(inputCol = "numerical_features", outputCol = "scaled_features", withMean = True, withStd  = True)

#Fit the scaler to the first dataframe and transform 
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

#Converting vector to an array column
df_final = df_scaled.withColumn("scaled_features_array", vector_to_array("scaled_features"))
for i, col_name in enumerate(numerical_columns):
    df_final = df_final.withColumn(f"scaled_{col_name}", df_final["scaled_features_array"][i])

#Drop intermediate columns
df_final = df_final.drop("numerical features", "scaled_features", "scaled_features_array")

In [12]:
df_final1.show(5, 0)

+---+----------+------+-------------+-------------+-------------------+------------------+------------+------+-------+------------+-------------+--------------+--------------+-------+--------------------------------+--------------------+--------------------+---------------------+--------------------+---------------------+
|Age|workclass |fnlwgt|education    |education-num|marital-status     |occupation        |relationship|race  |sex    |capital-gain|capital-loss |hours-per-week|native-country|label  |numerical_features              |scaled_Age          |scaled_fnlwgt       |scaled_education-num |scaled_capital-gain |scaled_hours-per-week|
+---+----------+------+-------------+-------------+-------------------+------------------+------------+------+-------+------------+-------------+--------------+--------------+-------+--------------------------------+--------------------+--------------------+---------------------+--------------------+---------------------+
|25 | Private  |226802| 11th

In [15]:
# List of numerical columns to scale
numerical_columns = ["Age", "fnlwgt", "education-num", "capital-gain", "hours-per-week"]

# Assembling numerical columns into a feature vector for data1
assembler = VectorAssembler(inputCols=numerical_columns, outputCol="numerical_features")
df_vector1 = assembler.transform(data1)

# Initialize the StandardScaler (reusing the same scaler as before)
scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_features", withMean=True, withStd=True)

# Fit the scaler to data1 and transform
scaler_model1 = scaler.fit(df_vector1)
df_scaled1 = scaler_model1.transform(df_vector1)

# Converting vector to an array column for data1
df_final1 = df_scaled1.withColumn("scaled_features_array", vector_to_array("scaled_features"))
for i, col_name in enumerate(numerical_columns):
    df_final1 = df_final1.withColumn(f"scaled_{col_name}", df_final1["scaled_features_array"][i])

# Drop intermediate columns for data1
df_final1 = df_final1.drop("numerical_features", "scaled_features", "scaled_features_array")

In [16]:
df_final1.show(5, 0)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+--------------------+-------------------+--------------------+--------------------+---------------------+
|Age|workclass        |fnlwgt|education |education-num|marital-status     |occupation        |relationship  |race  |sex    |capital-gain|capital-loss|hours-per-week|native-country|label |scaled_Age          |scaled_fnlwgt      |scaled_education-num|scaled_capital-gain |scaled_hours-per-week|
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+--------------+--------------+------+--------------------+-------------------+--------------------+--------------------+---------------------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male