# Predicting Salary with Python and PySpark

In this project, I attempt to predict whether an adult's income exceeds $50,000 per year based upon census data. The data is in the Data folder. You can also download it from the <a href="http://archive.ics.uci.edu/ml/datasets/Adult">UC Irvine Machine Learning Repository</a>.

In [1]:
import pandas as pd

The data files do not include a header row, so we define the column names ourselves:

In [2]:
column_names = ['age','workclass','fnlwgt','education','education-num','marital-status','occupation',
                'relationship','race','sex','capital-gain','capital-loss','hours-per-week','native-country',
                'salary']

#### Load and prepare the data

In [3]:
train_df = pd.read_csv('Data/adult.data', names = column_names)
test_df = pd.read_csv('Data/adult.test', names = column_names)

In [4]:
# Drop the first row, which does not contain a data record, and reset the index
test_df = test_df.drop(test_df.index[0])
test_df = test_df.reset_index(drop=True)
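The dropped row is also what produces the mismatched data types reported for the test set further down. As a minimal sketch, assuming the file starts with exactly one non-data line, that line could instead be skipped at read time with `skiprows=1`, so that pandas infers integer columns directly. The three-column sample and the text of its first line are illustrative stand-ins, not the real file.

```python
import io
import pandas as pd

# Hypothetical stand-in for adult.test: one non-data line, then data rows.
sample = io.StringIO(
    "|1x3 Cross validator\n"
    "25, Private, 226802\n"
    "38, Private, 89814\n"
)
cols = ['age', 'workclass', 'fnlwgt']

# Reading the junk line as data keeps 'age' as text and pads the other
# columns with NaN, which turns the numeric ones into float64.
with_junk = pd.read_csv(sample, names=cols)

sample.seek(0)
# Skipping the first line lets the numeric columns be parsed as int64.
clean = pd.read_csv(sample, names=cols, skiprows=1)

print(with_junk.dtypes)
print(clean.dtypes)
```

With the line skipped, no later `astype(int)` conversion would be needed for these columns.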

In [5]:
train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32561 entries, 0 to 32560
Data columns (total 15 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   age             32561 non-null  int64 
 1   workclass       32561 non-null  object
 2   fnlwgt          32561 non-null  int64 
 3   education       32561 non-null  object
 4   education-num   32561 non-null  int64 
 5   marital-status  32561 non-null  object
 6   occupation      32561 non-null  object
 7   relationship    32561 non-null  object
 8   race            32561 non-null  object
 9   sex             32561 non-null  object
 10  capital-gain    32561 non-null  int64 
 11  capital-loss    32561 non-null  int64 
 12  hours-per-week  32561 non-null  int64 
 13  native-country  32561 non-null  object
 14  salary          32561 non-null  object
dtypes: int64(6), object(9)
memory usage: 3.7+ MB


In [6]:
test_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16281 entries, 0 to 16280
Data columns (total 15 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   age             16281 non-null  object 
 1   workclass       16281 non-null  object 
 2   fnlwgt          16281 non-null  float64
 3   education       16281 non-null  object 
 4   education-num   16281 non-null  float64
 5   marital-status  16281 non-null  object 
 6   occupation      16281 non-null  object 
 7   relationship    16281 non-null  object 
 8   race            16281 non-null  object 
 9   sex             16281 non-null  object 
 10  capital-gain    16281 non-null  float64
 11  capital-loss    16281 non-null  float64
 12  hours-per-week  16281 non-null  float64
 13  native-country  16281 non-null  object 
 14  salary          16281 non-null  object 
dtypes: float64(5), object(10)
memory usage: 1.9+ MB


In the test data, `age` is read as `object` and five numeric columns as `float64`, whereas the training data stores all six as `int64`. We convert them so that the data types match.

In [7]:
# Change the affected columns to integer type
test_df['age'] = test_df['age'].astype(int)

float_cols = test_df.select_dtypes(include=['float64']).columns
test_df[float_cols] = test_df[float_cols].astype(int)

In [8]:
train_df.dtypes.value_counts()

object    9
int64     6
dtype: int64

In [9]:
test_df.dtypes.value_counts()

object    9
int64     6
dtype: int64

In addition, the `native-country` value `Holand-Netherlands` appears in the training set but not in the test set. We therefore remove the affected rows from the training data so that both sets share the same categories.

In [10]:
# Strip surrounding whitespace from the string columns
train_df = train_df.apply(lambda x: x.astype(str).str.strip() if x.dtype == 'object' else x)

# Filter out the rows with 'Holand-Netherlands' (copy so later column assignments act on a new frame)
train_df = train_df.loc[train_df['native-country'] != 'Holand-Netherlands'].copy()

# Save to a CSV file
train_df.to_csv('Data/train.csv', index=False, header=False)
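One way to find such category mismatches, rather than spotting them by eye, is a set difference between the values of a column in each data set. The sketch below uses short toy lists in place of the real `native-country` columns; the helper name `unseen_categories` is hypothetical.

```python
def unseen_categories(train_values, test_values):
    """Return (only_in_train, only_in_test) as sorted lists."""
    train_set, test_set = set(train_values), set(test_values)
    return sorted(train_set - test_set), sorted(test_set - train_set)

# Toy stand-ins for the 'native-country' columns of the two sets.
train_countries = ['United-States', 'Cuba', 'Holand-Netherlands', 'Cuba']
test_countries = ['Cuba', 'United-States']

only_train, only_test = unseen_categories(train_countries, test_countries)
print(only_train)  # ['Holand-Netherlands']
print(only_test)   # []
```

The same function accepts pandas columns directly, for example `unseen_categories(train_df['native-country'], test_df['native-country'])`.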

We also strip the string columns of the test data and save it to a CSV file.

In [11]:
test_df = test_df.apply(lambda x: x.astype(str).str.strip() if x.dtype == 'object' else x)

test_df.to_csv('Data/test.csv', index=False, header=False)

#### Preview the data

In [12]:
print('Training data shape: ', train_df.shape)
train_df.head(3)

Training data shape:  (32560, 15)


|   | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | salary |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |


In [13]:
print('Testing data shape: ', test_df.shape)
test_df.head(3)

Testing data shape:  (16281, 15)


|   | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | salary |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 25 | Private | 226802 | 11th | 7 | Never-married | Machine-op-inspct | Own-child | Black | Male | 0 | 0 | 40 | United-States | <=50K. |
| 1 | 38 | Private | 89814 | HS-grad | 9 | Married-civ-spouse | Farming-fishing | Husband | White | Male | 0 | 0 | 50 | United-States | <=50K. |
| 2 | 28 | Local-gov | 336951 | Assoc-acdm | 12 | Married-civ-spouse | Protective-serv | Husband | White | Male | 0 | 0 | 40 | United-States | >50K. |


### Check for missing values

In [14]:
train_df.isnull().sum()

age               0
workclass         0
fnlwgt            0
education         0
education-num     0
marital-status    0
occupation        0
relationship      0
race              0
sex               0
capital-gain      0
capital-loss      0
hours-per-week    0
native-country    0
salary            0
dtype: int64
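A count of zero here does not necessarily mean the data is complete: the one-hot encoded columns later in this notebook include `workclass_?`, which suggests that unknown values are stored as the string `?`. As a sketch, assuming the raw files put a space after each comma and use `?` as the missing-value marker, `read_csv` can be asked to treat that marker as `NaN`. The three rows below are toy data.

```python
import io
import pandas as pd

# Toy stand-in for a few raw rows, with a space after each comma.
sample = io.StringIO(
    "39, State-gov, Adm-clerical\n"
    "54, ?, ?\n"
    "38, Private, Handlers-cleaners\n"
)
cols = ['age', 'workclass', 'occupation']

# skipinitialspace drops the blank after each comma; na_values maps '?' to NaN.
df = pd.read_csv(sample, names=cols, skipinitialspace=True, na_values='?')

print(df.isnull().sum())
```

Whether to drop, impute, or keep `?` as its own category is a modelling choice; this notebook keeps it as a category.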

Next, we check that each categorical column has the same number of distinct values in the training and test sets:

In [15]:
train_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

workclass          9
education         16
marital-status     7
occupation        15
relationship       6
race               5
sex                2
native-country    41
salary             2
dtype: int64

In [16]:
test_df.select_dtypes('object').apply(pd.Series.nunique, axis=0)

workclass          9
education         16
marital-status     7
occupation        15
relationship       6
race               5
sex                2
native-country    41
salary             2
dtype: int64

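We encode the target column manually so that one-hot encoding does not split it into two columns. The labels in the test set end with a period (`<=50K.`), so each set is compared against its own string.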

In [17]:
train_df['salary'] = train_df['salary'].apply(lambda x: 0 if x == '<=50K' else 1)
test_df['salary'] = test_df['salary'].apply(lambda x: 0 if x == '<=50K.' else 1)
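Because the two lambdas map every value other than the exact `<=50K` string to 1, an unexpected label would silently be counted as a positive. A slightly more defensive sketch, using a hypothetical helper `encode_salary`, normalizes the label first and fails loudly on anything unknown:

```python
def encode_salary(label):
    """Map a raw salary label to 0 (<=50K) or 1 (>50K).

    Surrounding spaces and a trailing period are ignored, so the same
    function works for the training and the test labels.
    """
    cleaned = label.strip().rstrip('.')
    if cleaned == '<=50K':
        return 0
    if cleaned == '>50K':
        return 1
    raise ValueError(f'unexpected salary label: {label!r}')

print([encode_salary(s) for s in ['<=50K', '>50K', '<=50K.', ' >50K.']])
# [0, 1, 0, 1]
```

It could be applied to both frames with `train_df['salary'].apply(encode_salary)` and `test_df['salary'].apply(encode_salary)`.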

In [18]:
train_df = pd.get_dummies(train_df)
test_df = pd.get_dummies(test_df)

In [19]:
print('Training features shape: ', train_df.shape)
print('Testing features shape: ', test_df.shape)

Training features shape:  (32560, 108)
Testing features shape:  (16281, 108)


In [20]:
# Keep only the columns that are present in both train and test
train_df, test_df = train_df.align(test_df, join='inner', axis=1)

In [21]:
print('Training features shape :', train_df.shape)
print('Testing features shape :', test_df.shape)

Training features shape : (32560, 108)
Testing features shape : (16281, 108)
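The shapes are unchanged here because both frames already had the same 108 columns. To illustrate what `align` with `join='inner'` does when the columns differ, here is a small sketch on toy frames (the names `train_toy` and `test_toy` are made up for the example):

```python
import pandas as pd

# Toy frames: 'Peru' occurs in train but never in test.
train_toy = pd.DataFrame({'age': [39, 50], 'country': ['Cuba', 'Peru']})
test_toy = pd.DataFrame({'age': [25, 38], 'country': ['Cuba', 'Cuba']})

train_enc = pd.get_dummies(train_toy)   # age, country_Cuba, country_Peru
test_enc = pd.get_dummies(test_toy)     # age, country_Cuba

# join='inner' on axis=1 keeps only the columns present in both frames.
train_enc, test_enc = train_enc.align(test_enc, join='inner', axis=1)

print(list(train_enc.columns))
print(list(test_enc.columns))
```

After alignment both frames have the same columns, so `country_Peru` is dropped from the training frame.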


In [22]:
train_df.head(3)

|   | age | fnlwgt | education-num | capital-gain | capital-loss | hours-per-week | salary | workclass_? | workclass_Federal-gov | workclass_Local-gov | ... | native-country_Portugal | native-country_Puerto-Rico | native-country_Scotland | native-country_South | native-country_Taiwan | native-country_Thailand | native-country_Trinadad&Tobago | native-country_United-States | native-country_Vietnam | native-country_Yugoslavia |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | 77516 | 13 | 2174 | 0 | 40 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
| 1 | 50 | 83311 | 13 | 0 | 0 | 13 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
| 2 | 38 | 215646 | 9 | 0 | 0 | 40 | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |


### Separate dependent and independent variables

In [23]:
X_train = train_df.drop('salary', axis=1)
y_train = train_df['salary']

X_test = test_df.drop('salary', axis=1)
y_test = test_df['salary']

### Scaling

In [24]:
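from sklearn.preprocessing import MinMaxScaler

# Rescale every feature to the [0, 1] range
scaler = MinMaxScaler(feature_range=(0, 1))

# Fit on the training data only, then apply the same transformation to both sets
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)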
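Min-max scaling maps each value to `(x - min) / (max - min)`, where the minimum and maximum come from the training data. The plain-Python sketch below illustrates that formula on toy numbers; it is not how scikit-learn implements the scaler, and the helper names are hypothetical. It also shows that a test value outside the training range falls outside [0, 1].

```python
def fit_min_max(values):
    """Learn the minimum and maximum from the training values only."""
    return min(values), max(values)

def min_max_transform(values, lo, hi):
    """Rescale with x' = (x - lo) / (hi - lo); assumes hi > lo."""
    return [(x - lo) / (hi - lo) for x in values]

train_hours = [20, 40, 60]
test_hours = [30, 80]   # 80 lies outside the training range

lo, hi = fit_min_max(train_hours)
print(min_max_transform(train_hours, lo, hi))  # [0.0, 0.5, 1.0]
print(min_max_transform(test_hours, lo, hi))   # [0.25, 1.5]
```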

#### Logistic Regression model

In [25]:
from sklearn.linear_model import LogisticRegression

lr = LogisticRegression(max_iter=1000)
lr.fit(X_train, y_train)

lr_pred = lr.predict(X_test)

#### Score

In [26]:
from sklearn.metrics import accuracy_score

accuracy_score(y_test, lr_pred)

0.8507462686567164
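Accuracy is easier to interpret next to a trivial baseline that always predicts the most frequent training class. The sketch below computes both on toy labels; the helper names and the label lists are made up for illustration and say nothing about the real class balance.

```python
from collections import Counter

def accuracy(y_true, y_pred):
    """Fraction of predictions that equal the true label."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def majority_baseline(y_train, y_true):
    """Accuracy obtained by always predicting the most common training label."""
    majority = Counter(y_train).most_common(1)[0][0]
    return accuracy(y_true, [majority] * len(y_true))

y_train_toy = [0, 0, 0, 1]
y_test_toy = [0, 0, 1, 0, 1]
y_pred_toy = [0, 0, 1, 0, 0]

print(accuracy(y_test_toy, y_pred_toy))            # 0.8
print(majority_baseline(y_train_toy, y_test_toy))  # 0.6
```

With the notebook's variables, the baseline would be `majority_baseline(y_train, y_test)`.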

## Using Spark

In [27]:
# Import Spark libraries
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [28]:
# Initialize our session

spark = SparkSession.builder.appName('Predict Adult Salary').getOrCreate()

In [29]:
# Create the schema

schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("workclass", StringType(), True),
    StructField("fnlwgt", IntegerType(), True),
    StructField("education", StringType(), True),
    StructField("education-num", IntegerType(), True),
    StructField("marital-status", StringType(), True),
    StructField("occupation", StringType(), True),
    StructField("relationship", StringType(), True),
    StructField("race", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("capital-gain", IntegerType(), True),
    StructField("capital-loss", IntegerType(), True),
    StructField("hours-per-week", IntegerType(), True),
    StructField("native-country", StringType(), True),
    StructField("salary", StringType(), True)
])

In [30]:
# Load the data

train = spark.read.csv('Data/train.csv',header=False, schema=schema)

test = spark.read.csv('Data/test.csv', header=False, schema=schema)

In [31]:
train.head(3)

[Row(age=39, workclass='State-gov', fnlwgt=77516, education='Bachelors', education-num=13, marital-status='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital-gain=2174, capital-loss=0, hours-per-week=40, native-country='United-States', salary='<=50K'),
 Row(age=50, workclass='Self-emp-not-inc', fnlwgt=83311, education='Bachelors', education-num=13, marital-status='Married-civ-spouse', occupation='Exec-managerial', relationship='Husband', race='White', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=13, native-country='United-States', salary='<=50K'),
 Row(age=38, workclass='Private', fnlwgt=215646, education='HS-grad', education-num=9, marital-status='Divorced', occupation='Handlers-cleaners', relationship='Not-in-family', race='White', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', salary='<=50K')]

In [32]:
train.limit(3).toPandas()

|   | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | salary |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States | <=50K |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States | <=50K |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States | <=50K |


In [33]:
test.limit(3).toPandas()

|   | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country | salary |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 25 | Private | 226802 | 11th | 7 | Never-married | Machine-op-inspct | Own-child | Black | Male | 0 | 0 | 40 | United-States | <=50K. |
| 1 | 38 | Private | 89814 | HS-grad | 9 | Married-civ-spouse | Farming-fishing | Husband | White | Male | 0 | 0 | 50 | United-States | <=50K. |
| 2 | 28 | Local-gov | 336951 | Assoc-acdm | 12 | Married-civ-spouse | Protective-serv | Husband | White | Male | 0 | 0 | 40 | United-States | >50K. |


In [34]:
train.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education-num',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'salary']

#### Encode the categorical data

In [35]:
cat_cols = ['workclass','education','marital-status','occupation','relationship','race','sex','native-country']

# Map each categorical column to a numeric index column
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in cat_cols]

# One-hot encode all index columns in a single stage
encoder = OneHotEncoder(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

# Concatenate the encoded columns into one vector column
assembler = VectorAssembler(inputCols=encoder.getOutputCols(), outputCol="cat_features")

In [36]:
pipeline = Pipeline(stages=indexers + [encoder, assembler])

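train = pipeline.fit(train).transform(train)

# Caution: fitting a second pipeline on the test set re-learns the category indices from the
# test data, so they are not guaranteed to match the indices learned from the training data
test = pipeline.fit(test).transform(test)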
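By default, `StringIndexer` orders labels by descending frequency, so the label-to-index mapping depends on the data the indexer is fitted on. The plain-Python sketch below mimics that ordering to show how two separately fitted indexers can disagree. The function `fit_frequency_index`, the alphabetical tie-break, and the toy lists are assumptions made for illustration.

```python
from collections import Counter

def fit_frequency_index(values):
    """Assign index 0 to the most frequent label, 1 to the next, and so on.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

# Toy columns with the same categories but different frequencies.
train_workclass = ['Private', 'Private', 'Private', 'Local-gov', 'Local-gov', 'State-gov']
test_workclass = ['Local-gov', 'Local-gov', 'Local-gov', 'Private', 'State-gov', 'State-gov']

train_map = fit_frequency_index(train_workclass)
test_map = fit_frequency_index(test_workclass)

print(train_map)  # {'Private': 0, 'Local-gov': 1, 'State-gov': 2}
print(test_map)   # {'Local-gov': 0, 'State-gov': 1, 'Private': 2}
```

In PySpark, one way to keep the encodings identical is to keep the fitted model, for example `fitted = pipeline.fit(train)`, and call `fitted.transform(...)` on both the training and the test data.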

In [37]:
train.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native-country-index: double 

In [38]:
train.select('cat_features').show(truncate=False)

+---------------------------------------------------------------+
|cat_features                                                   |
+---------------------------------------------------------------+
|(93,[4,10,24,32,44,48,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(93,[1,10,23,31,43,48,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(93,[0,8,25,38,44,48,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(93,[0,13,23,38,43,49,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(93,[0,10,23,29,47,49,62],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|(93,[0,11,23,31,47,48,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|(93,[0,18,28,34,44,49,64],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|(93,[1,8,23,31,43,48,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(93,[0,11,24,29,44,48,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])       |
|(93,[0,10,23,31,43,48,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(93,[0,9,23,31,43,49,52,53],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]) |
|(93,[4,10,23,29,43,50,52,61],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|(93,[0,10

#### Combine the continuous and categorical variables into a single column

In [39]:
num_cols = ['age', 'fnlwgt', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']

assembler = VectorAssembler(inputCols=['cat_features', *num_cols],outputCol='features')

train = assembler.transform(train)

test = assembler.transform(test)

In [40]:
train.limit(5).toPandas()['features'][0]

SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 13.0, 96: 2174.0, 98: 40.0})
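The vector sizes 93 and 99 in the outputs above can be reproduced from the category counts reported earlier by `nunique`. The arithmetic below assumes that `OneHotEncoder` keeps its default `dropLast=True`, so a column with k categories takes k - 1 slots, whereas `pd.get_dummies` keeps all k.

```python
# Distinct values per categorical column, copied from the nunique output.
category_counts = {
    'workclass': 9, 'education': 16, 'marital-status': 7, 'occupation': 15,
    'relationship': 6, 'race': 5, 'sex': 2, 'native-country': 41,
}
num_numeric = 6  # age, fnlwgt, education-num, capital-gain, capital-loss, hours-per-week

# Starting slot of each column when every column contributes k - 1 slots.
offsets, start = {}, 0
for column, k in category_counts.items():
    offsets[column] = start
    start += k - 1
one_hot_width = start

print(one_hot_width)                              # 93
print(one_hot_width + num_numeric)                # 99
print(offsets['sex'], offsets['native-country'])  # 52 53

# pandas keeps all k columns: categorical + numeric + the salary column.
print(sum(category_counts.values()) + num_numeric + 1)  # 108
```

The offsets also explain why slots 52 and 53 appear in most rows of `cat_features`: slot 52 is the single `sex` slot and slot 53 is the first `native-country` slot. A row whose value is the dropped category of a column has no active slot for that column, which is why some rows list only seven indices.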

In [41]:
test.limit(5).toPandas()['features']

0    (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
1    (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...
2    (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
3    (1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
4    (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
Name: features, dtype: object

#### Encode the target label

In [42]:
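# Index the salary strings as a numeric 'label' column (most frequent value -> 0.0)
indexer = StringIndexer(inputCol='salary', outputCol='label')

train = indexer.fit(train).transform(train)

# The test labels end with a period ('<=50K.'), so a separate indexer is fitted on the test set
test = indexer.fit(test).transform(test)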

In [43]:
train.select(['features','label']).show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(99,[4,10,24,32,4...|  0.0|
|(99,[1,10,23,31,4...|  0.0|
|(99,[0,8,25,38,44...|  0.0|
|(99,[0,13,23,38,4...|  0.0|
|(99,[0,10,23,29,4...|  0.0|
|(99,[0,11,23,31,4...|  0.0|
|(99,[0,18,28,34,4...|  0.0|
|(99,[1,8,23,31,43...|  1.0|
|(99,[0,11,24,29,4...|  1.0|
|(99,[0,10,23,31,4...|  1.0|
|(99,[0,9,23,31,43...|  1.0|
|(99,[4,10,23,29,4...|  1.0|
|(99,[0,10,24,32,4...|  0.0|
|(99,[0,14,24,33,4...|  0.0|
|(99,[0,12,23,30,4...|  1.0|
|(99,[0,16,23,37,4...|  0.0|
|(99,[1,8,24,39,45...|  0.0|
|(99,[0,8,24,35,46...|  0.0|
|(99,[0,13,23,33,4...|  0.0|
|(99,[1,11,25,31,4...|  1.0|
+--------------------+-----+
only showing top 20 rows



#### Logistic Regression model

In [44]:
# Fit the model on the training set and predict on the test set

lr = LogisticRegression(featuresCol='features', labelCol='label')

model = lr.fit(train)

pred = model.transform(test)

In [45]:
pred.limit(10).toPandas()[['label', 'prediction']]

|   | label | prediction |
|---|---|---|
| 0 | 0.0 | 0.0 |
| 1 | 0.0 | 0.0 |
| 2 | 1.0 | 0.0 |
| 3 | 1.0 | 1.0 |
| 4 | 0.0 | 0.0 |
| 5 | 0.0 | 0.0 |
| 6 | 0.0 | 0.0 |
| 7 | 1.0 | 1.0 |
| 8 | 0.0 | 0.0 |
| 9 | 0.0 | 0.0 |


#### Evaluate model

In [46]:
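from pyspark.ml.evaluation import BinaryClassificationEvaluator

# rawPredictionCol is set to the hard 0/1 'prediction' column rather than 'rawPrediction',
# so the metric below is computed from class labels, not from scores
result = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='label')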

In [47]:
AUC = result.evaluate(pred)

In [48]:
print("AUC")
print(AUC)

AUC
0.7442772306790945
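An AUC computed from hard 0/1 predictions generally differs from one computed from scores or probabilities. The pure-Python sketch below uses the pairwise-ranking definition of AUC on toy data to illustrate the difference. The function `roc_auc` and all numbers are made up for the example and are unrelated to the result above.

```python
def roc_auc(y_true, scores):
    """AUC as the probability that a random positive outranks a random
    negative, counting ties as one half."""
    pos = [s for t, s in zip(y_true, scores) if t == 1]
    neg = [s for t, s in zip(y_true, scores) if t == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

y_true = [0, 0, 0, 1, 1]
probabilities = [0.1, 0.6, 0.3, 0.4, 0.9]
hard_labels = [1 if p >= 0.5 else 0 for p in probabilities]

print(roc_auc(y_true, probabilities))  # 5/6, about 0.833
print(roc_auc(y_true, hard_labels))    # 3.5/6, about 0.583
```

With hard labels the value reduces to the mean of the true-positive and true-negative rates (here (1/2 + 2/3) / 2). In PySpark, the score-based figure would typically be obtained by pointing `rawPredictionCol` at the model's `rawPrediction` column.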
