# Classification and regression

This page covers algorithms for classification and regression. It also includes sections discussing specific classes of algorithms, such as linear methods, trees, and ensembles.

# Classification

## Logistic regression

Logistic regression is a popular method to predict a binary response. It is a special case of generalized linear models that predicts the probability of the outcome. For more background and more details about the implementation, refer to the documentation of logistic regression in spark.mllib.

The current implementation of logistic regression in spark.ml only supports binary classes. Support for multiclass classification will be added in the future.

When fitting a LogisticRegressionModel without an intercept on a dataset with a constant nonzero column, Spark MLlib outputs zero coefficients for the constant nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.

### Example

The following example shows how to train a logistic regression model with elastic net regularization. elasticNetParam corresponds to α and regParam corresponds to λ.

In [None]:
from __future__ import print_function

from pyspark.ml.classification import LogisticRegression
%matplotlib inline

# Load training data
training = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
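
The roles of elasticNetParam (α) and regParam (λ) can be illustrated with a small plain-Python sketch. It assumes the usual elastic net penalty λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²); the function name `elastic_net_penalty` and the sample weights are hypothetical and not part of the Spark API.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Return reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


# Hypothetical coefficient vector, with the same settings as the example above.
weights = [0.5, -1.0, 0.0]
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))
```

With elasticNetParam set to 1 the penalty reduces to a pure L1 (lasso) term, and with 0 it reduces to a pure L2 (ridge) term.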

The spark.ml implementation of logistic regression also supports extracting a summary of the model over the training set. Note that the predictions and metric which are stored as DataFrame in BinaryLogisticRegressionSummary are annotated @transient and hence only available on the driver.

Logistic regression model summary is not yet supported in Python, so we give a Scala example:

LogisticRegressionTrainingSummary provides a summary for a LogisticRegressionModel. Currently, only binary classification is supported and the summary must be explicitly cast to BinaryLogisticRegressionTrainingSummary. This will likely change when multiclass classification is supported.

Continuing the earlier example:


``` Scala
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}
import org.apache.spark.sql.functions.max

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier example
val trainingSummary = lrModel.summary

// Obtain the objective per iteration.
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(loss => println(loss))

// Obtain the metrics useful to judge performance on test data.
// We cast the summary to a BinaryLogisticRegressionSummary since the problem is a binary classification problem.
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
val roc = binarySummary.roc
roc.show()
println(binarySummary.areaUnderROC)

// Set the model threshold to maximize F-Measure
val fMeasure = binarySummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
```
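
The threshold selection at the end of the Scala example can be sketched in plain Python. This is only an illustration under stated assumptions: the scored data are made up, a row is predicted positive when its score is at least the threshold, and `f_measure_by_threshold` is a hypothetical helper, not the Spark summary API.

```python
def f_measure_by_threshold(scored_labels):
    """Return (threshold, F1) pairs, predicting positive when score >= threshold."""
    thresholds = sorted({score for score, _ in scored_labels}, reverse=True)
    results = []
    for t in thresholds:
        tp = sum(1 for s, y in scored_labels if s >= t and y == 1)
        fp = sum(1 for s, y in scored_labels if s >= t and y == 0)
        fn = sum(1 for s, y in scored_labels if s < t and y == 1)
        denom = 2 * tp + fp + fn
        # F1 = 2 * TP / (2 * TP + FP + FN); defined as 0 when there are no positives.
        results.append((t, 2.0 * tp / denom if denom else 0.0))
    return results


# Hypothetical (score, label) pairs.
scored = [(0.9, 1), (0.8, 1), (0.6, 0), (0.4, 1), (0.2, 0)]
best_threshold, best_f = max(f_measure_by_threshold(scored), key=lambda p: p[1])
print(best_threshold, best_f)
```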

## Decision tree classifier

Decision trees are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on decision trees.

### Example

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the Decision Tree algorithm can recognize.

More details on parameters can be found in the Python API documentation.

In [None]:
from __future__ import print_function

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)
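
The label indexing step can be sketched in plain Python. The sketch assumes the frequency-based ordering that StringIndexer documents (the most frequent label receives index 0); breaking ties by the label's string form is an assumption made here only to keep the sketch deterministic, and `fit_label_index` is a hypothetical name.

```python
from collections import Counter


def fit_label_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; ties broken by string form (an assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], str(label)))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical label column.
labels = ["b", "a", "b", "c", "b", "a"]
index = fit_label_index(labels)
print(index)
print([index[label] for label in labels])
```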

## Random forest classifier

Random forests are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on random forests.

### Example

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel) # summary only

## Gradient-boosted tree classifier

Gradient-boosted trees (GBTs) are a popular classification and regression method using ensembles of decision trees. More information about the spark.ml implementation can be found further in the section on GBTs.

### Example

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use two feature transformers to prepare the data; these help index categories for the label and categorical features, adding metadata to the DataFrame which the tree-based algorithms can recognize.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel) # summary only

## Multilayer perceptron classifier

Multilayer perceptron classifier (MLPC) is a classifier based on the feedforward artificial neural network. MLPC consists of multiple layers of nodes. Each layer is fully connected to the next layer in the network. Nodes in the input layer represent the input data. All other nodes map inputs to outputs by a linear combination of the inputs with the node's weights w and bias b and applying an activation function. This can be written in matrix form for MLPC with K + 1 layers as follows:

![](4.attach_files/multilayer_perceptron_formula.png)

Nodes in intermediate layers use the sigmoid (logistic) function:

![](4.attach_files/sigmoid_formula.png)

Nodes in the output layer use the softmax function:

![](4.attach_files/softmax_formula.png)

The number of nodes N in the output layer corresponds to the number of classes.

MLPC employs backpropagation for learning the model. We use the logistic loss function for optimization and L-BFGS as an optimization routine.

In [None]:
from __future__ import print_function

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline

# Load training data
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_multiclass_classification_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(train)
# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy: " + str(evaluator.evaluate(predictionAndLabels)))
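
The layer computations described in this section (an affine map followed by a sigmoid in intermediate layers and a softmax in the output layer) can be sketched in plain Python. The weights below are made up for a tiny 2-2-3 network, and the function names are hypothetical; this is not how Spark stores or evaluates the model.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def softmax(zs):
    # Subtract the maximum for numerical stability.
    m = max(zs)
    exps = [math.exp(z - m) for z in zs]
    total = sum(exps)
    return [e / total for e in exps]


def affine(weights, bias, x):
    """weights holds one row per output node."""
    return [sum(w * xi for w, xi in zip(row, x)) + b
            for row, b in zip(weights, bias)]


def forward(layers, x):
    """layers is a list of (weights, bias); sigmoid on hidden layers, softmax last."""
    for weights, bias in layers[:-1]:
        x = [sigmoid(z) for z in affine(weights, bias, x)]
    weights, bias = layers[-1]
    return softmax(affine(weights, bias, x))


# Hypothetical network: 2 inputs -> 2 hidden nodes -> 3 classes.
layers = [
    ([[0.5, -0.5], [1.0, 1.0]], [0.0, -1.0]),
    ([[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]], [0.0, 0.0, 0.0]),
]
probs = forward(layers, [1.0, 2.0])
predicted_class = probs.index(max(probs))
print(probs, predicted_class)
```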

## One-vs-Rest classifier (a.k.a One-vs-All)

OneVsRest is an example of a machine learning reduction for performing multiclass classification given a base classifier that can perform binary classification efficiently. It is also known as "One-vs-All".

OneVsRest is implemented as an Estimator. For the base classifier it takes instances of Classifier and creates a binary classification problem for each of the k classes. The classifier for class i is trained to predict whether the label is i or not, distinguishing class i from all other classes.

Predictions are made by evaluating each binary classifier; the index of the most confident classifier is output as the label.
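
The prediction rule can be sketched in a few lines of plain Python. The linear scorers below stand in for trained binary classifiers and are purely hypothetical, as are the names `one_vs_rest_predict` and `make_linear_scorer`.

```python
def one_vs_rest_predict(binary_scorers, x):
    """Return the index of the binary classifier with the highest confidence for x."""
    scores = [scorer(x) for scorer in binary_scorers]
    return max(range(len(scores)), key=lambda i: scores[i])


def make_linear_scorer(weights, bias):
    """Hypothetical stand-in for a trained binary classifier's confidence score."""
    return lambda x: sum(w * xi for w, xi in zip(weights, x)) + bias


# One scorer per class (k = 3).
scorers = [
    make_linear_scorer([1.0, 0.0], 0.0),
    make_linear_scorer([0.0, 1.0], 0.0),
    make_linear_scorer([-1.0, -1.0], 0.5),
]
print(one_vs_rest_predict(scorers, [2.0, 1.0]))
```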

### Example

The example below demonstrates how to load the Iris dataset, parse it as a DataFrame and perform multiclass classification using OneVsRest. The test error is calculated to measure the algorithm accuracy.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline

# load data file
inputData = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_multiclass_classification_data.txt")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error: " + str(1 - accuracy))

## Naive Bayes

Naive Bayes classifiers are a family of simple probabilistic classifiers based on applying Bayes' theorem with strong (naive) independence assumptions between the features. The spark.ml implementation currently supports both multinomial naive Bayes and Bernoulli naive Bayes. More information can be found in the section on Naive Bayes in MLlib.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
%matplotlib inline

# Load training data
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)
# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Accuracy: " + str(evaluator.evaluate(predictionAndLabels)))

# Regression

## Linear regression

The interface for working with linear regression models and model summaries is similar to the logistic regression case.

When fitting a LinearRegressionModel without an intercept on a dataset with a constant nonzero column using the "l-bfgs" solver, Spark MLlib outputs zero coefficients for constant nonzero columns. This behavior is the same as R glmnet but different from LIBSVM.

### Example

The following example demonstrates training an elastic net regularized linear regression model and printing its fitted coefficients and intercept.

In [None]:
from __future__ import print_function

from pyspark.ml.regression import LinearRegression
%matplotlib inline

# Load training data
training = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_linear_regression_data.txt")

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

## Generalized linear regression

Contrasted with linear regression where the output is assumed to follow a Gaussian distribution, generalized linear models (GLMs) are specifications of linear models where the response variable Yi follows some distribution from the exponential family of distributions. Spark's GeneralizedLinearRegression interface allows for flexible specification of GLMs which can be used for various types of prediction problems including linear regression, Poisson regression, logistic regression, and others. Currently in spark.ml only a subset of the exponential family distributions are supported and they are listed below.

NOTE: Spark currently only supports up to 4096 features through its GeneralizedLinearRegression interface, and will throw an exception if this constraint is exceeded. See the advanced section for more details. Still, for linear and logistic regression, models with an increased number of features can be trained using the LinearRegression and LogisticRegression estimators.

GLMs require exponential family distributions that can be written in their "canonical" or "natural" form, aka natural exponential family distributions. The form of a natural exponential family distribution is given as:

![](4.attach_files/natural_exponential_family_distribution.png)

where θ is the parameter of interest and τ is a dispersion parameter. In a GLM the response variable Yi is assumed to be drawn from a natural exponential family distribution:

![](4.attach_files/Yi.png)

where the parameter of interest θi is related to the expected value of the response variable μi by

![](4.attach_files/μi.png)

Here, A'(θi) is defined by the form of the distribution selected. GLMs also allow specification of a link function, which defines the relationship between the expected value of the response variable μi and the so-called linear predictor ηi:

![](4.attach_files/link_function.png)

Often, the link function is chosen such that A'=g^(-1), which yields a simplified relationship between the parameter of interest θ and the linear predictor η. In this case, the link function g(μ) is said to be the "canonical" link function.

![](4.attach_files/θi.png)

A GLM finds the regression coefficients β which maximize the likelihood function.

![](4.attach_files/likelihood_function.png)

where the parameter of interest θi is related to the regression coefficients β by

![](4.attach_files/θi2.png)

Spark's generalized linear regression interface also provides summary statistics for diagnosing the fit of GLM models, including residuals, p-values, deviances, the Akaike information criterion, and others.
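
The relationship between the linear predictor η and the expected response μ through a link function g can be sketched in plain Python. The three links shown (identity, log, logit) are common choices; the `LINKS` table and `predict_mean` function are hypothetical names for illustration and not part of the GeneralizedLinearRegression interface.

```python
import math

# Each entry is (link g, inverse link g^-1).
LINKS = {
    "identity": (lambda mu: mu, lambda eta: eta),
    "log": (lambda mu: math.log(mu), lambda eta: math.exp(eta)),
    "logit": (lambda mu: math.log(mu / (1.0 - mu)),
              lambda eta: 1.0 / (1.0 + math.exp(-eta))),
}


def predict_mean(features, coefficients, intercept, link):
    """Compute eta = intercept + x . beta, then mu = g^-1(eta)."""
    eta = intercept + sum(b * x for b, x in zip(coefficients, features))
    _, inverse_link = LINKS[link]
    return inverse_link(eta)


# Hypothetical coefficients and a single feature vector.
x, beta, b0 = [1.0, 2.0], [0.5, -0.25], 0.1
for name in ("identity", "log", "logit"):
    print(name, predict_mean(x, beta, b0, name))
```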

### Available families

![](4.attach_files/families.png)

### Example

The following example demonstrates training a GLM with a Gaussian response and identity link function and extracting model summary statistics.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml.regression import GeneralizedLinearRegression
%matplotlib inline

# Load training data
dataset = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_linear_regression_data.txt")

glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(dataset)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()

## Decision tree regression

Decision trees are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on decision trees.

### Example

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use a feature transformer to index categorical features, adding metadata to the DataFrame which the Decision Tree algorithm can recognize.

More details on parameters can be found in the Python API documentation.

In [None]:
from __future__ import print_function

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
%matplotlib inline

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline.
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)
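
The effect of maxCategories can be sketched in plain Python: a feature is treated as categorical only when its number of distinct values does not exceed the limit. The data and the helper name `categorical_feature_values` are hypothetical, and the sketch does not reproduce how VectorIndexer assigns category indices.

```python
def categorical_feature_values(rows, max_categories):
    """Return {feature index: sorted distinct values} for features with
    at most max_categories distinct values; other features stay continuous."""
    result = {}
    for j in range(len(rows[0])):
        distinct = sorted({row[j] for row in rows})
        if len(distinct) <= max_categories:
            result[j] = distinct
    return result


# Hypothetical dense feature vectors.
rows = [
    [0.0, 1.5, 10.0],
    [1.0, 2.5, 10.0],
    [0.0, 3.5, 20.0],
    [2.0, 4.5, 10.0],
    [1.0, 5.5, 30.0],
]
print(categorical_feature_values(rows, max_categories=4))
```

Here feature 1 has five distinct values, so with maxCategories=4 it would be left as continuous.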

## Random forest regression

Random forests are a popular family of classification and regression methods. More information about the spark.ml implementation can be found further in the section on random forests.

### Example

The following examples load a dataset in LibSVM format, split it into training and test sets, train on the first dataset, and then evaluate on the held-out test set. We use a feature transformer to index categorical features, adding metadata to the DataFrame which the tree-based algorithm can recognize.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
%matplotlib inline

# Load and parse the data file. Converting it to a DataFrame.
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline.
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel) # summary only

## Gradient-boosted tree regression

Gradient-boosted trees (GBTs) are a popular regression method using ensembles of decision trees. More information about the spark.ml implementation can be found further in the section on GBTs.

### Example

Note: For this example dataset, GBTRegressor actually only needs 1 iteration, but that will not be true in general.

Refer to the Python API docs for more details.

In [None]:
from __future__ import print_function

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
%matplotlib inline

# Load and parse the data file. Converting it to a DataFrame.
data = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel) # summary only

## Survival regression

In spark.ml, we implement the Accelerated failure time (AFT) model which is a parametric survival regression model for censored data. It describes a model for the log of survival time, so it's often called a log-linear model for survival analysis. Different from a Proportional hazards model designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.

Given the values of the covariates x', for random lifetime ti of subjects i = 1, ..., n, with possible right-censoring, the likelihood function under the AFT model is given as:

![](4.attach_files/AFT_likelihood_function.png)

where δi indicates whether the event has occurred, i.e. whether the observation is uncensored. Using

![](4.attach_files/intermediate_function.png)

the log-likelihood function assumes the form:

![](4.attach_files/AFT_log-likelihood_function.png)

Where S0(ϵi) is the baseline survivor function, and f0(ϵi) is the corresponding density function.

The most commonly used AFT model is based on the Weibull distribution of the survival time. The Weibull distribution for lifetime corresponds to the extreme value distribution for the log of the lifetime, and the S0(ϵ) function is:

![](4.attach_files/S0.png)

the f0(ϵi) function is:

![](4.attach_files/f0.png)

The log-likelihood function for AFT model with a Weibull distribution of lifetime is:

![](4.attach_files/AFT_Weibull.png)

Because minimizing the negative log-likelihood is equivalent to maximizing the a posteriori probability, the loss function we optimize is

![](4.attach_files/loss_function.png)

The gradient functions for β and logσ respectively are:

![](4.attach_files/gradient_function.png)

The AFT model can be formulated as a convex optimization problem, i.e. the task of finding a minimizer of a convex function

![](4.attach_files/loss_function.png)

that depends on the coefficients vector β and the log of scale parameter logσ. The optimization algorithm underlying the implementation is L-BFGS. The implementation matches the result from R's survival function survreg.
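
The loss can be sketched in plain Python. The sketch assumes the extreme value baseline functions S0(ϵ) = exp(−e^ϵ) and f0(ϵ) = e^ϵ exp(−e^ϵ) with ϵi = (log ti − x'β)/σ, and it adds an explicit intercept term for convenience (it could equally be folded into β). The coefficient values are made up, and both function names are hypothetical; this is not the Spark implementation.

```python
import math


def residual(t, x, beta, intercept, sigma):
    return (math.log(t) - intercept - sum(b * xi for b, xi in zip(beta, x))) / sigma


def aft_general_loss(times, events, features, beta, intercept, log_sigma):
    """Negative log-likelihood written with the baseline functions S0 and f0."""
    sigma = math.exp(log_sigma)
    loss = 0.0
    for t, delta, x in zip(times, events, features):
        eps = residual(t, x, beta, intercept, sigma)
        log_f0 = eps - math.exp(eps)  # log of e^eps * exp(-e^eps)
        log_s0 = -math.exp(eps)       # log of exp(-e^eps)
        loss -= -delta * log_sigma + delta * log_f0 + (1.0 - delta) * log_s0
    return loss


def aft_weibull_loss(times, events, features, beta, intercept, log_sigma):
    """Simplified Weibull form: sum of delta*log(sigma) - delta*eps + exp(eps)."""
    sigma = math.exp(log_sigma)
    loss = 0.0
    for t, delta, x in zip(times, events, features):
        eps = residual(t, x, beta, intercept, sigma)
        loss += delta * log_sigma - delta * eps + math.exp(eps)
    return loss


# Lifetimes, event indicators (1.0 = uncensored) and covariates.
times = [1.218, 2.949, 3.627, 0.273, 4.199]
events = [1.0, 0.0, 0.0, 1.0, 0.0]
features = [[1.560, -0.605], [0.346, 2.158], [1.380, 0.231],
            [0.520, 1.151], [0.795, -0.226]]
args = (times, events, features, [0.1, -0.2], 0.5, 0.0)  # hypothetical parameters
print(aft_general_loss(*args), aft_weibull_loss(*args))
```

Both functions should return the same value, which shows how the general log-likelihood collapses to the Weibull form once S0 and f0 are substituted.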

When fitting an AFTSurvivalRegressionModel without an intercept on a dataset with a constant nonzero column, Spark MLlib outputs zero coefficients for constant nonzero columns. This behavior is different from R survival::survreg.

In [None]:
from __future__ import print_function

from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors
%matplotlib inline

training = spark.createDataFrame([
        (1.218, 1.0, Vectors.dense(1.560, -0.605)),
        (2.949, 0.0, Vectors.dense(0.346, 2.158)),
        (3.627, 0.0, Vectors.dense(1.380, 0.231)),
        (0.273, 1.0, Vectors.dense(0.520, 1.151)),
        (4.199, 0.0, Vectors.dense(0.795, -0.226))
    ], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities, quantilesCol="quantiles")

model = aft.fit(training)

# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
model.transform(training).show(truncate=False)

## Isotonic regression

Isotonic regression belongs to the family of regression algorithms. Formally, isotonic regression is the following problem: given a finite set of real numbers Y = y1, y2, ..., yn representing observed responses and X = x1, x2, ..., xn the unknown response values to be fitted, find a function that minimizes

![](4.attach_files/minimise.png)

with respect to the complete order subject to x1 ≤ x2 ≤ ... ≤ xn, where wi are positive weights. The resulting function is called isotonic regression and it is unique. It can be viewed as a least squares problem under an order restriction. Essentially, isotonic regression is a monotonic function best fitting the original data points.

We implement a pool adjacent violators algorithm which uses an approach to parallelizing isotonic regression. The training input is a DataFrame which contains three columns: label, features and weight. Additionally, the IsotonicRegression algorithm has one optional parameter called isotonic, which defaults to true. This argument specifies whether the isotonic regression is isotonic (monotonically increasing) or antitonic (monotonically decreasing).
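
The core idea of pool adjacent violators can be sketched sequentially in plain Python. This is a minimal single-machine version for the increasing case, assuming the labels are already ordered by feature; it does not reproduce the parallel approach used in Spark, and `pool_adjacent_violators` is a hypothetical name.

```python
def pool_adjacent_violators(labels, weights=None):
    """Weighted least-squares non-decreasing fit to labels ordered by feature."""
    if weights is None:
        weights = [1.0] * len(labels)
    # Each block holds [weighted mean, total weight, number of points].
    blocks = []
    for y, w in zip(labels, weights):
        blocks.append([y, w, 1])
        # Merge backwards while adjacent blocks violate monotonicity.
        while len(blocks) > 1 and blocks[-2][0] > blocks[-1][0]:
            mean2, w2, n2 = blocks.pop()
            mean1, w1, n1 = blocks.pop()
            total = w1 + w2
            blocks.append([(mean1 * w1 + mean2 * w2) / total, total, n1 + n2])
    fitted = []
    for mean, _, count in blocks:
        fitted.extend([mean] * count)
    return fitted


# Hypothetical labels, already sorted by feature value.
print(pool_adjacent_violators([1.0, 3.0, 2.0, 4.0, 3.5, 5.0]))
```

An antitonic fit can be obtained from the same routine by negating the labels before fitting and negating the result afterwards.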

Training returns an IsotonicRegressionModel that can be used to predict labels for both known and unknown features. The result of isotonic regression is treated as a piecewise linear function. The rules for prediction therefore are:

* If the prediction input exactly matches a training feature then the associated prediction is returned. In case there are multiple predictions with the same feature then one of them is returned. Which one is undefined (same as java.util.Arrays.binarySearch).
* If the prediction input is lower or higher than all training features then the prediction with the lowest or highest feature is returned respectively. In case there are multiple predictions with the same feature then the lowest or highest is returned respectively.
* If the prediction input falls between two training features then the prediction is treated as a piecewise linear function and the interpolated value is calculated from the predictions of the two closest features. In case there are multiple values with the same feature then the same rules as in the previous point are used.
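
The three rules above can be sketched in plain Python, assuming strictly increasing boundaries (so the duplicate-feature cases do not arise) with one prediction per boundary. The function name `isotonic_predict` and the sample values are hypothetical.

```python
from bisect import bisect_left


def isotonic_predict(boundaries, predictions, x):
    """Piecewise linear prediction from sorted boundaries and their predictions."""
    # Inputs at or beyond the ends take the prediction of the nearest boundary.
    if x <= boundaries[0]:
        return predictions[0]
    if x >= boundaries[-1]:
        return predictions[-1]
    i = bisect_left(boundaries, x)
    # An exact match returns the associated prediction.
    if boundaries[i] == x:
        return predictions[i]
    # Otherwise interpolate linearly between the two closest boundaries.
    x0, x1 = boundaries[i - 1], boundaries[i]
    y0, y1 = predictions[i - 1], predictions[i]
    return y0 + (y1 - y0) * (x - x0) / (x1 - x0)


# Hypothetical fitted model.
boundaries = [1.0, 2.0, 4.0]
predictions = [1.0, 3.0, 5.0]
for x in (0.0, 1.5, 2.0, 3.0, 5.0):
    print(x, isotonic_predict(boundaries, predictions, x))
```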

Refer to the IsotonicRegression Python docs for more details on the API.

In [None]:
from __future__ import print_function

from pyspark.ml.regression import IsotonicRegression, IsotonicRegressionModel
%matplotlib inline

# Loads data.
dataset = spark.read.format("libsvm").load("file:///usr/local/hadoop/spark/data/mllib/sample_isotonic_regression_libsvm_data.txt")

# Trains an isotonic regression model.
model = IsotonicRegression().fit(dataset)
print("Boundaries in increasing order: " + str(model.boundaries))
print("Predictions associated with the boundaries: " + str(model.predictions))

# Makes predictions.
model.transform(dataset).show()

# Linear methods

