In [1]:
# Problem and dataset: the UCI "Adult" dataset, https://archive.ics.uci.edu/ml/datasets/Adult

In [2]:
# findspark.init() makes the pyspark package importable, so it has to run
# before the pyspark import below; do not reorder these lines.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Build the session used by every later cell (getOrCreate returns the
# already-active session if there is one).
spark_session = SparkSession.builder.appName("salary_50").getOrCreate()
# Bare last expression so the notebook displays the session summary.
spark_session

In [3]:
# Load the raw Adult data. The file is read without a header row, so Spark
# names the columns _c0 ... _c14; give them meaningful names in file order.
COLUMN_NAMES = [
    "age", "workclass", "fnlwgt", "education", "education_num",
    "marital_status", "occupation", "relationship", "race", "sex",
    "capital_gain", "capital_loss", "hours_per_week", "native_country",
    "label",
]

# toDF renames every column in one step and raises if the file does not have
# exactly len(COLUMN_NAMES) columns, whereas withColumnRenamed silently does
# nothing when the column it is asked to rename is missing.
data = spark_session.read.csv("adult.data", inferSchema=True, sep=",").toDF(*COLUMN_NAMES)

data.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [4]:
# Preview the first 100 rows of the raw data.
data.show(100)
# Observation: some rows hold null values in certain columns.

+---+-----------------+--------+-------------+-------------+--------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|  fnlwgt|    education|education_num|      marital_status|        occupation|   relationship|               race|    sex|capital_gain|capital_loss|hours_per_week|native_country| label|
+---+-----------------+--------+-------------+-------------+--------------------+------------------+---------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov| 77516.0|    Bachelors|         13.0|       Never-married|      Adm-clerical|  Not-in-family|              White|   Male|      2174.0|         0.0|          40.0| United-States| <=50K|
| 50| Self-emp-not-inc| 83311.0|    Bachelors|         13.0|  Married-civ-spouse|   Exec-managerial|        Husband|              White|   Male|         0.0|         0.

In [5]:
# Print the name of every column that contains at least one null value.
# A single aggregation counts the nulls of all columns in one pass over the
# data (instead of one groupBy job per column), and iterating data.columns
# avoids hard-coding the number of columns.
from pyspark.sql import functions as F

null_counts = data.select(
    [F.sum(F.col(column).isNull().cast("int")).alias(column) for column in data.columns]
).first()

for column in data.columns:
    # sum() yields None when the DataFrame has no rows, hence the `or 0`.
    if (null_counts[column] or 0) > 0:
        print(column)

workclass
occupation
native_country


In [6]:
# Remove every row that has a null in any column, printing the row count
# before and after so the amount of discarded data is visible.
rows_before = data.count()
print(rows_before)
data = data.dropna()
rows_after = data.count()
print(rows_after)

32561
30162


In [7]:
# Encode every categorical column (and the label) as a numeric index.
# Each StringIndexer writes its result to a new "<column>_indexed" column;
# the original string columns are kept.
from pyspark.ml.feature import StringIndexer

# Order matters: later cells select the indexed columns by position.
CATEGORICAL_COLUMNS = [
    "workclass", "education", "marital_status", "occupation",
    "relationship", "race", "sex", "native_country", "label",
]

indexed_data = data
for column_name in CATEGORICAL_COLUMNS:
    # One indexer per column, fitted on the frame produced so far.
    string_indxer = StringIndexer(inputCol=column_name, outputCol=f"{column_name}_indexed")
    indexed_data = string_indxer.fit(indexed_data).transform(indexed_data)

In [8]:
# Preview the first four indexed columns (positions 15-18 of the DataFrame).
first_indexed_columns = indexed_data.columns[15:19]
indexed_data.select(first_indexed_columns).show()

+-----------------+-----------------+----------------------+------------------+
|workclass_indexed|education_indexed|marital_status_indexed|occupation_indexed|
+-----------------+-----------------+----------------------+------------------+
|              3.0|              2.0|                   1.0|               3.0|
|              1.0|              2.0|                   0.0|               2.0|
|              0.0|              0.0|                   2.0|               8.0|
|              0.0|              5.0|                   0.0|               8.0|
|              0.0|              2.0|                   0.0|               0.0|
|              0.0|              3.0|                   0.0|               2.0|
|              0.0|             10.0|                   5.0|               5.0|
|              1.0|              0.0|                   0.0|               2.0|
|              0.0|              3.0|                   1.0|               0.0|
|              0.0|              2.0|   

In [9]:
# Preview the remaining indexed columns, from position 18 through the last one.
remaining_indexed_columns = indexed_data.columns[18:]
indexed_data.select(remaining_indexed_columns).show()

