# Logistic Regression Classification

In [1]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

Utility function to create the appropriate data frame for classification algorithms in MLlib

In [2]:
def mapLibSVM(row): 
    return (row[5],Vectors.dense(row[:3]))

create the dataframe from a csv

In [3]:
df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("datasets/iris.data")

In [4]:
df.show()

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|      label|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|
|         4.8|        3.0|         1.4| 

Classification algorithms requires numeric values for labels

In [5]:
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
indexer = indexer.fit(df).transform(df)
indexer.show()

+------------+-----------+------------+-----------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|      label|labelIndex|
+------------+-----------+------------+-----------+-----------+----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|       0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|       0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|       0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|       0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|       0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|       0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|       0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|       0.0|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|       0.0|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|       0.0|
|         5.4|        3.7

In [6]:
dfLabeled = indexer.rdd.map(mapLibSVM).toDF(["label", "features"])
dfLabeled.show()
train, test = dfLabeled.randomSplit([0.9, 0.1], seed=11)

+-----+-------------+
|label|     features|
+-----+-------------+
|  0.0|[5.1,3.5,1.4]|
|  0.0|[4.9,3.0,1.4]|
|  0.0|[4.7,3.2,1.3]|
|  0.0|[4.6,3.1,1.5]|
|  0.0|[5.0,3.6,1.4]|
|  0.0|[5.4,3.9,1.7]|
|  0.0|[4.6,3.4,1.4]|
|  0.0|[5.0,3.4,1.5]|
|  0.0|[4.4,2.9,1.4]|
|  0.0|[4.9,3.1,1.5]|
|  0.0|[5.4,3.7,1.5]|
|  0.0|[4.8,3.4,1.6]|
|  0.0|[4.8,3.0,1.4]|
|  0.0|[4.3,3.0,1.1]|
|  0.0|[5.8,4.0,1.2]|
|  0.0|[5.7,4.4,1.5]|
|  0.0|[5.4,3.9,1.3]|
|  0.0|[5.1,3.5,1.4]|
|  0.0|[5.7,3.8,1.7]|
|  0.0|[5.1,3.8,1.5]|
+-----+-------------+
only showing top 20 rows



schema verification

In [7]:
train.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



Instantiate the Logistic Regression and the pipeline.

In [8]:
lr = LogisticRegression(labelCol="label", maxIter=10)

We use a *ParamGridBuilder* to construct a grid of parameters to search over.

*TrainValidationSplit* will try all combinations of values and determine best model using the evaluator.

In [9]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.001]) \
    .build()

In this case the estimator is simply the linear regression.

A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.

In [10]:
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.9)

Fit the pipeline to training documents.

In [11]:
model = tvs.fit(train)

Compute the predictions from the model

In [12]:
result = model.transform(test)
predictions = result.select(["prediction", "label"])
predictions.show()

+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
|       2.0|  2.0|
+----------+-----+



In [13]:
# Instantiate metrics object
metrics = MulticlassMetrics(predictions.rdd)

# Overall statistics
print("Summary Stats")
print("Precision = %s" % metrics.precision())
print("Recall = %s" % metrics.recall())
print("F1 Score = %s" % metrics.fMeasure())
print("Accuracy = %s" % metrics.accuracy)

Summary Stats
Precision = 1.0
Recall = 1.0
F1 Score = 1.0
Accuracy = 1.0


In [14]:
# Weighted stats
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

Weighted recall = 1.0
Weighted precision = 1.0
Weighted F(1) Score = 1.0
Weighted F(0.5) Score = 1.0
Weighted false positive rate = 0.0
