In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession;

spark = SparkSession.builder.master("local[4]").appName("ISM6562 Spark App01").getOrCreate();

sc = spark.sparkContext  

spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

Spark Session WebUI Port: 4040


In [2]:
sc.setLogLevel("ERROR")

In [3]:
df = spark.read.csv('Data.csv', header=True, inferSchema=True)

In [4]:
from pyspark.sql.functions import col, regexp_replace
# Replace spaces with underscores in column names
new_col_names = [col_name.strip().replace('-', '_') for col_name in df.columns]
for i, col_name in enumerate(df.columns):
    df = df.withColumnRenamed(col_name, new_col_names[i])

In [5]:
from pyspark.sql.functions import col, when

valueWhenTrue = None  # for example

for c in df.columns:
    df = df.withColumn(
        c,
        when(
            col(c) == ' ?',
            valueWhenTrue
        ).otherwise(col(c))
    )


In [6]:
from pyspark.sql.functions import col, sum
null_counts = df.agg(*[sum(col(c).isNull().cast('int')).alias(c) for c in df.columns])
null_counts.show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|Workclass|fnlwgt|Education|Education_Num|Marital_Status|Occupation|Relationship|Race|Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|     2799|     0|        0|            0|             0|      2809|           0|   0|  0|           0|           0|             0|           857|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [7]:
df = df.dropna()

In [8]:
from pyspark.sql.functions import col, sum
null_counts = df.agg(*[sum(col(c).isNull().cast('int')).alias(c) for c in df.columns])
null_counts.show()

+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|Age|Workclass|fnlwgt|Education|Education_Num|Marital_Status|Occupation|Relationship|Race|Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+
|  0|        0|     0|        0|            0|             0|         0|           0|   0|  0|           0|           0|             0|             0|     0|
+---+---------+------+---------+-------------+--------------+----------+------------+----+---+------------+------------+--------------+--------------+------+



In [1]:
df = df.dropDuplicates()

NameError: name 'df' is not defined

