In [1]:
# running from docker! ## docker run  --rm  -p 8888:8888  -v <YOUR WORKING DIR>:/home/jovyan/work/  jupyter/pyspark-notebook

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import types


In [3]:
sc = SparkContext()

In [4]:
# resilient distributed data
nums = sc.parallelize([1, 2, 3, 4])

In [5]:
nums.take(1)

[1]

In [6]:
squared = nums.map(lambda x: x**2).collect()
for num in squared:
    print(num)

1
4
9
16


In [7]:
### SQLContext

In [8]:
from pyspark.sql import Row, SQLContext

In [9]:
sqlcontext = SQLContext(sc)



In [10]:
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

In [11]:
rdd = sc.parallelize(list_p)

In [12]:
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [13]:
df_ppl = sqlcontext.createDataFrame(ppl)

In [14]:
df_ppl.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [15]:
### Machine Learning Example with PySpark

In [16]:
import pandas as pd

In [17]:
url = "https://raw.githubusercontent.com/sadhana1002/PredictingSalaryClass-Classification/master/adult.csv"

In [18]:
df = sqlcontext.createDataFrame(pd.read_csv(url, names=['Age','workclass',
                                                         'fnlwgt','education',
                                                         'education_num',
                                                         'marital',
                                                         'occupation',
                                                         'relationship','race',
                                                         'sex','capital_gain',
                                                         'capital_loss',
                                                         'hours_week',
                                                         'native_country','label']
                                           ))

In [19]:
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [20]:
df.show(5, truncate=False)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|Age|workclass        |fnlwgt|education |education_num|marital            |occupation        |relationship  |race  |sex    |capital_gain|capital_loss|hours_week|native_country|label |
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40        | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13        | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9            | Divorced          | Hand

In [21]:
from pyspark.sql.types import *

In [22]:
def convert_column(df, names, new_type):
    for name in names:
        df = df.withColumn(name, df[name].cast(new_type))
    return df

In [23]:
CONTI_FEATURES = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']

In [24]:
df = convert_column(df, CONTI_FEATURES, FloatType())

In [25]:
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



### Select Columns

In [26]:
df.select('age', 'fnlwgt').show(5)

+----+--------+
| age|  fnlwgt|
+----+--------+
|39.0| 77516.0|
|50.0| 83311.0|
|38.0|215646.0|
|53.0|234721.0|
|28.0|338409.0|
+----+--------+
only showing top 5 rows



In [27]:
df.groupBy('education').count().sort('count', ascending=True).show()

+-------------+-----+
|    education|count|
+-------------+-----+
|    Preschool|   51|
|      1st-4th|  168|
|      5th-6th|  333|
|    Doctorate|  413|
|         12th|  433|
|          9th|  514|
|  Prof-school|  576|
|      7th-8th|  646|
|         10th|  933|
|   Assoc-acdm| 1067|
|         11th| 1175|
|    Assoc-voc| 1382|
|      Masters| 1723|
|    Bachelors| 5355|
| Some-college| 7291|
|      HS-grad|10501|
+-------------+-----+



### Describe the data

In [28]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+---------+-----------------+------------+-------------------+-------+------------------+-----------------+------------------+--------------+------+
|summary|               age|   workclass|            fnlwgt|    education|    education_num|  marital|       occupation|relationship|               race|    sex|      capital_gain|     capital_loss|        hours_week|native_country| label|
+-------+------------------+------------+------------------+-------------+-----------------+---------+-----------------+------------+-------------------+-------+------------------+-----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|    32561|            32561|       32561|              32561|  32561|             32561|            32561|             32561|         32561| 32561|
|   mean| 38.58164675532078|        null

In [29]:
df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840329|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



### Crosstab computation

In [30]:
df.crosstab('age', 'label').sort('age_label').show()

+---------+------+-----+
|age_label| <=50K| >50K|
+---------+------+-----+
|     17.0|   395|    0|
|     18.0|   550|    0|
|     19.0|   710|    2|
|     20.0|   753|    0|
|     21.0|   717|    3|
|     22.0|   752|   13|
|     23.0|   865|   12|
|     24.0|   767|   31|
|     25.0|   788|   53|
|     26.0|   722|   63|
|     27.0|   754|   81|
|     28.0|   748|  119|
|     29.0|   679|  134|
|     30.0|   690|  171|
|     31.0|   705|  183|
|     32.0|   639|  189|
|     33.0|   684|  191|
|     34.0|   643|  243|
|     35.0|   659|  217|
|     36.0|   635|  263|
+---------+------+-----+
only showing top 20 rows



