# ML package of PySpark
* This is based on dataframes
* MlLib will be obsolete soon


In [2]:
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("Favio").getOrCreate()
sc = spark.sparkContext

## Predict chances of infant survival with ML

### Load the data

First, we load the data.

In [3]:
from pyspark.sql.types import *

labels = [
    ('INFANT_ALIVE_AT_REPORT', IntegerType()),
    ('BIRTH_PLACE', StringType()),
    ('MOTHER_AGE_YEARS', IntegerType()),
    ('FATHER_COMBINED_AGE', IntegerType()),
    ('CIG_BEFORE', IntegerType()),
    ('CIG_1_TRI', IntegerType()),
    ('CIG_2_TRI', IntegerType()),
    ('CIG_3_TRI', IntegerType()),
    ('MOTHER_HEIGHT_IN', IntegerType()),
    ('MOTHER_PRE_WEIGHT', IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', IntegerType()),
    ('MOTHER_WEIGHT_GAIN', IntegerType()),
    ('DIABETES_PRE', IntegerType()),
    ('DIABETES_GEST', IntegerType()),
    ('HYP_TENS_PRE', IntegerType()),
    ('HYP_TENS_GEST', IntegerType()),
    ('PREV_BIRTH_PRETERM', IntegerType())
]

schema = StructType([
    StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv('births_transformed.csv', 
                        header=True, 
                        schema=schema)

In [6]:
births.select('BIRTH_PLACE').show()

+-----------+
|BIRTH_PLACE|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 20 rows



### Create transformers

In [7]:
from pyspark.ml.feature import *

births = births \
    .withColumn(       'BIRTH_PLACE_INT', 
                births['BIRTH_PLACE'] \
                    .cast(IntegerType()))

In [8]:
births.select('BIRTH_PLACE_INT').show()

+---------------+
|BIRTH_PLACE_INT|
+---------------+
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
|              1|
+---------------+
only showing top 20 rows



Having done this, we can now create our first `Transformer`.

In [9]:
encoder = OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

Let's now create a single column with all the features collated together. 

In [10]:
featuresCreator = VectorAssembler(
    inputCols=[
        col[0] 
        for col 
        in labels[2:]] + \
    [encoder.getOutputCol()], 
    outputCol='features'
)

### Create an estimator

In this example we will (once again) us the Logistic Regression model.

In [11]:
from pyspark.ml.classification import LogisticRegression

Once loaded, let's create the model.

In [12]:
logistic = LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')

### Create a pipeline

All that is left now is to creat a `Pipeline` and fit the model. First, let's load the `Pipeline` from the package.

In [13]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
        encoder, 
        featuresCreator, 
        logistic
    ])

### Fit the model

Conventiently, `DataFrame` API has the `.randomSplit(...)` method.

In [14]:
births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

Now run our `pipeline` and estimate our model.

In [15]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

Here's what the `test_model` looks like.

In [16]:
test_model.toPandas().head()

Unnamed: 0,INFANT_ALIVE_AT_REPORT,BIRTH_PLACE,MOTHER_AGE_YEARS,FATHER_COMBINED_AGE,CIG_BEFORE,CIG_1_TRI,CIG_2_TRI,CIG_3_TRI,MOTHER_HEIGHT_IN,MOTHER_PRE_WEIGHT,...,DIABETES_GEST,HYP_TENS_PRE,HYP_TENS_GEST,PREV_BIRTH_PRETERM,BIRTH_PLACE_INT,BIRTH_PLACE_VEC,features,rawPrediction,probability,prediction
0,0,1,13,99,0,0,0,0,66,133,...,0,0,0,0,1,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(13.0, 99.0, 0.0, 0.0, 0.0, 0.0, 66.0, 133.0, ...","[1.05451134541, -1.05451134541]","[0.74164025989, 0.25835974011]",0.0
1,0,1,14,99,0,0,0,0,63,93,...,0,0,0,0,1,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(14.0, 99.0, 0.0, 0.0, 0.0, 0.0, 63.0, 93.0, 1...","[0.923317808757, -0.923317808757]","[0.715717649505, 0.284282350495]",0.0
2,0,1,14,99,0,0,0,0,63,106,...,0,0,0,0,1,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(14.0, 99.0, 0.0, 0.0, 0.0, 0.0, 63.0, 106.0, ...","[0.715577750716, -0.715577750716]","[0.671632465328, 0.328367534672]",0.0
3,0,1,14,99,0,0,0,0,63,127,...,0,0,0,0,1,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(14.0, 99.0, 0.0, 0.0, 0.0, 0.0, 63.0, 127.0, ...","[1.08290936252, -1.08290936252]","[0.747044158244, 0.252955841756]",0.0
4,0,1,14,99,0,0,0,0,67,280,...,0,0,1,0,1,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)","(14.0, 99.0, 0.0, 0.0, 0.0, 0.0, 67.0, 280.0, ...","[0.178973737815, -0.178973737815]","[0.544624382138, 0.455375617862]",0.0


### Model performance

Obviously, we would like to now test how well our model did.

In [17]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

0.7405439747919526
0.7152348988715325


### Saving the model

PySpark allows you to save the `Pipeline` definition for later use.

In [18]:
pipelinePath = 'infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

So, you can load it up later and use straight away to `.fit(...)` and predict.

In [19]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train)\
    .transform(births_test)\
    .take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0545, -1.0545]), probability=DenseVector([0.7416, 0.2584]), prediction=0.0)]