+------------------+--------------------+------------+-----------+----------------------+-------------+
|occupation_indexed|relationship_indexed|race_indexed|sex_indexed|native_country_indexed|label_indexed|
+------------------+--------------------+------------+-----------+----------------------+-------------+
|               3.0|                 1.0|         0.0|        0.0|                   0.0|          0.0|
|               2.0|                 0.0|         0.0|        0.0|                   0.0|          0.0|
|               8.0|                 1.0|         0.0|        0.0|                   0.0|          0.0|
|               8.0|                 0.0|         1.0|        0.0|                   0.0|          0.0|
|               0.0|                 4.0|         1.0|        1.0|                   8.0|          0.0|
|               2.0|                 4.0|         0.0|        1.0|                   0.0|          0.0|
|               5.0|                 1.0|         1.0|        1.

In [10]:
# List every column name: the 15 original columns followed by the 9 "*_indexed" ones.
indexed_data.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital_status',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_per_week',
 'native_country',
 'label',
 'workclass_indexed',
 'education_indexed',
 'marital_status_indexed',
 'occupation_indexed',
 'relationship_indexed',
 'race_indexed',
 'sex_indexed',
 'native_country_indexed',
 'label_indexed']

In [11]:
# Assemble the numeric columns and the indexed categorical columns into a
# single "features" vector column.
from pyspark.ml.feature import VectorAssembler

# NOTE(review): the "*_indexed" columns enter the model as plain numbers, so
# the classifier sees an ordering between categories — consider one-hot
# encoding them; left unchanged here.
FEATURE_COLUMNS = [
    "age", "fnlwgt", "education_num", "capital_gain", "capital_loss",
    "hours_per_week", "education_indexed", "marital_status_indexed",
    "relationship_indexed", "race_indexed", "sex_indexed",
    "native_country_indexed", "occupation_indexed", "workclass_indexed",
]
va = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol="features")
finalized_data = va.transform(indexed_data)

# Keep only the feature vector and the numeric label. They are selected by
# name instead of the positional slice columns[:-3:-1], which breaks as soon
# as a column is added. The indexed label is exposed under the name "label".
finalized_data = (
    finalized_data
    .select("features", "label_indexed")
    .withColumnRenamed("label_indexed", "label")
)
finalized_data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[39.0,77516.0,13....|  0.0|
|(14,[0,1,2,5,6,12...|  0.0|
|(14,[0,1,2,5,7,8,...|  0.0|
|(14,[0,1,2,5,6,9,...|  0.0|
|[28.0,338409.0,13...|  0.0|
|(14,[0,1,2,5,6,8,...|  0.0|
|[49.0,160187.0,5....|  0.0|
|(14,[0,1,2,5,12,1...|  1.0|
|[31.0,45781.0,14....|  1.0|
|(14,[0,1,2,3,5,6,...|  1.0|
|(14,[0,1,2,5,6,9,...|  1.0|
|(14,[0,1,2,5,6,9,...|  1.0|
|[23.0,122272.0,13...|  0.0|
|[32.0,205019.0,12...|  0.0|
|(14,[0,1,2,5,6,9,...|  0.0|
|(14,[0,1,2,5,7,8,...|  0.0|
|(14,[0,1,2,5,7,8,...|  0.0|
|(14,[0,1,2,5,6,12...|  0.0|
|[43.0,292175.0,14...|  1.0|
|(14,[0,1,2,5,6],[...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [12]:
# Split into 30 % test / 70 % train. The fixed seed keeps the split, and every
# metric computed from it, reproducible across kernel restarts.
test_data, train_data = finalized_data.randomSplit([0.3, 0.7], seed=42)

In [13]:
# Train a logistic-regression classifier on the training split and score the
# held-out test split.
from pyspark.ml.classification import LogisticRegression

# The column names are the estimator's defaults, spelled out for readability.
lr_estimator = LogisticRegression(featuresCol="features", labelCol="label")
lr = lr_estimator.fit(train_data)  # `lr` ends up bound to the fitted model
predictions = lr.transform(test_data)
predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(14,[0,1,2,3,5],[...|  1.0|[-1.6726582335972...|[0.15807008902088...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[0.01921712948964...|[0.50480413452685...|       0.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-29.976361431495...|[9.58145893851507...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-4.1980016084253...|[0.01480314802436...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-4.7592311648639...|[0.00849933945502...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-30.565764784288...|[5.31443401364856...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-1.0307050056105...|[0.26294744695486...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-2.5963271453761...|[0.06937517341373...|       1.0|
|(14,[0,1,2,3,5,6]...|  1.0|[-1.5803096105181...|[0.17075163809219...|       1.0|
|(14,[0,1,2,3,5,

In [14]:
# Evaluate the predictions on the test split.
# NOTE: this is not accuracy. BinaryClassificationEvaluator's default metric
# is the area under the ROC curve, which is now requested explicitly.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
evaluator.evaluate(predictions)

0.8794652290846265