### Dropping Columns
* drop()
* dropna()

In [31]:
df.drop('educational_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

### Filtering Data

In [32]:
df.filter(df.age > 40).count()

13443

### Descriptive statistics by group

In [33]:
df.groupBy('marital').agg({'capital_gain': 'mean'}).show()

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|             Widowed| 571.0715005035247|
| Married-spouse-a...| 653.9832535885167|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
|       Never-married|376.58831788823363|
|           Separated| 535.5687804878049|
+--------------------+------------------+



# Step 2: Data Preprocessing

In [34]:
from pyspark.sql.functions import  *

In [35]:
age_square = df.select(col('age')**2)  # select column

In [36]:
df = df.withColumn('age_square', col('age')**2)  # apply the transformation

In [37]:
df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [38]:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']

In [39]:
df = df.select(COLUMNS)

In [40]:
df.first()

Row(age=39.0, age_square=1521.0, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_week=40.0, native_country=' United-States', label=' <=50K')

#### Exclude Holand-Netherlands
When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to an error during the cross-validation.

In [None]:
df.filter(df.native_country == 'Holand-Netherlands')
df.groupBy('native_country').agg({'native_country':'count'}).sort(asc('count(native_country)')).show()

### The feature native_country has only one household coming from Netherland. We can exclude it.

In [None]:
df_remove = df.filter(df.native_country != 'Holand-Netherlands')

# Step 3: Building a data processing pipeline

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [None]:
string_indexer = StringIndexer(inputCol='workclass', outputCol='workclass_encoded')

In [None]:
model = string_indexer.fit(df)
indexed = model.transform(df)

In [None]:
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol='workclass_vec').fit(indexed)
encoded = encoder.transform(indexed)

In [None]:
encoded.show(2)

### Building Pipeline
* Encode the categorical data
* Index the label feature
* Add continuous variable
* Assemble the steps.

In [None]:
from pyspark.ml import Pipeline

In [None]:
CAT_FEATS = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']

In [None]:
stages = []  # stages in the pipeline
for cat_col in CAT_FEATS:
    string_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col+'_index')
    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[cat_col+ '_classVec'])
    stages += [string_indexer, encoder]

In [None]:
# indexing the label feature Spark, like many other libraries, does not accept string values for the label. You convert the label feature with StringIndexer and add it to the list stages

In [None]:
label_stringIdx = StringIndexer(inputCol='label', outputCol='newlabel')
stages += [label_stringIdx]

* Add continuous variable
The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. The code below populate the list with encoded categorical features and continuous features.

In [None]:
assembler_inputs = [c + '_classVec' for c in CAT_FEATS] + CONTI_FEATURES

### Assembling Steps

In [None]:
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')
stages += [assembler]

In [None]:
# creating pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df_remove)
model = pipeline_model.transform(df_remove)

If you check the new dataset, you can see that it contains all the features, transformed and not transformed. You are only interested by the newlabel and features. The features includes all the transformed features and the continuous variables.

In [None]:
model.take(1)

# Step 4: Building the classifier logistic

In [None]:
# To make the computation faster, we convert features to DenseVector type.

In [None]:
from pyspark.ml.linalg import DenseVector

In [66]:
input_data = model.rdd.map(lambda x: (x['newlabel'], DenseVector(x['features'])))

In [68]:
df_train = sqlcontext.createDataFrame(input_data, ['label', 'features'])

In [69]:
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



### create a train/test set

In [70]:
train_data, test_data = df_train.randomSplit([.8, .2], seed=4)