You can also save the whole model

In [21]:
from pyspark.ml import PipelineModel

modelPath = 'infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)

## Parameter hyper-tuning

### Grid search

Load the `.tuning` part of the package.

In [22]:
from pyspark.ml.tuning import *

Next let's specify our model and the list of parameters we want to loop through.

In [23]:
logistic = LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT')

grid = ParamGridBuilder() \
    .addGrid(logistic.maxIter,  
             [2, 10, 50]) \
    .addGrid(logistic.regParam, 
             [0.01, 0.05, 0.3]) \
    .build()

Next, we need some way of comparing the models.

In [24]:
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

Create the logic that will do the validation work for us.

In [25]:
cv = CrossValidator(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)

Create a purely transforming `Pipeline`.

In [26]:
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)

Having done this, we are ready to find the optimal combination of parameters for our model.

In [27]:
cvModel = cv.fit(data_transformer.transform(births_train))

The `cvModel` will return the best model estimated. We can now use it to see if it performed better than our previous model.

In [28]:
data_train = data_transformer \
    .transform(births_test)
results = cvModel.transform(data_train)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

0.7404526641072416
0.7157767684747429


What parameters has the best model? The answer is a little bit convoluted but here's how you can extract it.

In [29]:
results = [
    (
        [
            {key.name: paramValue} 
            for key, paramValue 
            in zip(
                params.keys(), 
                params.values())
        ], metric
    ) 
    for params, metric 
    in zip(
        cvModel.getEstimatorParamMaps(), 
        cvModel.avgMetrics
    )
]

sorted(results, 
       key=lambda el: el[1], 
       reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.7384017020256495)

### Train-Validation splitting

Use the `ChiSqSelector` to select only top 5 features, thus limiting the complexity of our model.

In [None]:
selector = ChiSqSelector(
    numTopFeatures=5, 
    featuresCol=featuresCreator.getOutputCol(), 
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

logistic = LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures'
)

pipeline = Pipeline(stages=[encoder,featuresCreator,selector])
data_transformer = pipeline.fit(births_train)

The `TrainValidationSplit` object gets created in the same fashion as the `CrossValidator` model.

In [None]:
tvs = TrainValidationSplit(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)

As before, we fit our data to the model, and calculate the results.

In [None]:
tvsModel = tvs.fit(
    data_transformer \
        .transform(births_train)
)

data_train = data_transformer \
    .transform(births_test)
results = tvsModel.transform(data_train)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

## Other features of PySpark ML in action

### Feature extraction

#### NLP related feature extractors

Simple dataset.

In [30]:
text_data = spark.createDataFrame([
    ['''Machine learning can be applied to a wide variety 
        of data types, such as vectors, text, images, and 
        structured data. This API adopts the DataFrame from 
        Spark SQL in order to support a variety of data types.'''],
    ['''DataFrame supports many basic and structured types; 
        see the Spark SQL datatype reference for a list of 
        supported types. In addition to the types listed in 
        the Spark SQL guide, DataFrame can use ML Vector types.'''],
    ['''A DataFrame can be created either implicitly or 
        explicitly from a regular RDD. See the code examples 
        below and the Spark SQL programming guide for examples.'''],
    ['''Columns in a DataFrame are named. The code examples 
        below use names such as "text," "features," and "label."''']
], ['input'])

First, we need to tokenize this text.

In [31]:
tokenizer = RegexTokenizer(
    inputCol='input', 
    outputCol='input_arr', 
    pattern='\s+|[,.\"]')

The output of the tokenizer looks similar to this.

In [32]:
tok = tokenizer \
    .transform(text_data) \
    .select('input_arr') 

tok.take(1)

[Row(input_arr=['machine', 'learning', 'can', 'be', 'applied', 'to', 'a', 'wide', 'variety', 'of', 'data', 'types', 'such', 'as', 'vectors', 'text', 'images', 'and', 'structured', 'data', 'this', 'api', 'adopts', 'the', 'dataframe', 'from', 'spark', 'sql', 'in', 'order', 'to', 'support', 'a', 'variety', 'of', 'data', 'types'])]

