In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark import SparkFiles
#url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc =SparkContext()
#sc.addFile(url)
sqlContext = SQLContext(sc)
df = sqlContext.read.csv("./adult_data.csv", header=True, inferSchema= True)

### Understand Data

In [6]:
df.show(5)
df.printSchema()

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  x|age|workclass|fnlwgt|   education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|  1| 25|  Private|226802|        11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|
|  2| 38|  Private| 89814|     HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Male|           0|           0|            50| United-States| <=50K|
|  3| 28|Local-gov|336951|  Assoc-acdm|             12|Married-civ-spouse|  Protective-ser

In [17]:
# Import all from `sql.types`
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
# Convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [18]:
from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.select('age','fnlwgt').show(5)
df.groupBy("education").count().sort("count",ascending=True).show()	

+----+--------+
| age|  fnlwgt|
+----+--------+
|25.0|226802.0|
|38.0| 89814.0|
|28.0|336951.0|
|44.0|160323.0|
|18.0|103497.0|
+----+--------+
only showing top 5 rows

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [19]:
df.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [23]:
#In some occasion, it can be interesting to see the descriptive statistics between two pairwise columns. 
#For instance, you can count the number of people with income below or above 50k by education level. 
#This operation is called a crosstab.
df.crosstab('age', 'income').sort("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|      17.0|  595|   0|
|      18.0|  862|   0|
|      19.0| 1050|   3|
|      20.0| 1112|   1|
|      21.0| 1090|   6|
|      22.0| 1161|  17|
|      23.0| 1307|  22|
|      24.0| 1162|  44|
|      25.0| 1119|  76|
|      26.0| 1068|  85|
|      27.0| 1117| 115|
|      28.0| 1101| 179|
|      29.0| 1025| 198|
|      30.0| 1031| 247|
|      31.0| 1050| 275|
|      32.0|  957| 296|
|      33.0| 1045| 290|
|      34.0|  949| 354|
|      35.0|  997| 340|
|      36.0|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [25]:
'''
There are two intuitive API to drop columns:

drop(): Drop a column
dropna(): Drop NA's
Below you drop the column education_num

'''
df.drop('education_num').columns
df.filter(df.age > 40).count()
df.groupby(['marital-status']).agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



### Data Preprocessing

Data processing is a critical step in machine learning. After you remove garbage data, you get some important insights. For instance, you know that age is not a linear function with the income. When people are young, their income is usually lower than mid-age. After retirement, a household uses their saving, meaning a decrease in income. To capture this pattern, you can add a square to the age feature

Add age square

To add a new feature, you need to:

Select the column
Apply the transformation and add it to the DataFrame

In [26]:
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



In [29]:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'educational-num', 'marital-status',
           'occupation', 'relationship', 'race', 'gender', 'capital-gain', 'capital-loss',
           'hours-per-week', 'native-country', 'income']
df = df.select(COLUMNS)
df.first()

Row(age=25.0, age_square=625.0, workclass='Private', fnlwgt=226802.0, education='11th', educational-num=7.0, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=40.0, native-country='United-States', income='<=50K')

Exclude Holand-Netherlands

When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to an error during the cross-validation.

Let's check the origin of the household

In [36]:
df.filter(df['native-country'] == 'Holand-Netherlands').count()
df.groupby('native-country').agg({'native-country': 'count'}).sort(asc("count(native-country)")).show()