In [71]:
train_data.groupby('label').agg({'label':'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19741|
|  1.0|        6243|
+-----+------------+



In [72]:
test_data.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4979|
|  1.0|        1598|
+-----+------------+



In [73]:
from pyspark.ml.classification import  LogisticRegression

In [74]:
lr = LogisticRegression(labelCol='label',
                        featuresCol='features',
                        maxIter=10,
                        regParam=0.3)

In [75]:
linear_model = lr.fit(train_data)

In [76]:
print(f'Coefficient: {linear_model.coefficients}')
print(f'Intercept: {linear_model.intercept}')

Coefficient: [-0.018527539032472302,-0.046978784559105015,0.009533073672760647,-0.14904348802451997,-0.02935561592350226,0.27957285225173245,0.22449796200503042,-0.38458378897004614,-0.1416632856030793,-0.039969830466465406,0.2334037563111941,0.38729815058870176,0.025517395135263,-0.21740619350898982,-0.01758856539794037,-0.20892925450343625,-0.2740810875202967,0.5104209617792288,-0.22884193338354822,-0.17469167298278607,0.5501522684917342,-0.19845152589583112,-0.2754873155240856,0.3998030150858301,-0.3079464659396153,-0.16817047247087247,-0.19551220303537245,-0.14328304856192953,-0.14824102371341177,0.23393171273585417,-0.027551978991028757,0.34492719127841526,-0.09838902633282087,0.07498341463182165,-0.2549886703243672,-0.1689028592292304,-0.1496728284404243,-0.08221534921174388,-0.24390756156886564,-0.2908029536171595,0.14332507879413897,0.12151251550918317,-0.2417923128431854,0.3289329589892319,-0.18184885745489876,-0.28043425585154497,-0.21600349170532446,0.45638130373999775,0.084

# Step 5: Train and evaluate the model

In [77]:
predictions = linear_model.transform(test_data)

In [78]:
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [79]:
selected = predictions.select('label', 'prediction', 'probability')

In [80]:
selected.show()

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.94206706582691...|
|  0.0|       0.0|[0.93927981114910...|
|  0.0|       0.0|[0.56961153725128...|
|  0.0|       0.0|[0.83368415976937...|
|  0.0|       0.0|[0.78676808724668...|
|  0.0|       0.0|[0.58405301588100...|
|  0.0|       0.0|[0.83848913212908...|
|  0.0|       0.0|[0.83887217480987...|
|  0.0|       0.0|[0.67028011028082...|
|  0.0|       0.0|[0.77262513531413...|
|  0.0|       1.0|[0.45457580898441...|
|  0.0|       1.0|[0.40876712325614...|
|  0.0|       0.0|[0.88152150240174...|
|  0.0|       0.0|[0.87355602383092...|
|  0.0|       0.0|[0.85354884543282...|
|  0.0|       0.0|[0.85480041813625...|
|  0.0|       0.0|[0.87773417291387...|
|  0.0|       0.0|[0.66418026046810...|
|  0.0|       0.0|[0.78401635272341...|
|  0.0|       0.0|[0.63485836148747...|
+-----+----------+--------------------+
only showing top 20 rows



## Evaluate the model
currently spark does not have api to compute accuracy

In [81]:
cm = predictions.select('label', 'prediction')

In [82]:
cm.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4979|
|  1.0|        1598|
+-----+------------+



In [83]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5856|
|       1.0|              721|
+----------+-----------------+



In [85]:
cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8253002888855101

In [92]:
def accuracy_m(model):
    predictions = model.transform(test_data)
    cm = predictions.select('label', 'prediction')
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print(f'Model accuracy {acc * 100:.3f}')

In [93]:
accuracy_m(linear_model)

Model accuracy 82.530


### ROC metrics
The module BinaryClassificationEvaluator includes the ROC measures. The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 - specificity.

In [94]:
from pyspark.ml.evaluation import  BinaryClassificationEvaluator

In [95]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction')

In [96]:
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8958058765463253
areaUnderROC


# Step 6: Tune the hyperparameter

In [97]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [98]:
param_grid = (ParamGridBuilder().addGrid(lr.regParam,
                                         [0.01, 0.5]).build())

In [99]:
from time import perf_counter

In [100]:
start_time = perf_counter()

In [101]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid,
                    evaluator=evaluator, numFolds=5)

In [102]:
cv_model = cv.fit(train_data)

In [103]:
end_time = perf_counter()

In [104]:
elapsed_time = end_time - start_time

In [105]:
print(f'Time to train model {elapsed_time:.3f} seconds')

Time to train model 73.550 seconds


In [106]:
accuracy_m(model=cv_model)

Model accuracy 85.160


In [107]:
best_model = cv_model.bestModel

In [108]:
best_model.extractParamMap()

{Param(parent='LogisticRegression_024bf3982ba4', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_024bf3982ba4', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_024bf3982ba4', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_024bf3982ba4', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_024bf3982ba4', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_024bf3982ba4', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_024bf3982ba4', name='maxBlockSizeInMB', doc='maximum memory in MB for s

In [None]:
# pd.read_csv(url, names=['Age','workclass',
#                                                          'fnlwgt','education',
#                                                          'education_num',
#                                                          'marital',
#                                                          'occupation',
#                                                          'relationship','race',
#                                                          'sex','capital_gain',
#                                                          'capital_loss',
#                                                          'hours_week',
#                                                          'native_country','label'])