In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=37e004a86c790f0c94e95b876bb29fa2fd156444bf32370696ac53f9323ae8f8
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [5]:
import pyspark
from pyspark.sql import SparkSession

In [6]:
# Create a Spark session
spark = SparkSession.builder.appName("income_dataset").getOrCreate()

In [7]:
data=spark.read.csv('/content/adult.data')
data.show()

+---+-----------------+-------+-------------+---+--------------------+------------------+--------------+-------------------+-------+------+----+----+--------------+------+
|_c0|              _c1|    _c2|          _c3|_c4|                 _c5|               _c6|           _c7|                _c8|    _c9|  _c10|_c11|_c12|          _c13|  _c14|
+---+-----------------+-------+-------------+---+--------------------+------------------+--------------+-------------------+-------+------+----+----+--------------+------+
| 39|        State-gov|  77516|    Bachelors| 13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|  2174|   0|  40| United-States| <=50K|
| 50| Self-emp-not-inc|  83311|    Bachelors| 13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|     0|   0|  13| United-States| <=50K|
| 38|          Private| 215646|      HS-grad|  9|            Divorced| Handlers-cleaners| Not-in-family|              White|   Male|     0| 

In [8]:
#adding label name to the columns
labels=['age','workclass','fnlwgt','education','numbers','marital','occupation','relation','race','gender','gain','loss','hourlypay','country','income']

In [9]:
df=data.toDF(*labels)
df.show(5)

+---+-----------------+-------+----------+-------+-------------------+------------------+--------------+------+-------+-----+----+---------+--------------+------+
|age|        workclass| fnlwgt| education|numbers|            marital|        occupation|      relation|  race| gender| gain|loss|hourlypay|       country|income|
+---+-----------------+-------+----------+-------+-------------------+------------------+--------------+------+-------+-----+----+---------+--------------+------+
| 39|        State-gov|  77516| Bachelors|     13|      Never-married|      Adm-clerical| Not-in-family| White|   Male| 2174|   0|       40| United-States| <=50K|
| 50| Self-emp-not-inc|  83311| Bachelors|     13| Married-civ-spouse|   Exec-managerial|       Husband| White|   Male|    0|   0|       13| United-States| <=50K|
| 38|          Private| 215646|   HS-grad|      9|           Divorced| Handlers-cleaners| Not-in-family| White|   Male|    0|   0|       40| United-States| <=50K|
| 53|          Private

In [10]:
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- numbers: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gain: string (nullable = true)
 |-- loss: string (nullable = true)
 |-- hourlypay: string (nullable = true)
 |-- country: string (nullable = true)
 |-- income: string (nullable = true)



In [11]:
from pyspark.sql.functions import col
new_df=df.withColumn('age',col('age').cast('integer'))

In [12]:
new_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- numbers: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gain: string (nullable = true)
 |-- loss: string (nullable = true)
 |-- hourlypay: string (nullable = true)
 |-- country: string (nullable = true)
 |-- income: string (nullable = true)



In [13]:
for i in ['fnlwgt','numbers','gain','loss','hourlypay']:
  new_df=new_df.withColumn(i,col(i).cast('integer'))