+--------------------+---------------------+
|      native-country|count(native-country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|Outlying-US(Guam-...|                   23|
|          Yugoslavia|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|              Greece|                   49|
|           Nicaragua|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [38]:
df_remove = df.filter(df['native-country'] != 'Holand-Netherlands')

### Building a data preprocessing Pipeline

Similar to scikit-learn, Pyspark has a pipeline API. A pipeline is very convenient to maintain the structure of the data. You push the data into the pipeline. Inside the pipeline, various operations are done, the output is used to feed the algorithm.

For instance, one universal transformation in machine learning consists of converting a string to one hot encoder, i.e., one column by a group. One hot encoder is usually a matrix full of zeroes.

The steps to transform the data are very similar to scikit-learn. You need to:

Index the string to numeric

Create the one hot encoder

Transform the data

Two APIs do the job: StringIndexer, OneHotEncoder

First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.

In [44]:
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
#First of all, you select the string column to index. The inputCol is the name of the column in the dataset. 
#outputCol is the new name given to the transformed column.
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
#Fit the data and transform it
model = stringIndexer.fit(df)
indexed = model.transform(df)
#Create the news columns based on the group. For instance, if there are 10 groups in the feature, 
#the new matrix will have 10 columns, one for each group.
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(10)

+----+----------+----------------+--------+------------+---------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
| age|age_square|       workclass|  fnlwgt|   education|educational-num|    marital-status|       occupation| relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|workclass_encoded|workclass_vec|
+----+----------+----------------+--------+------------+---------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
|25.0|     625.0|         Private|226802.0|        11th|            7.0|     Never-married|Machine-op-inspct|    Own-child|Black|  Male|         0.0|         0.0|          40.0| United-States| <=50K|              0.0|(9,[0],[1.0])|
|38.0|    1444.0|         Private| 89814.0|     HS-grad|            9.0|

### Build the pipeline
You will build a pipeline to convert all the precise features and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.

Encode the categorical data
Index the label feature
Add continuous variable
Assemble the steps.
Each step is stored in a list named stages. This list will tell the VectorAssembler what operation to perform inside the pipeline.

In [66]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#This step is exaclty the same as the above example, except that you loop over all the categorical features.
CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(dropLast=False, inputCol= str(stringIndexer.getOutputCol()),
                                     outputCol= str(categoricalCol + "classVec"))
    stages += [stringIndexer, encoder]

# Convert label into label indices using the StringIndexer
income_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
stages += [income_stringIdx]
#The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns.
#The code below popluate the list with encoded categorical features and the continuous features.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
#Finally, you pass all the steps in the VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [71]:
stages

[StringIndexer_4d2fa2592e1fc0948b18,
 OneHotEncoder_431d8bf57def74b4815f,
 StringIndexer_4164b79116559c5ba62b,
 OneHotEncoder_418e9c323747940ae1a6,
 StringIndexer_48ecb3d82fce4b3b8b05,
 OneHotEncoder_4a3abf4aea05a2e31cc6,
 StringIndexer_445aa3a92794c9b22b7a,
 OneHotEncoder_40cb8018cf1ecc237c9f,
 StringIndexer_4032922e0b459e691e56,
 OneHotEncoder_49bea3884f914ac44d71,
 StringIndexer_439aa32ee3d862348e1b,
 OneHotEncoder_4d6f8978799bbb44fc72,
 StringIndexer_4faf812980c7d31a830c,
 OneHotEncoder_4d39a45f71db3d501d7a,
 StringIndexer_490194f68c0a9851c1a3,
 OneHotEncoder_48ff84a8d360530ca9de,
 StringIndexer_403ea03214d5176d73c7,
 VectorAssembler_45e3918b342d34cb6816]

In [72]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)
model.take(1)

[Row(age=25.0, age_square=625.0, workclass='Private', fnlwgt=226802.0, education='11th', educational-num=7.0, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=40.0, native-country='United-States', income='<=50K', workclassIndex=0.0, workclassclassVec=SparseVector(9, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(16, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(7, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(15, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(6, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(5, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(2, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(41, {0: 1.0}), newincome=0.0, features=SparseVector(107, {0: 1.0, 14: 1.0, 26: 1.0, 38: 1.0, 49: 1.0, 54: 1.0, 58: 1.0, 60: 1.0, 101: 25.0, 102: 2

In [73]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newincome"], DenseVector(x["features"])))
df_train = sqlContext.createDataFrame(input_data, ["income", "features"])
df_train.show()

+------+--------------------+
|income|            features|
+------+--------------------+
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[0.0,0.0,1.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,1.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,1.0,...|
|   1.0|[0.0,1.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,1.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[0.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
|   1.0|[1.0,0.0,0.0,0.0,...|
+------+--------------------+
only showing top 20 rows



In [76]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
train_data.groupby('income').agg({'income': 'count'}).show()
test_data.groupby('income').agg({'income': 'count'}).show()
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="income",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|        29675|
|   1.0|         9300|
+------+-------------+

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7479|
|   1.0|         2387|
+------+-------------+

Coefficients: [-0.09011608130644566,-0.1384380559306834,-0.03896303543531769,-0.1824457748448405,-0.11893792188013645,0.18847080499878685,0.1839760119625868,-0.24212542028408682,-0.2943767644727913,-0.19865062889469637,-0.07602389181616244,0.21440405239433685,0.3825828010868464,-0.01510053172331298,-0.29743026128624805,-0.022118183258364482,-0.31327611253729226,-0.3943831805361721,0.5329648681065523,-0.3451003029762354,-0.2100999415985072,0.5880134963170344,-0.3270540179267765,-0.37208279515977744,-0.32857180162206834,0.32803921613790427,-0.35597749175340104,-0.20951536020969735,-0.21172515919880228,-0.14186610091919277,-0.13546494888365676,0.12502584341953393,0.2038127961969423,-0.07783296791130631,0.30619834712

In [78]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)
predictions.printSchema()
selected = predictions.select("income", "prediction", "probability")
selected.show(50)
cm = predictions.select("income", "prediction")
cm.groupby('income').agg({'income': 'count'}).show()
cm.groupby('prediction').agg({'prediction': 'count'}).show()
cm.filter(cm.income == cm.prediction).count() / cm.count()	

root
 |-- income: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)

+------+----------+--------------------+
|income|prediction|         probability|
+------+----------+--------------------+
|   0.0|       0.0|[0.92734536207514...|
|   0.0|       0.0|[0.93874864367220...|
|   0.0|       0.0|[0.93510980388595...|
|   0.0|       0.0|[0.91743304613848...|
|   0.0|       0.0|[0.61977331542055...|
|   0.0|       0.0|[0.64604312338942...|
|   0.0|       0.0|[0.65246551845643...|
|   0.0|       0.0|[0.72446047629837...|
|   0.0|       0.0|[0.91469778328490...|
|   0.0|       0.0|[0.84471647810027...|
|   0.0|       0.0|[0.83272583367082...|
|   0.0|       0.0|[0.83454419741497...|
|   0.0|       0.0|[0.52944496204343...|
|   0.0|       0.0|[0.63524895812302...|
|   0.0|       0.0|[0.63030940685612...|
|   0.0|       0.0|[0.56876018042796...|
|   0.0

0.82090006081492

In [80]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("income", "prediction")
    acc = cm.filter(cm.income == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)
#Model accuracy: 82.376%

Model accuracy: 82.090%


In [None]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(cm))
print(evaluator.getMetricName())
print(evaluator.evaluate(predictions))

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())
from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)
accuracy_m(model = cvModel)