In [10]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Education_Num: integer (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Capital_Gain: integer (nullable = true)
 |-- Capital_Loss: integer (nullable = true)
 |-- Hours_Per_Week: integer (nullable = true)
 |-- Native_Country: string (nullable = true)
 |-- Target: string (nullable = true)



In [11]:
tables = spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [12]:
spark.catalog.listTables()

[]

In [13]:
df.createOrReplaceTempView("income_table")

In [14]:
# df.write.saveAsTable("income_table_per")

In [15]:
tables = spark.sql("show tables").show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|         |income_table|       true|
+---------+------------+-----------+



# Exploring The Data

The query you provided calculates the proportion of males in the income_table dataset. It divides the count of males by the count of all records in the dataset, and returns the result as a single column table. From this, we can identify that their is a **disproportunate bias favoring Men** when it comes to earning more than $50,000 per annum when compared to women./

In [16]:
query = """
SELECT 
(COUNT(CASE WHEN TRIM(sex) = 'Male' THEN 1 END) / COUNT(*)) AS male_proportion 
FROM income_table
WHERE TRIM(target)='>50K'
"""
result1 = spark.sql(query)
result1.show(5)

+-----------------+
|  male_proportion|
+-----------------+
|0.851008748437779|
+-----------------+



In [17]:
query2 = """
SELECT 
Education_Num, count(*) AS cn
FROM income_table 
GROUP BY Education_Num 
SORT BY Education_Num
"""
result2 = spark.sql(query2)
result2.show()

+-------------+-----+
|Education_Num|   cn|
+-------------+-----+
|            1|   70|
|            2|  220|
|            3|  447|
|            4|  822|
|            5|  676|
|            6| 1223|
|            7| 1619|
|            8|  575|
|            9|14770|
|           10| 9887|
|           11| 1958|
|           12| 1507|
|           13| 7559|
|           14| 2513|
|           15|  785|
|           16|  544|
+-------------+-----+



In [18]:
query3 = """
SELECT target, COUNT(*) as target_count
FROM income_table 
GROUP BY Target
"""
result3 = spark.sql(query3)
result3.show()

+------+------------+
|target|target_count|
+------+------------+
|  >50K|       11202|
| <=50K|       33973|
+------+------------+



The data has been loaded. Then temporary and persistant tables have been created.  Following this, we have run SQL Queries to explore the data.

In [19]:
df.columns

['Age',
 'Workclass',
 'fnlwgt',
 'Education',
 'Education_Num',
 'Marital_Status',
 'Occupation',
 'Relationship',
 'Race',
 'Sex',
 'Capital_Gain',
 'Capital_Loss',
 'Hours_Per_Week',
 'Native_Country',
 'Target']

In [20]:

df=df.select([
    "Age","Workclass","Education","Marital_Status","Occupation","Relationship","Race","Sex","Capital_Gain","Capital_Loss","Hours_Per_Week","Native_Country","Target"
])

In [21]:
df.select(["Target"]).collect()

[Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' >50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'),
 Row(Target=' <=50K'

In [22]:
from pyspark.sql.functions import when
df=df.withColumn("Target", when(df["Target"]==' <=50K',0).otherwise(1))

In [23]:
df.show()

+---+-----------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
|Age|        Workclass|    Education|     Marital_Status|        Occupation|   Relationship|               Race|    Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|
+---+-----------------+-------------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 29| Self-emp-not-inc|    Bachelors| Married-civ-spouse|             Sales|        Husband|              White|   Male|           0|           0|            70| United-States|     1|
| 36|          Private|    Bachelors| Married-civ-spouse|    Prof-specialty|        Husband|              White|   Male|           0|           0|            45| United-States|     0|
| 22|          Private|      5th-6th| Married-civ-spouse| Machine-op-inspct| Oth

In [24]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Workclass: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Capital_Gain: integer (nullable = true)
 |-- Capital_Loss: integer (nullable = true)
 |-- Hours_Per_Week: integer (nullable = true)
 |-- Native_Country: string (nullable = true)
 |-- Target: integer (nullable = false)



# Linear Regression

In [25]:
train_data,test_data=df.randomSplit([0.7,0.3])

In [26]:
from pyspark.ml.feature import StringIndexer
# Use StringIndexer to convert the categorical columns to hold numerical data
 
Workclass_indexer = StringIndexer(inputCol='Workclass',outputCol='Workclass_index',handleInvalid='keep')
Education_indexer = StringIndexer(inputCol='Education',outputCol='Education_index',handleInvalid='keep')
Marital_Status_indexer = StringIndexer(inputCol='Marital_Status',outputCol='Marital_Status_index',handleInvalid='keep')
Occupation_indexer = StringIndexer(inputCol='Occupation',outputCol='Occupation_index',handleInvalid='keep')
Race_indexer = StringIndexer(inputCol='Race',outputCol='Race_index',handleInvalid='keep')
Relationship_indexer = StringIndexer(inputCol='Relationship',outputCol='Relationship_index',handleInvalid='keep')
Sex_indexer = StringIndexer(inputCol='Sex',outputCol='Sex_index',handleInvalid='keep')
Native_Country_indexer = StringIndexer(inputCol='Native_Country',outputCol='Native_Country_index',handleInvalid='keep')

In [27]:
from pyspark.ml.feature import VectorAssembler
# Vector assembler is used to create a vector of input features
 
assembler = VectorAssembler(
    inputCols=[
        "Age",
        "Workclass_index",
        "Education_index",
        "Marital_Status_index",
        "Occupation_index",
        "Relationship_index",
        "Race_index",
        "Sex_index",
        "Capital_Gain",
        "Capital_Loss",
        "Hours_Per_Week",
        "Native_Country_index"
    ],
    outputCol="features"
)

In [28]:
from pyspark.ml import Pipeline

# Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data
# in the same way as that of the train data
# https://spark.apache.org/docs/latest/ml-pipeline.html
 
pipe = Pipeline(stages=[
     Workclass_indexer,
Education_indexer,
Marital_Status_indexer,
Occupation_indexer,
Race_indexer,
Relationship_indexer,
Sex_indexer,
Native_Country_indexer,
    assembler
    ]
)

In [29]:
fitted_pipe=pipe.fit(train_data)

In [30]:
train_data=fitted_pipe.transform(train_data)
# train_data.show()

In [31]:
test_data=fitted_pipe.transform(test_data)
test_data.show()

+---+----------+---------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+---------------+---------------+--------------------+----------------+----------+------------------+---------+--------------------+--------------------+
|Age| Workclass|Education|     Marital_Status|        Occupation|   Relationship|               Race|    Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|Workclass_index|Education_index|Marital_Status_index|Occupation_index|Race_index|Relationship_index|Sex_index|Native_Country_index|            features|
+---+----------+---------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+---------------+---------------+--------------------+----------------+----------+------------------+---------+--------------------+--------------------+
| 17| Local-gov|     10th

In [32]:
from pyspark.ml.regression import LinearRegression

lr_model = LinearRegression(labelCol='Target')
fit_model = lr_model.fit(train_data.select(['features','Target']))

In [33]:
%time
results = fit_model.transform(test_data)


CPU times: total: 0 ns
Wall time: 0 ns


In [34]:
results.show()

+---+----------+---------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+---------------+---------------+--------------------+----------------+----------+------------------+---------+--------------------+--------------------+--------------------+
|Age| Workclass|Education|     Marital_Status|        Occupation|   Relationship|               Race|    Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|Workclass_index|Education_index|Marital_Status_index|Occupation_index|Race_index|Relationship_index|Sex_index|Native_Country_index|            features|          prediction|
+---+----------+---------+-------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+---------------+---------------+--------------------+----------------+----------+------------------+---------+--------------------+-----

In [35]:
results.select(['Target','prediction']).show()

+------+--------------------+
|Target|          prediction|
+------+--------------------+
|     0|  0.1176654894164818|
|     0| 0.09811479002336726|
|     0|-0.08419776270823426|
|     0|-0.13387575592209836|
|     0| -0.1291313434690532|
|     0|-0.09614230406536287|
|     0|-0.08778139183652292|
|     0|-0.08360093572210292|
|     0| 0.02091046713839647|
|     0|-0.04432723360264067|
|     0|-0.04014677748822069|
|     0|-0.03596632137380...|
|     0|-0.13629860172563044|
|     0|0.003449598220123397|
|     0| 0.06328530807302835|
|     0|-0.12497876815608278|
|     0|-0.13091772170374927|
|     0|-0.06152103602618...|
|     0|-0.08780927263797247|
|     0|-0.06690699206587261|
+------+--------------------+
only showing top 20 rows



## Evaluate the peformance of the Linear Regression Model

In [36]:
test_results = fit_model.evaluate(test_data)

In [37]:
test_results.residuals.show()



+--------------------+
|           residuals|
+--------------------+
| -0.1176654894164818|
|-0.09811479002336726|
| 0.08419776270823426|
| 0.13387575592209836|
|  0.1291313434690532|
| 0.09614230406536287|
| 0.08778139183652292|
| 0.08360093572210292|
|-0.02091046713839647|
| 0.04432723360264067|
| 0.04014677748822069|
|0.035966321373800715|
| 0.13629860172563044|
|-0.00344959822012...|
|-0.06328530807302835|
| 0.12497876815608278|
| 0.13091772170374927|
|0.061521036026187685|
| 0.08780927263797247|
| 0.06690699206587261|
+--------------------+
only showing top 20 rows



In [38]:
print(f"{'RMSE:':7s} {test_results.rootMeanSquaredError:>7.3f}")
print(f"{'Ex Var:':7s} {test_results.explainedVariance:>7.3f}")
print(f"{'MAE:':7s} {test_results.meanAbsoluteError:>7.3f}")
print(f"{'MSE:':7s} {test_results.meanSquaredError:>7.3f}")
print(f"{'R2:':7s} {test_results.r2:>7.3f}")

RMSE:     0.374
Ex Var:   0.051
MAE:      0.300
MSE:      0.140
R2:       0.255


## Building Logistic Model

In [39]:
train_data,test_data=df.randomSplit([0.7,0.3])

In [40]:
# Import the required libraries
 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer ,OneHotEncoder
from pyspark.ml import Pipeline

In [41]:
data_encoder = OneHotEncoder(
    inputCols=[
        "Workclass_index",
        "Education_index",
        "Marital_Status_index",
        "Occupation_index",
        "Relationship_index",
        "Race_index",
        "Sex_index",
        "Native_Country_index"
    ], 
    outputCols= [
          "Workclass_vec",
        "Education_vec",
        "Marital_Status_vec",
        "Occupation_vec",
        "Relationship_vec",
        "Race_vec",
        "Sex_vec",
        "Native_Country_vec"],
    handleInvalid='keep'
)

In [42]:
assembler = VectorAssembler(
    inputCols=[
       "Age",
        "Workclass_vec",
        "Education_vec",
        "Marital_Status_vec",
        "Occupation_vec",
        "Relationship_vec",
        "Race_vec",
        "Sex_vec",
        "Capital_Gain",
        "Capital_Loss",
        "Hours_Per_Week",
        "Native_Country_vec"
        ],
    outputCol="features"
)

Create an object for the Logistic Regression model

In [43]:
lr_model = LogisticRegression(labelCol='Target')

In [44]:
pipe = Pipeline(
    stages=[
        Workclass_indexer,
Education_indexer,
Marital_Status_indexer,
Occupation_indexer,
Race_indexer,
Relationship_indexer,
Sex_indexer,
Native_Country_indexer,
        data_encoder,
    assembler,
        lr_model
    ]
)

In [45]:
%time
# run the pipeline
fit_model=pipe.fit(train_data)



CPU times: total: 0 ns
Wall time: 0 ns


In [46]:
# Store the results in a dataframe
results = fit_model.transform(test_data)

In [47]:
results.select(['Target','prediction']).show()

+------+----------+
|Target|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
+------+----------+
only showing top 20 rows



## Model Evaluation

In [48]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from sklearn.metrics import confusion_matrix
def evaluate(results,Target,model_name):
    

    AUC_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Target',metricName='areaUnderROC')
    AUC = AUC_evaluator.evaluate(results)

    print(f"The area under the curve is {AUC:.2f} for model {model_name}")

    PR_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Target',metricName='areaUnderPR')
    PR = PR_evaluator.evaluate(results)

    print("The area under the PR curve is {} for model {}".format(PR,model_name))



    ACC_evaluator = MulticlassClassificationEvaluator(
    labelCol="Target", predictionCol="prediction", metricName="accuracy")

    accuracy = ACC_evaluator.evaluate(results)

    print("The accuracy of the model is {} for model {}".format(accuracy,model_name))



    y_true = results.select("Target")
    y_true = y_true.toPandas()
 
    y_pred = results.select("prediction")
    y_pred = y_pred.toPandas()
 
#     cnf_matrix = confusion_matrix(y_true, y_pred)
#     print("Below is the confusion matrix: \n {}".format(cnf_matrix))

#     tn = cnf_matrix[0][0]
#     fp = cnf_matrix[0][1]
#     fn = cnf_matrix[1][0]
#     tp = cnf_matrix[1][1]

#     accuracy = (tp+tn)/(tp+tn+fp+fn)
#     precision = tp/(tp+fp)
#     recall = tp/(tp+fn)
#     f1_score = 2*(precision*recall)/(precision+recall)

#     print(f"Accuracy: {accuracy:.2f} for model {model_name}")
#     print(f"Precision: {precision:.2f} for model {model_name}")
#     print(f"Recall: {recall:.2f} for model {model_name}")
#     print(f"F1 Score: {f1_score:.2f} for model {model_name}")

In [49]:
evaluate(results,"Target","Logistic Model")

The area under the curve is 0.77 for model Logistic Model
The area under the PR curve is 0.6379504223888752 for model Logistic Model
The accuracy of the model is 0.8509413067552603 for model Logistic Model


### Building the Linear SVC model

In [50]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StandardScaler

In [51]:
assembler = VectorAssembler(
    inputCols=[
        "Age",
        "Workclass_index",
        "Education_index",
        "Marital_Status_index",
        "Occupation_index",
        "Relationship_index",
        "Race_index",
        "Sex_index",
        "Capital_Gain",
        "Capital_Loss",
        "Hours_Per_Week",
        "Native_Country_index"
    ],
    outputCol="unscaled_features"
)

Standard scaler is used to scale the data for the linear SVC to perform well on the training data

In [52]:
scaler = StandardScaler(inputCol="unscaled_features",outputCol="features")

Create an object for the Linear SVC model

In [53]:
svc_model = LinearSVC(labelCol='Target')

Pipeline is used to pass the data through indexer and assembler simultaneously. Also, it helps to pre-rocess the test data in the same way as that of the train data.

In [54]:
pipe = Pipeline(
    stages=[
        Workclass_indexer,
Education_indexer,
Marital_Status_indexer,
Occupation_indexer,
Race_indexer,
Relationship_indexer,
Sex_indexer,
Native_Country_indexer,
    assembler,
        scaler,svc_model
    ]
)

Fit the model on the training data

In [55]:
train_data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Workclass: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Capital_Gain: integer (nullable = true)
 |-- Capital_Loss: integer (nullable = true)
 |-- Hours_Per_Week: integer (nullable = true)
 |-- Native_Country: string (nullable = true)
 |-- Target: integer (nullable = false)



In [56]:
test_data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Workclass: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Relationship: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Capital_Gain: integer (nullable = true)
 |-- Capital_Loss: integer (nullable = true)
 |-- Hours_Per_Week: integer (nullable = true)
 |-- Native_Country: string (nullable = true)
 |-- Target: integer (nullable = false)



In [57]:
%time

fit_model=pipe.fit(train_data)

CPU times: total: 0 ns
Wall time: 0 ns


Store the results in a dataframe

In [58]:
results = fit_model.transform(test_data)
display(results)

DataFrame[Age: int, Workclass: string, Education: string, Marital_Status: string, Occupation: string, Relationship: string, Race: string, Sex: string, Capital_Gain: int, Capital_Loss: int, Hours_Per_Week: int, Native_Country: string, Target: int, Workclass_index: double, Education_index: double, Marital_Status_index: double, Occupation_index: double, Race_index: double, Relationship_index: double, Sex_index: double, Native_Country_index: double, unscaled_features: vector, features: vector, rawPrediction: vector, prediction: double]

In [59]:
results.select(['Target','prediction']).show()

+------+----------+
|Target|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
+------+----------+
only showing top 20 rows



In [60]:
evaluate(results,"Target","Linear SVC")

The area under the curve is 0.64 for model Linear SVC
The area under the PR curve is 0.5765284595589857 for model Linear SVC
The accuracy of the model is 0.8060538944259874 for model Linear SVC


# Linear MLP model

In [61]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

mlp_model = MultilayerPerceptronClassifier().\
        setLabelCol("Target").\
        setFeaturesCol("features").\
        setSeed(20).\
        setLayers([12, 16, 16, 8]) # the 6 is the number of input columns - we need the first layer to be the same as the number of input columns




In [62]:
pipe = Pipeline(
    stages=[
        Workclass_indexer,
Education_indexer,
Marital_Status_indexer,
Occupation_indexer,
Race_indexer,
Relationship_indexer,
Sex_indexer,
Native_Country_indexer,
    assembler,
        scaler,mlp_model
    ]
)

In [63]:
%time

fit_model=pipe.fit(train_data)

CPU times: total: 0 ns
Wall time: 0 ns


In [64]:
test_data.show(3)

+---+----------+---------+-------------------+----------------+------------+------+-------+------------+------------+--------------+--------------+------+
|Age| Workclass|Education|     Marital_Status|      Occupation|Relationship|  Race|    Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|
+---+----------+---------+-------------------+----------------+------------+------+-------+------------+------------+--------------+--------------+------+
| 17| Local-gov|     10th|      Never-married| Protective-serv|   Own-child| White| Female|           0|        1602|            40| United-States|     0|
| 17| Local-gov|      9th|      Never-married|   Other-service|   Own-child| White|   Male|           0|           0|            45| United-States|     0|
| 17|   Private|     10th| Married-civ-spouse|           Sales|   Own-child| White| Female|           0|           0|            30| United-States|     0|
+---+----------+---------+-------------------+----------------+-------

In [65]:
train_data.show(3)

+---+----------+---------+--------------+--------------+------------+------+-------+------------+------------+--------------+--------------+------+
|Age| Workclass|Education|Marital_Status|    Occupation|Relationship|  Race|    Sex|Capital_Gain|Capital_Loss|Hours_Per_Week|Native_Country|Target|
+---+----------+---------+--------------+--------------+------------+------+-------+------------+------------+--------------+--------------+------+
| 17| Local-gov|     10th| Never-married| Other-service|   Own-child| White| Female|           0|           0|            25| United-States|     0|
| 17| Local-gov|     11th| Never-married|  Adm-clerical|   Own-child| White| Female|           0|           0|            15| United-States|     0|
| 17| Local-gov|     11th| Never-married| Other-service|   Own-child| White| Female|           0|           0|            16| United-States|     0|
+---+----------+---------+--------------+--------------+------------+------+-------+------------+------------+--

In [66]:
results = fit_model.transform(test_data)
display(results)


DataFrame[Age: int, Workclass: string, Education: string, Marital_Status: string, Occupation: string, Relationship: string, Race: string, Sex: string, Capital_Gain: int, Capital_Loss: int, Hours_Per_Week: int, Native_Country: string, Target: int, Workclass_index: double, Education_index: double, Marital_Status_index: double, Occupation_index: double, Race_index: double, Relationship_index: double, Sex_index: double, Native_Country_index: double, unscaled_features: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [67]:
results.select(['Target','prediction']).show()

+------+----------+
|Target|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
+------+----------+
only showing top 20 rows



In [68]:
evaluate(results,"Target","Linear MLP")

The area under the curve is 0.76 for model Linear MLP
The area under the PR curve is 0.6249578337485754 for model Linear MLP
The accuracy of the model is 0.8451827242524917 for model Linear MLP


# Decision Tree

In [69]:
from pyspark.ml.classification import DecisionTreeClassifier


In [70]:
assembler = VectorAssembler(
    inputCols=[
        "Age",
        "Workclass_index",
        "Education_index",
        "Marital_Status_index",
        "Occupation_index",
        "Relationship_index",
        "Race_index",
        "Sex_index",
        "Capital_Gain",
        "Capital_Loss",
        "Hours_Per_Week",
        "Native_Country_index"
    ],
    outputCol="features"
)

In [71]:
dt_model = DecisionTreeClassifier(labelCol='Target',maxBins=5000)

In [72]:
pipe = Pipeline(
    stages=[
        Workclass_indexer,
Education_indexer,
Marital_Status_indexer,
Occupation_indexer,
Race_indexer,
Relationship_indexer,
Sex_indexer,
Native_Country_indexer,
    assembler,
        dt_model
    ]
)

In [73]:
fit_model=pipe.fit(train_data)

In [74]:
results = fit_model.transform(test_data)

In [75]:
results.select(['Target','prediction']).show()

+------+----------+
|Target|prediction|
+------+----------+
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
|     0|       0.0|
+------+----------+
only showing top 20 rows



In [76]:
evaluate(results,"Target","Decision Tree")

The area under the curve is 0.75 for model Decision Tree
The area under the PR curve is 0.6458022025145571 for model Decision Tree
The accuracy of the model is 0.8485049833887043 for model Decision Tree