In [14]:
from pyspark.sql.functions import *
new_df.select([count(when(col(c).isNull(),c )).alias(c) for c in new_df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [15]:
df.select('workclass').distinct().show()  #null values are present

+-----------------+
|        workclass|
+-----------------+
|        State-gov|
|      Federal-gov|
| Self-emp-not-inc|
|        Local-gov|
|          Private|
|                ?|
|     Self-emp-inc|
|      Without-pay|
|     Never-worked|
+-----------------+



In [16]:
df.groupby('workclass').count().show()

+-----------------+-----+
|        workclass|count|
+-----------------+-----+
|        State-gov| 1298|
|      Federal-gov|  960|
| Self-emp-not-inc| 2541|
|        Local-gov| 2093|
|          Private|22696|
|                ?| 1836|
|     Self-emp-inc| 1116|
|      Without-pay|   14|
|     Never-worked|    7|
+-----------------+-----+



In [17]:
#replacing the null values
df=new_df.replace(' ?',None)

In [18]:
from pyspark.sql.functions import *
df.select([count(when(col(c).isNull(),c )).alias(c) for c in df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|     1836|     0|        0|      0|      0|      1843|       0|   0|     0|   0|   0|        0|    583|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [19]:
df.groupby('country').count().show()

+-------------------+-----+
|            country|count|
+-------------------+-----+
| Dominican-Republic|   70|
|            Ireland|   24|
|               Cuba|   95|
|          Guatemala|   64|
|               Iran|   43|
|             Taiwan|   51|
|        El-Salvador|  106|
|      United-States|29170|
|              South|   80|
|              Japan|   62|
|          Nicaragua|   34|
|               NULL|  583|
|             Canada|  121|
|           Cambodia|   19|
|               Laos|   18|
|            Germany|  137|
|    Trinadad&Tobago|   19|
|               Peru|   31|
|            Ecuador|   28|
|         Yugoslavia|   16|
+-------------------+-----+
only showing top 20 rows



In [20]:
#using the mode method and filling the null values
df=df.fillna('United-States',subset=['country'])

In [21]:
df.groupby('workclass').count().show()

+-----------------+-----+
|        workclass|count|
+-----------------+-----+
|        State-gov| 1298|
|      Federal-gov|  960|
|             NULL| 1836|
| Self-emp-not-inc| 2541|
|        Local-gov| 2093|
|          Private|22696|
|     Self-emp-inc| 1116|
|      Without-pay|   14|
|     Never-worked|    7|
+-----------------+-----+



In [22]:
#using the mode method and filling the null values
df=df.fillna('Private',subset=['workclass'])

In [23]:
df.groupby('occupation').count().show()

+------------------+-----+
|        occupation|count|
+------------------+-----+
|   Farming-fishing|  994|
|              NULL| 1843|
| Handlers-cleaners| 1370|
|    Prof-specialty| 4140|
|      Adm-clerical| 3770|
|   Exec-managerial| 4066|
|      Craft-repair| 4099|
|             Sales| 3650|
|      Tech-support|  928|
|  Transport-moving| 1597|
|   Protective-serv|  649|
|      Armed-Forces|    9|
| Machine-op-inspct| 2002|
|     Other-service| 3295|
|   Priv-house-serv|  149|
+------------------+-----+



In [24]:
df=df.fillna('Prof-specialty',subset=['occupation'])

In [25]:
new_df.select([count(when(col(c).isNull(),c )).alias(c) for c in new_df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



#Applying Naive bayes

In [43]:
categorical_cols=['age',
 'education',
 'marital',
 'occupation',
 'relation',
 'race',
 'gender',
 'country','income']

numerical_cols=['age','fnlwgt','numbers','gain','loss','hourlypay']
label='income'

In [44]:
def indexer(df,col):
  indexer=StringIndexer(inputCol=col,outputCol=f'{col}_index',handleInvalid='keep')
  indexed=indexer.fit(df).transform(df)
  return indexed

In [45]:
indexer(df,'workclass').show()

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|workclass_index|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male| 2174|   0|       40| United-States| <=50K|            4.0|
| 50| Self-emp-not-inc| 83311|    Bachelors|     13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0|   0|       13| United-States| <=50K|            1.0|
| 38|          Private|21

In [46]:
df.show()

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male| 2174|   0|       40| United-States| <=50K|
| 50| Self-emp-not-inc| 83311|    Bachelors|     13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0|   0|       13| United-States| <=50K|
| 38|          Private|215646|      HS-grad|      9|            Divorced| Handlers-cleaners| Not-in-famil

In [47]:
#Transformation of the data
for col in categorical_cols:
  index_df=indexer(new_df,col)
  new_df=index_df

In [48]:
new_df.show()

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|age_index|education_index|marital_index|occupation_index|relation_index|race_index|gender_index|country_index|income_index|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+
| 39|        State-gov| 77516|    Bachelors|     13|       Never-married|      Adm-clerical| N

In [49]:
assembler=VectorAssembler(inputCols=[f'{c}_index' for c in categorical_cols]+numerical_cols,outputCol='features')

In [50]:
vector=assembler.transform(new_df)

In [51]:
vector.show()

+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+--------------------+
|age|        workclass|fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender| gain|loss|hourlypay|       country|income|age_index|education_index|marital_index|occupation_index|relation_index|race_index|gender_index|country_index|income_index|            features|
+---+-----------------+------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-----+----+---------+--------------+------+---------+---------------+-------------+----------------+--------------+----------+------------+-------------+------------+--------------------+
| 39|        State-gov| 77516| 

In [52]:
#Ussing correlation metrics
from pyspark.ml.stat import Correlation
Correlation.corr(vector,'features').show()

+--------------------+
|   pearson(features)|
+--------------------+
|1.0              ...|
+--------------------+



In [54]:
col=[f'{c}_index' for c in categorical_cols]+ numerical_cols

In [56]:
r1=Correlation.corr(vector,'features').head()

In [57]:
print('Pearson correalation martrix:\n'+str(r1[0]))

Pearson correalation martrix:
DenseMatrix([[ 1.00000000e+00,  1.05730625e-01,  7.26971202e-02,
               3.71980873e-02, -7.63761398e-02, -4.87566926e-02,
              -1.95395149e-03, -1.48055269e-02,  6.57025772e-02,
               6.84357474e-01, -6.18365044e-02, -1.04325558e-01,
               4.52066638e-02,  2.01655344e-02, -1.74679748e-01],
             [ 1.05730625e-01,  1.00000000e+00, -5.43462520e-03,
              -1.64882302e-02, -1.40352218e-02,  3.20378119e-02,
              -4.02158184e-02,  8.29998874e-02,  4.24489521e-02,
               7.61125290e-02,  3.44838605e-02, -1.70081765e-01,
               5.92847625e-02,  2.19380216e-02, -7.92952053e-03],
             [ 7.26971202e-02, -5.43462520e-03,  1.00000000e+00,
               4.46472295e-02,  4.11140727e-01,  6.89439876e-02,
               4.08301403e-01,  2.15264796e-02, -3.11287762e-01,
               3.05672475e-02,  5.44716467e-03, -1.04875923e-01,
              -5.77145430e-02, -5.33565354e-02, -1.4671090

In [58]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [76]:
#Splitting the data
splits=data.randomSplit([0.6,0.4], 1234)
train=splits[0]
test=splits[1]



In [75]:
data=data.select('features','label')

In [70]:
label=StringIndexer(inputCol='income',outputCol='label',handleInvalid='keep')

In [73]:
data=label.fit(data).transform(data)

In [74]:
data.show()

+--------------------+------+-----+
|            features|income|label|
+--------------------+------+-----+
|[13.0,2.0,1.0,3.0...| <=50K|  0.0|
|(15,[0,1,3,9,10,1...| <=50K|  0.0|
|(15,[0,2,3,4,9,10...| <=50K|  0.0|
|(15,[0,1,3,5,9,10...| <=50K|  0.0|
|[6.0,2.0,0.0,0.0,...| <=50K|  0.0|
|[8.0,3.0,0.0,2.0,...| <=50K|  0.0|
|[31.0,10.0,5.0,5....| <=50K|  0.0|
|(15,[0,3,8,9,10,1...|  >50K|  1.0|
|[1.0,3.0,1.0,0.0,...|  >50K|  1.0|
|[19.0,2.0,0.0,2.0...|  >50K|  1.0|
|[8.0,1.0,0.0,2.0,...|  >50K|  1.0|
|[7.0,2.0,0.0,0.0,...|  >50K|  1.0|
|[3.0,2.0,1.0,3.0,...| <=50K|  0.0|
|[11.0,6.0,1.0,4.0...| <=50K|  0.0|
|[17.0,4.0,0.0,1.0...|  >50K|  1.0|
|[2.0,8.0,0.0,8.0,...| <=50K|  0.0|
|(15,[0,2,3,4,9,10...| <=50K|  0.0|
|(15,[0,2,3,4,9,10...| <=50K|  0.0|
|(15,[0,1,3,9,10,1...| <=50K|  0.0|
|[20.0,3.0,2.0,2.0...|  >50K|  1.0|
+--------------------+------+-----+
only showing top 20 rows



In [77]:
#using the naivebayes model
nb=NaiveBayes(featuresCol='features',labelCol='label',smoothing=1.0,modelType='multinomial')

In [78]:
model=nb.fit(train)

In [79]:
# select rows to display.
predictions = model.transform(test)
predictions.show()

+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|(15,[0,1,2,3,9,10...|  0.0|[-1170.6332690178...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1178.7524895374...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1116.6188965249...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1056.9244173824...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1182.2351619669...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-793.99771379730...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1151.1536382195...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-785.84945549171...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1146.4373386532...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-1107.7771847274...|  [1.0,0.0]|       0.0|
|(15,[0,1,2,4,9,10...|  0.0|[-619.11108211966...|  

In [81]:
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.7859105480079377
