# Logistic regression using Spark ML

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, NGram, RegexTokenizer, StopWordsRemover
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import functions as F, types as T, Row, SparkSession

Initialise a new `SparkSession`.

In [None]:
spark = SparkSession.builder\
        .appName("Spark ML")\
        .getOrCreate()

Load the [SMS Spam Collection](http://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection).

In [None]:
schema = T.StructType([
    T.StructField("class", T.StringType(), nullable=False),
    T.StructField("text", T.StringType(), nullable=False),
])

spam = spark.read\
       .option("sep", "\t")\
       .csv("datasets/sms-spam.tsv", schema=schema)

In [None]:
spam.count()

In [None]:
spam.show(5)

In [None]:
spam.groupBy(F.col("class")).count().show()

In [None]:
spam.select(F.col("text")).filter(F.col("class") == "spam").take(3)

## Featurisation and modelling

Convert `class` to a binary `label` that can be used for modelling.

In [None]:
spam = spam.withColumn("label", (F.col("class") == "spam").cast(T.IntegerType()))\
           .drop(F.col("class"))

In [None]:
spam.show(5)

### Tokenisation

[Tokenisation](https://en.wikipedia.org/wiki/Lexical_analysis#Tokenization) is the process of breaking text into individual terms (usually words).

In this example we use a simple [`RegexTokenizer`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.RegexTokenizer) that converts the input string to lowercase and extracts words composed of one or more word characters (alphanumeric and underscore) using the [regular expression](https://en.wikipedia.org/wiki/Regular_expression) `\w+` (see also [RegExr](https://regexr.com/)).

In [None]:
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", gaps=False, pattern="\\w+")
spamTransformed = tokenizer.transform(spam)

In [None]:
spamTransformed.take(1)
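
To make the tokeniser's behaviour concrete, here is a minimal pure-Python sketch of the same idea: lowercase the text, then keep every match of `\w+`. The helper name `tokenize` is hypothetical and is not part of Spark. Python's `\w` also matches non-ASCII word characters, so results may differ from `RegexTokenizer` on such text.

```python
import re

def tokenize(text, pattern=r"\w+"):
    """Lowercase the text, then return every non-overlapping match of `pattern`."""
    return re.findall(pattern, text.lower())

# Punctuation is dropped, and an apostrophe splits a word into two tokens.
tokens = tokenize("URGENT! You'd better reply, 2day.")
print(tokens)
```

Note that contractions such as "You'd" become two tokens (`you` and `d`), which is one reason this tokeniser is described as simple.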

### $n$-grams

$n$-grams are sequences of $n$ consecutive tokens (typically words).

In this example we create 2-grams using the [`NGram`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.NGram) featuriser.

In [None]:
ngram = NGram(inputCol="words", outputCol="ngrams", n=2)
spamTransformed = ngram.transform(spamTransformed)

In [None]:
spamTransformed.take(1)
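
As a rough illustration of what the featuriser produces, the sketch below builds $n$-grams from a token list in plain Python by sliding a window of length $n$ over the tokens and joining each window with a space. The helper name `ngrams` is hypothetical.

```python
def ngrams(tokens, n=2):
    """Return all runs of n consecutive tokens, each joined by a single space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

bigrams = ngrams(["you", "have", "won", "a", "prize"], n=2)
print(bigrams)

# A message with fewer than n tokens yields no n-grams at all.
print(ngrams(["hi"], n=2))
```

A message of $k$ tokens therefore produces $k - n + 1$ $n$-grams, or none when $k < n$.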

### tf–idf

[Term frequency – inverse document frequency](https://en.wikipedia.org/wiki/Tf%E2%80%93idf) (tf–idf) is widely used to capture the importance of words and $n$-grams to documents in a corpus.

In this example we build tf–idf weights using:

- [`HashingTF`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.HashingTF), which maps features to indices using the [hashing trick](https://en.wikipedia.org/wiki/Feature_hashing) and computes their frequencies;
- [`IDF`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.IDF), which rescales these frequencies and down-weights features that appear frequently in the corpus.

In [None]:
hashingTF = HashingTF(inputCol="ngrams", outputCol="rawFeatures", numFeatures=2<<8)  # 2 << 8 == 512 hash buckets
spamTransformed = hashingTF.transform(spamTransformed)

In [None]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(spamTransformed)
spamTransformed = idfModel.transform(spamTransformed)

In [None]:
spamTransformed.take(1)
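
The two steps above can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function that Spark uses, the helper names are hypothetical, and the toy documents are made up. The IDF formula is the smoothed form $\log\frac{m + 1}{\mathrm{df} + 1}$, where $m$ is the number of documents and $\mathrm{df}$ is the number of documents containing the feature.

```python
import math
import zlib
from collections import Counter

def hashing_tf(terms, num_features):
    """Map each term to a bucket index and count how often each bucket is hit."""
    counts = Counter()
    for term in terms:
        # crc32 is a stand-in hash for illustration only (assumption).
        index = zlib.crc32(term.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)

def idf_weights(docs_tf, num_docs):
    """Smoothed IDF per bucket: log((num_docs + 1) / (doc_freq + 1))."""
    doc_freq = Counter(index for tf in docs_tf for index in tf)
    return {i: math.log((num_docs + 1) / (df + 1)) for i, df in doc_freq.items()}

# Toy corpus of already-tokenised documents (hypothetical data).
docs = [["free", "prize"], ["free", "lunch"], ["see", "you", "tomorrow"]]
tfs = [hashing_tf(d, num_features=2 << 8) for d in docs]
idf = idf_weights(tfs, num_docs=len(docs))
tfidf = [{i: count * idf[i] for i, count in tf.items()} for tf in tfs]
print(tfidf[0])
```

Because distinct terms can hash to the same bucket, a smaller `numFeatures` saves memory at the cost of more collisions; terms that occur in many documents (here, "free") receive a lower weight than rare ones.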

### Modelling

We're finally ready to fit a [`LogisticRegression`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression) model.

In [None]:
lr = LogisticRegression(maxIter=10, family="binomial")
lrModel = lr.fit(spamTransformed)

We can extract a number of classification metrics, all computed on the training data, from the [`BinaryLogisticRegressionTrainingSummary`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary) contained in the `summary` attribute of the fitted model.

In [None]:
lrModel.summary.accuracy

In [None]:
lrModel.summary.areaUnderROC
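
To clarify what these two metrics measure, here is a small pure-Python sketch on made-up labels and scores. Accuracy is the fraction of correct predictions at a fixed threshold (0.5 is assumed here), while the area under the ROC curve can be read as the probability that a randomly chosen positive example scores higher than a randomly chosen negative one. The helper names are hypothetical, and Spark's own computation may differ in how it handles thresholds and ties.

```python
def accuracy(labels, scores, threshold=0.5):
    """Fraction of examples whose thresholded score matches the label."""
    predictions = [int(s >= threshold) for s in scores]
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)

def auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Toy data (hypothetical): two negatives followed by two positives.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(accuracy(labels, scores))  # 0.75
print(auc(labels, scores))       # 0.75
```

Unlike accuracy, the area under the ROC curve does not depend on a single threshold, which makes it a common choice for imbalanced problems such as spam detection.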

## Cross-validation

We encapsulate all the transformation and modelling steps into a single `Pipeline` that can be used as the estimator in [`CrossValidator`](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator).

In [None]:
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", gaps=False, pattern="\\w+")
ngram = NGram(inputCol=tokenizer.getOutputCol(), outputCol="ngrams")
hashingTF = HashingTF(inputCol=ngram.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, family="binomial")
pipeline = Pipeline(stages=[tokenizer, ngram, hashingTF, idf, lr])

Next, we define the grid of hyperparameters.

In [None]:
paramGrid = ParamGridBuilder()\
    .addGrid(ngram.n, [1, 2])\
    .addGrid(hashingTF.binary, [False, True])\
    .addGrid(hashingTF.numFeatures, [2<<x for x in range(8, 12)])\
    .addGrid(lr.regParam, [10.**x for x in range(-4, 5)])\
    .addGrid(lr.elasticNetParam, [0.1, 0.5, 0.7, 0.9, 0.95, 0.99, 1.])\
    .build()

In [None]:
len(paramGrid)
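
The size of the grid is the product of the number of values per hyperparameter. The sketch below reproduces that count in plain Python with `itertools.product`; the dictionary keys are just labels for readability, not Spark objects.

```python
from itertools import product

grid_values = {
    "ngram.n": [1, 2],
    "hashingTF.binary": [False, True],
    "hashingTF.numFeatures": [2 << x for x in range(8, 12)],
    "lr.regParam": [10.0 ** x for x in range(-4, 5)],
    "lr.elasticNetParam": [0.1, 0.5, 0.7, 0.9, 0.95, 0.99, 1.0],
}

# Every combination of one value per hyperparameter.
combinations = list(product(*grid_values.values()))

print(grid_values["hashingTF.numFeatures"])  # [512, 1024, 2048, 4096]
print(len(combinations))                     # 2 * 2 * 4 * 9 * 7 = 1008
```

Each additional hyperparameter multiplies the number of candidate models, so exhaustive grids grow quickly.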

Finally, we run a 3-fold cross-validation procedure and score models by the area under the ROC curve.

In [None]:
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                    numFolds=3,
                    seed=42)

In [None]:
cvModel = cv.fit(spam)
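
The idea behind $k$-fold cross-validation can be sketched in plain Python: every row is assigned to one of $k$ folds, and each fold in turn serves as the validation set while the remaining folds are used for training. This is a simplified illustration with a hypothetical helper `k_fold_splits`; the way Spark assigns rows to folds is not reproduced here.

```python
import random

def k_fold_splits(rows, num_folds=3, seed=42):
    """Yield (train, validation) pairs; each row is validated on exactly once."""
    rng = random.Random(seed)
    # Assign every row to a random fold, so fold sizes are only roughly equal.
    folds = [rng.randrange(num_folds) for _ in rows]
    for k in range(num_folds):
        train = [r for r, f in zip(rows, folds) if f != k]
        validation = [r for r, f in zip(rows, folds) if f == k]
        yield train, validation

rows = list(range(10))
splits = list(k_fold_splits(rows))
for train, validation in splits:
    print(len(train), len(validation))
```

With 1008 parameter combinations and 3 folds, this procedure fits 3024 pipelines before the best combination is refitted on the full dataset, so the cell above can take a long time to run.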

We can now retrieve the 'optimal' values for the hyperparameters.

In [None]:
cvModel.bestModel.stages[1].getN()

In [None]:
cvModel.bestModel.stages[2].getBinary()

In [None]:
cvModel.bestModel.stages[2].getNumFeatures()

In [None]:
lrModel = cvModel.bestModel.stages[4]
lrParams = {k.name: v for k, v in lrModel.extractParamMap().items()}

In [None]:
lrParams["regParam"]

In [None]:
lrParams["elasticNetParam"]
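
To interpret these two values, recall how they combine in the elastic net penalty: with $\lambda$ = `regParam` and $\alpha$ = `elasticNetParam`, the penalty added to the loss is $\lambda \left( \alpha \lVert w \rVert_1 + \frac{1 - \alpha}{2} \lVert w \rVert_2^2 \right)$. The sketch below evaluates this expression for a made-up weight vector; the function name is hypothetical, and details such as feature standardisation are ignored.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (
        elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    )

weights = [0.5, -2.0, 0.0]  # toy coefficients (hypothetical)
lasso = elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=1.0)
ridge = elastic_net_penalty(weights, reg_param=0.1, elastic_net_param=0.0)
print(lasso, ridge)
```

An `elasticNetParam` of 1 gives a pure L1 (lasso) penalty, which tends to drive coefficients to exactly zero, while 0 gives a pure L2 (ridge) penalty.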

### Classification metrics

As before, these metrics come from the training summary of the best model, which `CrossValidator` refits on the full dataset, so they are computed on the training data.

In [None]:
lrModel.summary.accuracy

In [None]:
lrModel.summary.areaUnderROC

### Confusion matrix

In [None]:
predictions = cvModel.transform(spam)

In [None]:
predictions.groupBy(F.col("label"), F.col("prediction"))\
           .count()\
           .show()
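
The four counts in a binary confusion matrix are enough to derive the usual classification metrics. The sketch below does this in plain Python on a handful of made-up labels and predictions; none of the numbers come from the SMS dataset.

```python
from collections import Counter

# Toy data (hypothetical): 1 = spam, 0 = ham.
labels      = [0, 0, 0, 0, 1, 1, 1, 0]
predictions = [0, 0, 1, 0, 1, 0, 1, 0]

# Count each (label, prediction) pair, like the groupBy above.
counts = Counter(zip(labels, predictions))
tp, fp = counts[(1, 1)], counts[(0, 1)]
fn, tn = counts[(1, 0)], counts[(0, 0)]

precision = tp / (tp + fp)  # share of predicted spam that is really spam
recall = tp / (tp + fn)     # share of real spam that was caught
accuracy = (tp + tn) / len(labels)
print(tp, fp, fn, tn, precision, recall, accuracy)
```

For a spam filter, false positives (legitimate messages flagged as spam) are often more costly than false negatives, so precision is usually worth checking alongside accuracy.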

### Prediction

In [None]:
df = spark.createDataFrame([
    Row(text="Hello! I was just texting to see if you'd decided to do anything tomorrow."),
    Row(text="URGENT! You have won a 1 week FREE membership in our £100,000 Prize Jackpot!"),
])

In [None]:
cvModel.transform(df).toPandas()