Use the `StopWordsRemover(...)`.

In [33]:
stopwords = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), 
    outputCol='input_stop')

The output of the method looks as follows

In [34]:
stopwords.transform(tok).select('input_stop').take(1)

[Row(input_stop=['machine', 'learning', 'applied', 'wide', 'variety', 'data', 'types', 'vectors', 'text', 'images', 'structured', 'data', 'api', 'adopts', 'dataframe', 'spark', 'sql', 'order', 'support', 'variety', 'data', 'types'])]

Build `NGram` model and the `Pipeline`.

In [35]:
ngram = NGram(n=2, 
    inputCol=stopwords.getOutputCol(), 
    outputCol="nGrams")

pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

Now that we have the `pipeline` we follow in the very similar fashion as before.

In [36]:
data_ngram = pipeline \
    .fit(text_data) \
    .transform(text_data)
    
data_ngram.select('nGrams').take(1)

[Row(nGrams=['machine learning', 'learning applied', 'applied wide', 'wide variety', 'variety data', 'data types', 'types vectors', 'vectors text', 'text images', 'images structured', 'structured data', 'data api', 'api adopts', 'adopts dataframe', 'dataframe spark', 'spark sql', 'sql order', 'order support', 'support variety', 'variety data', 'data types'])]

That's it. We got our n-grams and we can then use them in further NLP processing.

#### Discretize continuous variables

It is sometimes useful to *band* the values into discrete buckets.

In [None]:
import numpy as np

x = np.arange(0, 100)
x = x / 100.0 * np.pi * 4
y = x * np.sin(x / 1.764) + 20.1234

schema = StructType([
    StructField('continuous_var', 
                    DoubleType(), 
                    False
   )
])

data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)

In [None]:
data.show()

Use the `QuantileDiscretizer` model to split our continuous variable into 5 buckets (see the `numBuckets` parameter).

In [None]:
discretizer = QuantileDiscretizer(
    numBuckets=5, 
    inputCol='continuous_var', 
    outputCol='discretized')

Let's see what we got.

In [None]:
data_discretized = discretizer.fit(data).transform(data)

data_discretized \
    .groupby('discretized')\
    .mean('continuous_var')\
    .sort('discretized')\
    .collect()

#### Standardizing continuous variables

Create a vector representation of our continuous variable (as it is only a single float)


In [None]:
vectorizer = VectorAssembler(
    inputCols=['continuous_var'], 
    outputCol= 'continuous_vec')

Build a `normalizer` and a `pipeline`.

In [None]:
normalizer = StandardScaler(
    inputCol=vectorizer.getOutputCol(), 
    outputCol='normalized', 
    withMean=True,
    withStd=True
)

pipeline = Pipeline(stages=[vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)

In [None]:
data_standardized.show()

### Classification

Let's test how well would one tree do, then.

In [37]:
from pyspark.ml.classification import DecisionTreeClassifier

In [38]:
classifier = DecisionTreeClassifier(
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[
        encoder,
        featuresCreator, 
        classifier]
)

model = pipeline.fit(births_train)
test = model.transform(births_test)

evaluator = BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test, 
     {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, 
     {evaluator.metricName: "areaUnderPR"}))

0.7581052905334167
0.7790296652256041


### Regression

In this section we will try to predict the `MOTHER_WEIGHT_GAIN`.

In [39]:
features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',
            'MOTHER_PRE_WEIGHT','DIABETES_PRE',
            'DIABETES_GEST','HYP_TENS_PRE', 
            'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
            'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI', 
            'CIG_3_TRI'
           ]

First, we will collate all the features together and use the `ChiSqSelector` to select only the top 6 most important features.

In [40]:
featuresCreator = VectorAssembler(
    inputCols=[col for col in features[1:]], 
    outputCol='features'
)

selector = ChiSqSelector(
    numTopFeatures=6, 
    outputCol="selectedFeatures", 
    labelCol='MOTHER_WEIGHT_GAIN'
)

In order to predict the weight gain we will use the gradient boosted trees regressor.

In [41]:
from pyspark.ml.regression import GBTRegressor


regressor = GBTRegressor(
    maxIter=15, 
    maxDepth=3,
    labelCol='MOTHER_WEIGHT_GAIN')

Finally, again, we put it all together into a `Pipeline`.

In [42]:
pipeline = Pipeline(stages=[
        featuresCreator, 
        selector,
        regressor])

weightGain = pipeline.fit(births_train)

Having created the `weightGain` model, let's see if it performs well on our testing data.

In [43]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    predictionCol="prediction", 
    labelCol='MOTHER_WEIGHT_GAIN')

print(evaluator.evaluate(
     weightGain.transform(births_test), 
    {evaluator.metricName: 'r2'}))

0.4906905234047193
