In [None]:
!wget https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar -xvzf spark-3.0.0-bin-hadoop2.7.tgz
!pip install findspark

In [None]:
import os

import findspark

# Point SPARK_HOME at the directory unpacked from the downloaded archive, then
# let findspark.init() make pyspark importable from this kernel.
os.environ.update(SPARK_HOME="/content/spark-3.0.0-bin-hadoop2.7")
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("PySpark 3.0 Setup on Google Colab")
    .getOrCreate()
)
print(spark.sparkContext.appName)

PySpark 3.0 Setup on Google Colab


In [None]:
# Read the dataset: the first CSV row supplies the column names, and Spark
# infers each column's type from the data (see the printed schema below).
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/content/census-Worksheet.csv')
)

In [None]:
# Drop every row that has a null in any column.
# NOTE(review): only true nulls are removed; if the CSV marks missing values
# with a placeholder string (e.g. '?'), those rows are kept — confirm.
df = df.na.drop()

In [None]:
df.show(n=20)  # preview the first 20 rows of the cleaned dataset

+---+-----------------+---------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
|age|        workclass|education_level|education-num|      marital-status|        occupation|  relationship|               race|    sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+-----------------+---------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+
| 39|        State-gov|      Bachelors|           13|       Never-married|      Adm-clerical| Not-in-family|              White|   Male|        2174|           0|            40| United-States| <=50K|
| 50| Self-emp-not-inc|      Bachelors|           13|  Married-civ-spouse|   Exec-managerial|       Husband|              White|   Male|           0|           0|            13| United-States| <=50K|


In [None]:
# Display the dataset's schema: column names plus the types Spark inferred
# when reading the CSV with inferSchema=True.
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [None]:
df.groupBy('income').agg({'income': 'count'}).show()  # rows per income bracket (class balance)

+------+-------------+
|income|count(income)|
+------+-------------+
|  >50K|         7841|
| <=50K|        24720|
+------+-------------+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [None]:
# String columns to be label-indexed and then one-hot encoded.
# NOTE(review): 'hours-per-week' is an integer column in the printed schema,
# so one-hot encoding it yields one feature per distinct value; treating it as
# numeric may be intended instead. 'race', 'sex' and 'native-country' are not
# used as features at all — confirm both choices.
categorical_columns = [
    'workclass', 'education_level', 'marital-status',
    'occupation', 'relationship', 'hours-per-week',
]

In [None]:
# One StringIndexer per categorical column, mapping each string value to a
# numeric index in "<column>_indexed"; the income label gets its own indexer.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_indexed")
    for column in categorical_columns
]
income_indexer = StringIndexer(inputCol="income", outputCol="income_indexed")

In [None]:
# One-hot encode each indexed column into "<indexed column>_encoded".
# dropLast=False keeps a vector slot for every category instead of dropping
# the last one.
encoders = [
    OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol=indexer.getOutputCol() + "_encoded",
        dropLast=False,
    )
    for indexer in indexers
]
income_encoder = OneHotEncoder(
    inputCol="income_indexed",
    outputCol="income_indexed_encoded",
    dropLast=False,
)

In [None]:
# Combine the one-hot vectors and the raw numeric columns into a single
# "features" vector column.
numerical_columns = ['age', 'education-num', 'capital-gain', 'capital-loss']
categorical_encoded = [enc.getOutputCol() for enc in encoders]
inputcols = [*categorical_encoded, *numerical_columns]
assembler = VectorAssembler(inputCols=inputcols, outputCol="features")

# NOTE(review): out_assembler is never added to the pipeline in the cells
# shown here; the models below train on income_indexed as the label.
income_encoded = [income_encoder.getOutputCol()]
out_assembler = VectorAssembler(inputCols=income_encoded, outputCol="target")

