In [1]:
pip install ucimlrepo

Collecting ucimlrepo
  Downloading ucimlrepo-0.0.7-py3-none-any.whl.metadata (5.5 kB)
Downloading ucimlrepo-0.0.7-py3-none-any.whl (8.0 kB)
Installing collected packages: ucimlrepo
Successfully installed ucimlrepo-0.0.7


In [2]:
from ucimlrepo import fetch_ucirepo

# fetch dataset
adult = fetch_ucirepo(id=2)

# data (as pandas dataframes)
X = adult.data.features
y = adult.data.targets

# metadata
print(adult.metadata)

# variable information
print(adult.variables)


{'uci_id': 2, 'name': 'Adult', 'repository_url': 'https://archive.ics.uci.edu/dataset/2/adult', 'data_url': 'https://archive.ics.uci.edu/static/public/2/data.csv', 'abstract': 'Predict whether income exceeds $50K/yr based on census data. Also known as "Census Income" dataset. ', 'area': 'Social Science', 'tasks': ['Classification'], 'characteristics': ['Multivariate'], 'num_instances': 48842, 'num_features': 14, 'feature_types': ['Categorical', 'Integer'], 'demographics': ['Age', 'Income', 'Education Level', 'Other', 'Race', 'Sex'], 'target_col': ['income'], 'index_col': None, 'has_missing_values': 'yes', 'missing_values_symbol': 'NaN', 'year_of_dataset_creation': 1996, 'last_updated': 'Mon Aug 07 2023', 'dataset_doi': '10.24432/C5XW20', 'creators': ['Barry Becker', 'Ronny Kohavi'], 'intro_paper': None, 'additional_info': {'summary': 'Extraction was done by Barry Becker from the 1994 Census database.  A set of reasonably clean records was extracted using the following conditions: ((AAG

In [3]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=771ccfdadf3b58bb3ea5139d813256bbee36fdbc63c0dfdf7743e863e6bec196
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [4]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('adult').getOrCreate()

In [6]:
data = spark.read.csv('/content/adult.data', header=True, inferSchema=True)

In [7]:
data.show()

+---+-----------------+--------+-------------+----+--------------------+------------------+--------------+-------------------+-------+-------+---+----+--------------+------+
| 39|        State-gov|   77516|    Bachelors|  13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|   2174|  0|  40| United-States| <=50K|
+---+-----------------+--------+-------------+----+--------------------+------------------+--------------+-------------------+-------+-------+---+----+--------------+------+
| 50| Self-emp-not-inc| 83311.0|    Bachelors|13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0.0|0.0|13.0| United-States| <=50K|
| 38|          Private|215646.0|      HS-grad| 9.0|            Divorced| Handlers-cleaners| Not-in-family|              White|   Male|    0.0|0.0|40.0| United-States| <=50K|
| 53|          Private|234721.0|         11th| 7.0|  Married-civ-spouse| Handlers-cleaners|       Husband|              Black|   M

In [8]:
labels = ['age','workclass','fnlwgt','education','numbers','marital','occupation','relation','race','gender','gain','loss','hourlypay','country','income']

In [9]:
df = data.toDF(*labels)
df.show()

+---+-----------------+--------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-------+----+---------+--------------+------+
|age|        workclass|  fnlwgt|    education|numbers|             marital|        occupation|      relation|               race| gender|   gain|loss|hourlypay|       country|income|
+---+-----------------+--------+-------------+-------+--------------------+------------------+--------------+-------------------+-------+-------+----+---------+--------------+------+
| 50| Self-emp-not-inc| 83311.0|    Bachelors|   13.0|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|    0.0| 0.0|     13.0| United-States| <=50K|
| 38|          Private|215646.0|      HS-grad|    9.0|            Divorced| Handlers-cleaners| Not-in-family|              White|   Male|    0.0| 0.0|     40.0| United-States| <=50K|
| 53|          Private|234721.0|         11th|    7.0|  Married-civ-spouse| Handlers-

In [10]:
# Checking for the corret datatype for each column
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- numbers: double (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gain: double (nullable = true)
 |-- loss: double (nullable = true)
 |-- hourlypay: double (nullable = true)
 |-- country: string (nullable = true)
 |-- income: string (nullable = true)



In [11]:
# Convert the some given feature in correct datatype
from pyspark.sql.functions import col
for i in ['age','fnlwgt','numbers','gain','loss','hourlypay']:
  new_df = df.withColumn(i, col(i).cast('integer'))

In [12]:
new_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- numbers: double (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relation: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gain: double (nullable = true)
 |-- loss: double (nullable = true)
 |-- hourlypay: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- income: string (nullable = true)



In [13]:
# Now Check for the NULL Values
from pyspark.sql.functions import *

new_df.select([count(when(col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [14]:
# Check for different values of columns
df.select('workclass').distinct().show()

+-----------------+
|        workclass|
+-----------------+
|        State-gov|
|      Federal-gov|
| Self-emp-not-inc|
|        Local-gov|
|          Private|
|                ?|
|     Self-emp-inc|
|      Without-pay|
|     Never-worked|
+-----------------+



In [17]:
# replace the unwanted value  with None
df =new_df.replace("?", None)

In [18]:
# Now Check for the NULL Values
from pyspark.sql.functions import *

df.select([count(when(col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [19]:
df.groupby("country").count().show()

+-------------------+-----+
|            country|count|
+-------------------+-----+
| Dominican-Republic|   70|
|            Ireland|   24|
|               Cuba|   95|
|          Guatemala|   64|
|               Iran|   43|
|             Taiwan|   51|
|        El-Salvador|  106|
|      United-States|29169|
|              South|   80|
|              Japan|   62|
|          Nicaragua|   34|
|             Canada|  121|
|           Cambodia|   19|
|               Laos|   18|
|            Germany|  137|
|    Trinadad&Tobago|   19|
|               Peru|   31|
|            Ecuador|   28|
|         Yugoslavia|   16|
|            Vietnam|   67|
+-------------------+-----+
only showing top 20 rows



In [22]:
df = df.fillna("United-States", subset = ['country'])


In [23]:
df = df.fillna("Private", subset = ['workclass'])

In [24]:
df = df.fillna("Prof-speciality", subset = ['occupation'])

In [25]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in new_df.columns]).show()

+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|age|workclass|fnlwgt|education|numbers|marital|occupation|relation|race|gender|gain|loss|hourlypay|country|income|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+
|  0|        0|     0|        0|      0|      0|         0|       0|   0|     0|   0|   0|        0|      0|     0|
+---+---------+------+---------+-------+-------+----------+--------+----+------+----+----+---------+-------+------+



In [44]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler


In [27]:
df.columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'numbers',
 'marital',
 'occupation',
 'relation',
 'race',
 'gender',
 'gain',
 'loss',
 'hourlypay',
 'country',
 'income']

In [29]:
categorical_cols = [
 'workclass',
 'education',
 'marital',
 'occupation',
 'relation',
 'race',
 'gender',
 'country',
]

numerical_cols = ['age','fnlwgt','numbers','gain','loss','hourlypay']
label = 'income'

In [33]:
indexer = [StringIndexer(inputCol = c, outputCol = f"{c}_index", handleInvalid = "keep") for c in categorical_cols]


In [34]:
label_indexer = StringIndexer(inputCol = "income", outputCol = "label", handleInvalid = "keep")



In [35]:
assembler = VectorAssembler(inputCols = [f"{c}_index" for c in categorical_cols] + numerical_cols, outputCol = "features")

In [38]:
lr = LogisticRegression(featuresCol = "features", labelCol = "label")

In [43]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = indexer + [label_indexer, assembler, lr])

In [45]:
train_data, test_data = df.randomSplit([0.8, 0.2])

In [46]:
model = pipeline.fit(train_data)

In [47]:
predictions = model.transform(test_data)

In [48]:
predictions.show()

+---+----------+--------+---------+-------+--------------+------------------+----------+------+-------+----+----+---------+----------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+-----+--------------------+--------------------+--------------------+----------+
|age| workclass|  fnlwgt|education|numbers|       marital|        occupation|  relation|  race| gender|gain|loss|hourlypay|         country|income|workclass_index|education_index|marital_index|occupation_index|relation_index|race_index|gender_index|country_index|label|            features|       rawPrediction|         probability|prediction|
+---+----------+--------+---------+-------+--------------+------------------+----------+------+-------+----+----+---------+----------------+------+---------------+---------------+-------------+----------------+--------------+----------+------------+-------------+-----+--------------------+--------------------+-

In [49]:
predictions.select("prediction", "label", "features").show()

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|[3.0,5.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,5.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,10.0,1.0,7.0...|
|       0.0|  0.0|[3.0,0.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,7.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,5.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,5.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,7.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,5.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,7.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,7.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,7.0,1.0,7.0,...|
|       0.0|  0.0|[3.0,5.0,1.0,7.0,...|
|       0.0|  0.0|[2.0,5.0,1.0,4.0,...|
|       0.0|  0.0|[2.0,10.0,1.0,5.0...|
|       0.0|  0.0|(14,[1,2,3,4,8,9,...|
|       0.0|  0.0|(14,[1,2,3,4,8,9,...|
|       0.0|  0.0|(14,[1,2,3,4,8,9,...|
|       0.0|  0.0|(14,[1,2,3,4,8,9,...|
|       0.0|  0.0|(14,[1,2,4,6,8,9,...|
+----------+-----+--------------------+
only showing top 20 rows



In [50]:
predictions.select("prediction", "label").distinct().show()

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  0.0|
|       0.0|  0.0|
+----------+-----+



In [52]:
predictions.groupBy("prediction", "label").count().show()

+----------+-----+-----+
|prediction|label|count|
+----------+-----+-----+
|       1.0|  1.0|  827|
|       0.0|  1.0|  732|
|       1.0|  0.0|  302|
|       0.0|  0.0| 4777|
+----------+-----+-----+



In [55]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label", metricName = "accuracy")


In [58]:
accuracy = evaluator.evaluate(predictions)
accuracy

0.8442301898162097