In [None]:
# Fit all indexing/encoding stages and the assembler on the cleaned data,
# then apply the fitted pipeline to add the derived columns.
pipeline = Pipeline(stages=[*indexers, income_indexer, *encoders, income_encoder, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
transformed.show()

+---+-----------------+---------------+-------------+--------------------+------------------+--------------+-------------------+-------+------------+------------+--------------+--------------+------+-----------------+-----------------------+----------------------+------------------+--------------------+----------------------+--------------+-------------------------+-------------------------------+------------------------------+--------------------------+----------------------------+------------------------------+----------------------+--------------------+
|age|        workclass|education_level|education-num|      marital-status|        occupation|  relationship|               race|    sex|capital-gain|capital-loss|hours-per-week|native-country|income|workclass_indexed|education_level_indexed|marital-status_indexed|occupation_indexed|relationship_indexed|hours-per-week_indexed|income_indexed|workclass_indexed_encoded|education_level_indexed_encoded|marital-status_indexed_encoded|occupa

In [None]:
final_data = transformed.select(['features', 'income_indexed'])  # feature vector + numeric label only

In [None]:
final_data.show(n=20)  # preview features/label pairs

+--------------------+--------------+
|            features|income_indexed|
+--------------------+--------------+
|(151,[4,11,26,35,...|           0.0|
|(151,[1,11,25,34,...|           0.0|
|(151,[0,9,27,41,4...|           0.0|
|(151,[0,14,25,41,...|           0.0|
|(151,[0,11,25,32,...|           0.0|
|(151,[0,12,25,34,...|           0.0|
|(151,[0,19,30,37,...|           0.0|
|(151,[1,9,25,34,4...|           1.0|
|(151,[0,12,26,32,...|           1.0|
|(151,[0,11,25,34,...|           1.0|
|(151,[0,10,25,34,...|           1.0|
|(151,[4,11,25,32,...|           1.0|
|(151,[0,11,26,35,...|           0.0|
|(151,[0,15,26,36,...|           0.0|
|(151,[0,13,25,33,...|           1.0|
|(151,[0,17,25,40,...|           0.0|
|(151,[1,9,26,42,4...|           0.0|
|(151,[0,9,26,38,5...|           0.0|
|(151,[0,14,25,36,...|           0.0|
|(151,[1,12,27,34,...|           1.0|
+--------------------+--------------+
only showing top 20 rows



In [None]:
# 75/25 train/test split. Without a fixed seed every kernel restart produces
# a different split, so the class counts and model accuracies reported below
# would not be reproducible.
SPLIT_SEED = 42
train_data, test_data = final_data.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

In [None]:
# Class balance of the training split: rows per income label (below/above 50K).
train_data.groupBy('income_indexed').agg({'income_indexed': 'count'}).show()

+--------------+---------------------+
|income_indexed|count(income_indexed)|
+--------------+---------------------+
|           0.0|                18508|
|           1.0|                 5933|
+--------------+---------------------+



In [None]:
# Let's count how many people with income below/above 50k are in the *test* data
# (the previous comment wrongly said "training data").
test_data.groupby('income_indexed').agg({'income_indexed': 'count'}).show()

+--------------+---------------------+
|income_indexed|count(income_indexed)|
+--------------+---------------------+
|           0.0|                 6212|
|           1.0|                 1908|
+--------------+---------------------+



In [None]:
# Accuracy = fraction of scored rows whose predicted label equals the true label.
def accuracy_model(model, data=None, label_col="income_indexed"):
    """Print and return the classification accuracy of ``model`` on ``data``.

    Args:
        model: A fitted Spark ML model whose ``transform`` output contains a
            ``prediction`` column.
        data: DataFrame to score. Defaults to the global ``test_data`` split,
            so existing calls such as ``accuracy_model(model=lrModel)`` keep
            working unchanged.
        label_col: Name of the true-label column in ``data``.

    Returns:
        The accuracy as a float in [0, 1].

    Raises:
        ValueError: If ``data`` has no rows (accuracy is undefined).
    """
    if data is None:
        # Resolved at call time, so the function can be defined before the split.
        data = test_data
    scored = model.transform(data).select(label_col, "prediction")
    total = scored.count()
    if total == 0:
        raise ValueError("Cannot compute accuracy on an empty dataset")
    correct = scored.filter(scored[label_col] == scored["prediction"]).count()
    accuracy = correct / total
    print("Model accuracy: %.3f%%" % (accuracy * 100))
    return accuracy

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled features, trained on the training split.
lr = LogisticRegression(labelCol="income_indexed", featuresCol="features")
lrModel = lr.fit(train_data)

In [None]:
# Score the held-out test split with the fitted logistic-regression model.
lr_predictions = lrModel.transform(dataset=test_data)

In [None]:
# Show the columns the model appends to the input: rawPrediction, probability, prediction.
lr_predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- income_indexed: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [None]:
# True label next to predicted label for the first 20 scored rows.
selected = lr_predictions.select(["income_indexed", "prediction"])
selected.show(n=20)

+--------------+----------+
|income_indexed|prediction|
+--------------+----------+
|           1.0|       0.0|
|           0.0|       0.0|
|           1.0|       0.0|
|           0.0|       0.0|
|           1.0|       0.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           0.0|       1.0|
|           1.0|       0.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           0.0|       1.0|
|           1.0|       0.0|
|           1.0|       1.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
+--------------+----------+
only showing top 20 rows



In [None]:
accuracy_model(lrModel);  # logistic-regression accuracy on the test split

Model accuracy: 85.320%


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree limited to depth 5, same features/label, trained on the training split.
dt = DecisionTreeClassifier(maxDepth=5, labelCol='income_indexed', featuresCol='features')
dtModel = dt.fit(train_data)

In [None]:
# Score the held-out test split with the fitted decision tree.
dt_predictions = dtModel.transform(dataset=test_data)

In [None]:
# Show the columns the model appends to the input: rawPrediction, probability, prediction.
dt_predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- income_indexed: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [None]:
# True label next to predicted label for the first 20 scored rows.
selected = dt_predictions.select(["income_indexed", "prediction"])
selected.show(n=20)

+--------------+----------+
|income_indexed|prediction|
+--------------+----------+
|           1.0|       0.0|
|           0.0|       0.0|
|           1.0|       0.0|
|           0.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           1.0|       0.0|
|           1.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
|           0.0|       0.0|
+--------------+----------+
only showing top 20 rows



In [None]:
accuracy_model(dtModel);  # decision-tree accuracy on the test split

Model accuracy: 84.064%
