# pyspark.mllib

pyspark.mllib is an ML package that works on RDDs.


## Basic ML procedure using pyspark.mllib

```python
# Here data is an RDD object whose rows are LabeledPoint objects.
# We consider a binary classification problem.

# Split data
data_train, data_test = data.randomSplit([0.8,0.2])

# Train a model
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
model = LogisticRegressionWithLBFGS.train(data_train, iterations=10)

# Prediction
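model.clearThreshold()   # make predict() return raw scores instead of 0/1 labels
preds = model.predict(data_test.map(lambda row: row.features))
# BinaryClassificationMetrics expects an RDD of (score, label) pairs, in that order.
results = preds.zip(data_test.map(lambda row: row.label)).map(lambda row: (float(row[0]), float(row[1])))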

# Evaluation
from pyspark.mllib.evaluation import BinaryClassificationMetrics 
scores = BinaryClassificationMetrics(results)
print(scores.areaUnderPR)     # precision-recall curve
print(scores.areaUnderROC)
```
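
For intuition about `areaUnderROC`, here is a plain-Python sketch that needs no Spark. It uses the pairwise definition of the metric: the fraction of (positive, negative) pairs in which the positive example gets the higher score, with ties counted as one half. The function name `auc_from_pairs` and the toy data are made up for illustration, and this is not how Spark computes the value internally.

```python
def auc_from_pairs(score_and_labels):
    """Area under the ROC curve via the pairwise (rank) definition."""
    pos = [s for s, y in score_and_labels if y == 1.0]
    neg = [s for s, y in score_and_labels if y == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0      # positive ranked above negative
            elif p == n:
                wins += 0.5      # ties count as half
    return wins / (len(pos) * len(neg))

# Toy (score, label) pairs, hypothetical values.
toy = [(0.9, 1.0), (0.8, 0.0), (0.3, 1.0), (0.1, 0.0)]
auc = auc_from_pairs(toy)
print(auc)   # 0.75: three of the four (positive, negative) pairs are ordered correctly
```

Because the metric depends only on the ordering of the scores, it is informative only when the first element of each pair is a real-valued score rather than a hard 0/1 prediction.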

## pyspark.mllib.stat.Statistics

```python
from pyspark.mllib.stat import Statistics
```

### colStats()

colStats(rdd) computes column-wise summary statistics for the input RDD.

```python
from pyspark.mllib.linalg import Vectors

rdd = sc.parallelize([Vectors.dense([2, 0, 1, -2]),
                      Vectors.dense([4, 5, 0,  3]),
                      Vectors.dense([6, 7, -3,  8])])
cStats = Statistics.colStats(rdd)

# count() returns the number of rows (3 here).
# All the other methods return numpy arrays of length 4, one entry per column.
cStats.mean()
cStats.variance()
cStats.count()
cStats.numNonzeros()
cStats.max()
cStats.min()
cStats.normL1()
cStats.normL2()
```

If the input is a DataFrame:

```python
df.show()
+---+----+----+
| id|   x|   y|
+---+----+----+
|  1|4.22|5.08|
|  2| 5.0|0.58|
| ...         |
+---+----+----+

rdd = df.select('x','y').rdd.map(lambda row: [x for x in row])
cStats = Statistics.colStats(rdd)
```
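
To check what the summary statistics mean, the sketch below recomputes them in plain Python for the three vectors used above. It assumes that `variance()` is the unbiased sample variance (dividing by `n - 1`); the variable names are made up for illustration.

```python
import math

rows = [[2, 0, 1, -2],
        [4, 5, 0, 3],
        [6, 7, -3, 8]]
cols = list(zip(*rows))          # transpose: one tuple per column
n = len(rows)

mean = [sum(c) / n for c in cols]
# Unbiased sample variance: divide by n - 1 (assumed to match colStats).
variance = [sum((x - m) ** 2 for x in c) / (n - 1) for c, m in zip(cols, mean)]
num_nonzeros = [sum(1 for x in c if x != 0) for c in cols]
col_max = [max(c) for c in cols]
col_min = [min(c) for c in cols]
norm_l1 = [sum(abs(x) for x in c) for c in cols]              # sum of absolute values
norm_l2 = [math.sqrt(sum(x * x for x in c)) for c in cols]    # Euclidean norm

print(mean[0], variance[0], num_nonzeros, norm_l1)
```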

### corr()

```python
# Here rdd is the one used in colStats.
corrs = Statistics.corr(rdd)        # 4-by-4 np array
```
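
By default `corr()` uses the Pearson method, and entry (i, j) of the result is the correlation between columns i and j. The plain-Python sketch below shows that computation on the same three vectors; the helper name `pearson` is made up for illustration.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

rows = [[2, 0, 1, -2],
        [4, 5, 0, 3],
        [6, 7, -3, 8]]
cols = list(zip(*rows))                                   # one tuple per column
corr = [[pearson(a, b) for b in cols] for a in cols]      # 4-by-4 nested list

# Column 3 is an exact linear function of column 0, so their correlation is 1.
print(round(corr[0][3], 6))
```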

### chiSqTest()

chiSqTest(observed, expected=None)

`observed` cannot contain negative values.

If `observed` is a vector containing the observed categorical counts/relative frequencies, conduct Pearson's chi-squared goodness-of-fit test of the observed data against the expected distribution, or against the uniform distribution (by default), with each category having an expected frequency of `1 / len(observed)`.

If `observed` is a matrix, conduct Pearson's independence test on the input contingency matrix, which cannot contain negative entries or columns or rows that sum up to 0.

`expected` is a vector containing the expected categorical counts/relative frequencies. `expected` is rescaled if the expected sum differs from the `observed` sum.
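
The goodness-of-fit statistic and the rescaling of `expected` can be sketched in plain Python as follows. The function name is made up for illustration, and the sketch returns only the statistic and the degrees of freedom, not a p-value.

```python
def chi_sq_goodness_of_fit(observed, expected=None):
    """Return (statistic, degrees_of_freedom) for Pearson's goodness-of-fit test."""
    if expected is None:
        expected = [1.0] * len(observed)          # uniform distribution
    # Rescale expected so that it sums to the same total as observed.
    scale = sum(observed) / sum(expected)
    expected = [e * scale for e in expected]
    statistic = sum((o - e) ** 2 / e for o, e in zip(observed, expected))
    return statistic, len(observed) - 1

# Against the uniform distribution: expected counts are 5, 5, 5.
stat_uniform, dof_uniform = chi_sq_goodness_of_fit([4, 6, 5])

# With an explicit expected vector that sums to 35 instead of 182.
stat_scaled, dof_scaled = chi_sq_goodness_of_fit([21, 38, 43, 80], [3, 5, 7, 20])

print(stat_uniform, dof_uniform)
print(round(stat_scaled, 4), dof_scaled)
```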

If `observed` is an RDD of LabeledPoint, conduct Pearson's independence test for every feature against the label across the input RDD. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the chi-squared statistic is computed. All label and feature values must be categorical.
    
```python
from pyspark.mllib.linalg import Vectors, Matrices
from pyspark.mllib.regression import LabeledPoint

observed = Vectors.dense([4, 6, 5])
chi = Statistics.chiSqTest(observed)
# Try: chi.statistic, chi.pValue, chi.degreesOfFreedom, chi.method, chi.nullHypothesis

observed = Vectors.dense([21, 38, 43, 80])
expected = Vectors.dense([3, 5, 7, 20])
chi = Statistics.chiSqTest(observed, expected)
    
data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
        LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
        LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
        LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
        LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
        LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
rdd = sc.parallelize(data, 4)
chi = Statistics.chiSqTest(rdd)   # a list of length 2: one test result per feature.
[chi[i].pValue for i in range(len(chi))]
[0.6872892787909721, 0.6822703303362126]
```
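
The per-feature independence test on the LabeledPoint data above can be reproduced by hand. The plain-Python sketch below builds the contingency table of each feature against the label and computes the statistic; the helper names are made up for illustration, and `chi2_sf` only covers the two degrees of freedom that occur here.

```python
import math
from collections import Counter

def chi_sq_independence(pairs):
    """Pearson's independence test on (feature_value, label) pairs.

    Returns (statistic, degrees_of_freedom).
    """
    counts = Counter(pairs)                    # contingency table as a dict
    row_tot = Counter(f for f, _ in pairs)     # totals per feature value
    col_tot = Counter(y for _, y in pairs)     # totals per label
    n = len(pairs)
    statistic = 0.0
    for f in row_tot:
        for y in col_tot:
            expected = row_tot[f] * col_tot[y] / n
            statistic += (counts[(f, y)] - expected) ** 2 / expected
    return statistic, (len(row_tot) - 1) * (len(col_tot) - 1)

def chi2_sf(x, dof):
    """Upper-tail probability of the chi-squared distribution (dof 2 and 3 only)."""
    if dof == 2:
        return math.exp(-x / 2)
    if dof == 3:
        return math.erfc(math.sqrt(x / 2)) + math.sqrt(2 * x / math.pi) * math.exp(-x / 2)
    raise ValueError("this sketch only covers dof 2 and 3")

labels = [0.0, 0.0, 1.0, 0.0, 0.0, 1.0]
features = [[0.5, 10.0], [1.5, 20.0], [1.5, 30.0],
            [3.5, 30.0], [3.5, 40.0], [3.5, 40.0]]

results = []
for j in range(2):                             # one test per feature column
    pairs = [(row[j], y) for row, y in zip(features, labels)]
    results.append(chi_sq_independence(pairs))

p_values = [chi2_sf(stat, dof) for stat, dof in results]
print(results)
print(p_values)
```

The two p-values agree with the ones listed above to several decimal places, which confirms that the list has one entry per feature.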


## pyspark.mllib.linalg

### Vectors, Matrices

```python
v = Vectors.dense([1,2,3])
v.norm(2)
v.dot(Vectors.dense([3,0,-2]))

# The values are given in column-major order.
M = Matrices.dense(2, 3, range(6))
M.toArray()
array([[0., 2., 4.],
       [1., 3., 5.]])
```
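
The output above shows that the values fill the matrix column by column. A plain-Python sketch of that layout, together with the norm and dot product from the same block (the helper name `dense_matrix` is made up for illustration):

```python
import math

def dense_matrix(num_rows, num_cols, values):
    """Rebuild a list of rows from values stored in column-major order."""
    values = list(values)
    return [[float(values[i + j * num_rows]) for j in range(num_cols)]
            for i in range(num_rows)]

M = dense_matrix(2, 3, range(6))
print(M)       # [[0.0, 2.0, 4.0], [1.0, 3.0, 5.0]]

v, w = [1, 2, 3], [3, 0, -2]
norm2 = math.sqrt(sum(x * x for x in v))      # Euclidean norm, sqrt(14)
dot = sum(a * b for a, b in zip(v, w))        # 1*3 + 2*0 + 3*(-2)
print(norm2, dot)
```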

## pyspark.mllib.feature

### HashingTF()

HashingTF(numFeatures=1048576) maps a sequence of terms to their term frequencies using the hashing trick.

```python
from pyspark.mllib.feature import HashingTF

htf = HashingTF()
doc = ['a',]*100+['b',]*10+['c',]
htf.transform(doc)
SparseVector(1048576, {238153: 100.0, 469732: 10.0, 702740: 1.0})
```
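
The hashing trick itself fits in a few lines of plain Python. In the sketch below, `zlib.crc32` is a stand-in hash chosen because it is deterministic; it is not the hash function Spark uses, so the indices differ from the output above, but the counts are the same.

```python
import zlib
from collections import Counter

def hashing_tf(terms, num_features=1 << 20):     # 1 << 20 == 1048576
    """Map each term to a bucket index by hashing, then count terms per bucket."""
    counts = Counter()
    for term in terms:
        index = zlib.crc32(term.encode("utf-8")) % num_features
        counts[index] += 1.0
    return dict(counts)

doc = ['a'] * 100 + ['b'] * 10 + ['c']
tf = hashing_tf(doc)
print(sorted(tf.values()))    # [1.0, 10.0, 100.0]
```

No vocabulary has to be built or stored, at the price that two different terms can collide in the same bucket when `num_features` is small.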

### ChiSqSelector()

ChiSqSelector keeps the features that are most predictive of the label, ranked by a chi-squared test of each feature against the label.

```python
from pyspark.mllib.feature import ChiSqSelector
from pyspark.mllib.regression import LabeledPoint

# rdd_train and rdd_test are RDD objects consisting of LabeledPoint objects.
model = ChiSqSelector(5).fit(rdd_train)            # top five features

# transform() takes an RDD of feature vectors, not an RDD of LabeledPoint objects.
labels = rdd_train.map(lambda row: row.label)
features_selected = model.transform(rdd_train.map(lambda row: row.features))
rdd_train_selected = labels.zip(features_selected).map(lambda row: LabeledPoint(row[0], row[1]))

labels = rdd_test.map(lambda row: row.label)
features_selected = model.transform(rdd_test.map(lambda row: row.features))
rdd_test_selected = labels.zip(features_selected).map(lambda row: LabeledPoint(row[0], row[1]))
```

## pyspark.mllib.regression

### LabeledPoint()

```python
from pyspark.mllib.regression import LabeledPoint

lp = LabeledPoint(0.0, Vectors.dense([1.5, 20.0]))
lp.label     # 0.0
lp.features  # DenseVector([1.5, 20.0])
```

## pyspark.mllib.tree

### RandomForest

```python
from pyspark.mllib.tree import RandomForest

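# Note the parameter name: categoricalFeaturesInfo (an empty dict means all features are continuous).
model = RandomForest.trainClassifier(
    data=rdd_train, numClasses=2, categoricalFeaturesInfo={},
    numTrees=10, featureSubsetStrategy='auto', impurity='entropy',
    maxDepth=4, maxBins=50, seed=4042)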
```

# spark.ml

spark.ml is an ML package that works on DataFrames.


## Basic ML procedure using spark.ml


```python
from pyspark.sql.types import IntegerType  
import pyspark.ml.feature as ft
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Assume data is a DataFrame with columns label, gp, weight, and height,
# where gp consists of '1', '2', and '3' and label consists of 0 or 1.
# If the label column is not named 'label', then we need to specify labelCol='label_name' in some functions used below.

data = data.withColumn('gp', data['gp'].cast(IntegerType()))
# The column input to OneHotEncoder should be numeric.

make_ohe = ft.OneHotEncoder(inputCol='gp', outputCol='gp_ohe')  
# It will create a column named gp_ohe during the pipeline

make_features = ft.VectorAssembler(inputCols=['weight','height', make_ohe.getOutputCol()], outputCol='features')
# Merges weight, height, and the one-hot vector gp_ohe of each row into a single feature vector.
# Note make_ohe.getOutputCol() is 'gp_ohe'.
# It will create a column named features during the pipeline

select_features = ft.ChiSqSelector(
    numTopFeatures=2, 
    featuresCol=make_features.getOutputCol(),
    outputCol='selected_features')

# The model reads the selected features; the label column stays 'label' (the default).
lr = LogisticRegression(featuresCol=select_features.getOutputCol())

pipeline = Pipeline(stages=[make_ohe, make_features, select_features, lr])

paramGrid = ParamGridBuilder() \
     .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
     .addGrid(lr.maxIter, [5, 10, 20]) \
     .build()

evaluator = BinaryClassificationEvaluator(rawPredictionCol='probability') 
# labelCol is 'label' by default.
# rawPredictionCol is either 'rawPrediction' or 'probability'.

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator)
# or use
# tvs = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator)

data_train, data_test = data.randomSplit([0.75, 0.25], seed=4042)

model = cv.fit(data_train)
model_test = model.transform(data_test)           
# model_test is a DataFrame whose columns are the columns of data
# plus ['gp_ohe', 'features', 'selected_features', 'rawPrediction', 'probability', 'prediction']

# The metric name is passed as a string, not as a list.
evaluator.evaluate(model_test, {evaluator.metricName: 'areaUnderROC'})
evaluator.evaluate(model_test, {evaluator.metricName: 'areaUnderPR'})
# We will get the same result when evaluator is defined with rawPredictionCol='rawPrediction'.

# save the pipeline
pipeline.write().overwrite().save('path/to/file')
pl = Pipeline.load('path/to/file')
model_test = pl.fit(data_train).transform(data_test)

# save the fitted pipeline
# model is a CrossValidatorModel; its bestModel attribute is the fitted PipelineModel.
from pyspark.ml import PipelineModel
model.bestModel.write().overwrite().save('path/to/file')
pl_fitted = PipelineModel.load('path/to/file')
model_test = pl_fitted.transform(data_test)
```

## spark.ml.feature

```python
import pyspark.ml.feature as ft
```

### VectorAssembler()

A feature transformer that merges multiple columns into a vector column.
 
```python
df = spark.createDataFrame([(1, 0, 3)], ["a", "b", "c"])
vecAssembler = ft.VectorAssembler(inputCols=df.columns, outputCol="features")
vecAssembler.transform(df).show()     # no fit() function
+---+---+---+-------------+
|  a|  b|  c|     features|
+---+---+---+-------------+
|  1|  0|  3|[1.0,0.0,3.0]|
+---+---+---+-------------+
```

### OneHotEncoder()

* The last category is dropped by default; set dropLast=False to keep it.

* The input column of OneHotEncoder() must be numeric.

* When encoding multi-column by using `inputCols` and `outputCols` params, input/output cols come in pairs, specified by the order in the arrays, and each pair is treated independently.

```python
df = spark.createDataFrame([(1,), (5,), (0,), (1,), (3,),], ["input"])
# df.input is a numeric column of category indices.
# The largest index is 5, so the encoder assumes 6 categories (0 to 5).

ohe = ft.OneHotEncoder()
ohe.setInputCols(["input"])
ohe.setOutputCols(["output"])       # or, ohe = ft.OneHotEncoder(inputCol="input", outputCol="output")

ohe.getInputCols()   # ['input']
ohe.getOutputCols()  # ['output']

df2 = ohe.fit(df).transform(df)
df2.show()                 
+-----+-------------+
|input|       output|
+-----+-------------+
|    1|(5,[1],[1.0])|    # SparseVector corresponding to (0,1,0,0,0)
|    5|    (5,[],[])|    # SparseVector corresponding to (0,0,0,0,0): the last category is dropped
|    0|(5,[0],[1.0])|    # SparseVector corresponding to (1,0,0,0,0)
|    1|(5,[1],[1.0])|
|    3|(5,[3],[1.0])|    # SparseVector corresponding to (0,0,0,1,0)
+-----+-------------+
```
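
The effect of `dropLast` is easy to see in a plain-Python sketch that uses dense lists instead of sparse vectors. The function name `one_hot` is made up for illustration.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

values = [1, 5, 0, 1, 3]
num_categories = max(values) + 1       # indices 0..5, so 6 categories
encoded = [one_hot(v, num_categories) for v in values]
for v, e in zip(values, encoded):
    print(v, e)
```

Dropping one category keeps the encoded columns linearly independent, since the dropped category is implied when all other entries are zero.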

### QuantileDiscretizer()

```python
import numpy as np

df = spark.createDataFrame([(float(x),) for x in np.random.rand(100).round(2)], ['x'])
discretizer = ft.QuantileDiscretizer(numBuckets=5, inputCol='x', outputCol='x_disc')
discretizer.fit(df).transform(df).show(n=10)
+----+------+
|   x|x_disc|
+----+------+
|0.54|   2.0|
|0.48|   2.0|
|0.46|   2.0|
|0.02|   0.0|
|0.42|   2.0|
|0.15|   0.0|
|0.35|   1.0|
|0.49|   2.0|
|0.21|   1.0|
|0.76|   4.0|
+----+------+

# x_disc consists of 0.0, 1.0, 2.0, 3.0, and 4.0.
```
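
Conceptually, fitting finds split points at the quantiles of the column, and transforming assigns each value the index of its bucket. The plain-Python sketch below uses exact quantiles from the `statistics` module (Python 3.8+), whereas Spark computes approximate quantiles, so split points on real data may differ slightly. The helper names are made up for illustration.

```python
import bisect
import statistics
from collections import Counter

def fit_splits(values, num_buckets):
    """Interior split points that cut the data into roughly equal-sized buckets."""
    return statistics.quantiles(values, n=num_buckets)

def bucket(x, splits):
    """Index of the bucket containing x; a value equal to a split goes to the upper bucket."""
    return float(bisect.bisect_right(splits, x))

data = [float(i) for i in range(100)]       # toy column with values 0.0 .. 99.0
splits = fit_splits(data, 5)                # 4 interior split points
sizes = Counter(bucket(x, splits) for x in data)
print(splits)
print(sorted(sizes.items()))
```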

### StandardScaler()

```python
df.show()
+----+
|   x|
+----+
|4.59|
|9.09|
|5.97|
|2.99|
+----+

# Use Vectors
df = ft.VectorAssembler(inputCols=['x'], outputCol='x_vec').transform(df)
df.show()
+----+------+
|   x| x_vec|
+----+------+
|4.59|[4.59]|
|9.09|[9.09]|
|5.97|[5.97]|
|2.99|[2.99]|
+----+------+

scaler = ft.StandardScaler(inputCol='x_vec', outputCol='normalized', withMean=True, withStd=True)
scaler.fit(df).transform(df).show()
+----+------+--------------------+
|   x| x_vec|          normalized|
+----+------+--------------------+
|4.59|[4.59]|[-0.413019679166438]|
|9.09|[9.09]|[1.3239789715335344]|
|5.97|[5.97]|[0.11965990704822...|
|2.99|[2.99]|[-1.0306191994153...|
+----+------+--------------------+
```
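
The normalized values above can be checked by hand: subtract the column mean (`withMean=True`) and divide by the sample standard deviation (`withStd=True`). A plain-Python sketch on the same four numbers:

```python
import statistics

xs = [4.59, 9.09, 5.97, 2.99]
mean = statistics.mean(xs)
std = statistics.stdev(xs)                  # sample standard deviation (divides by n - 1)
normalized = [(x - mean) / std for x in xs]
print(normalized)
```

The first two results agree with the `normalized` column shown above, which indicates that the scaler divides by the sample (n - 1) standard deviation rather than the population one.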


### NLP features

* RegexTokenizer()
* StopWordsRemover()
* NGram()
* CountVectorizer()

```python
df = spark.createDataFrame([ ['''Hello, guys! Look at the trees.''',], ['''He said, "I'm pretty Good".'''] ], ["text"])

# The pattern describes the separators; a raw string avoids invalid-escape warnings for \s.
reTokenizer = ft.RegexTokenizer(inputCol=df.columns[0], outputCol='tokens', pattern=r'\s+|[,.!?"]')
df1 = reTokenizer.transform(df)
df1.show(truncate=False)
+-------------------------------+-----------------------------------+
|text                           |tokens                             |
+-------------------------------+-----------------------------------+
|Hello, guys! Look at the trees.|[hello, guys, look, at, the, trees]|
|He said, "I'm pretty Good".    |[he, said, i'm, pretty, good]      |
+-------------------------------+-----------------------------------+
                            
remover = ft.StopWordsRemover(inputCol=reTokenizer.getOutputCol(), outputCol='stop_removed')
df2 = remover.transform(df1)
df2.show(truncate=False)
+-------------------------------+-----------------------------------+--------------------------+
|text                           |tokens                             |stop_removed              |
+-------------------------------+-----------------------------------+--------------------------+
|Hello, guys! Look at the trees.|[hello, guys, look, at, the, trees]|[hello, guys, look, trees]|
|He said, "I'm pretty Good".    |[he, said, i'm, pretty, good]      |[said, pretty, good]      |
+-------------------------------+-----------------------------------+--------------------------+
                            
nGram = ft.NGram(n=2, inputCol=remover.getOutputCol(), outputCol="nGrams")
df3 = nGram.transform(df2)
df3.select('nGrams').show(truncate=False)
+-----------------------------------+
|nGrams                             |
+-----------------------------------+
|[hello guys, guys look, look trees]|
|[said pretty, pretty good]         |
+-----------------------------------+
                                  
# Using a pipeline:
from pyspark.ml import Pipeline
                                  
pipeline = Pipeline(stages=[reTokenizer, remover, nGram])
pipeline.fit(df).transform(df).select('nGrams').show(truncate=False)
                                  
                                  
indexer = ft.CountVectorizer(inputCol=remover.getOutputCol(), outputCol="text_indexed")
indexer.fit(df2).transform(df2).select('text_indexed').show(truncate=False)
+-------------------------------+
|text_indexed                   |
+-------------------------------+   # There are 7 distinct words.
|(7,[1,3,4,5],[1.0,1.0,1.0,1.0])|   # sparse vector corresponding to (0,1,0,1,1,1,0)
|(7,[0,2,6],[1.0,1.0,1.0])      |   # sparse vector corresponding to (1,0,1,0,0,0,1)
+-------------------------------+
```
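
The first three stages can be imitated in plain Python to see what each one does. This is only a sketch: `STOP_WORDS` is a tiny stand-in list rather than Spark's default English list, and the function names are made up for illustration.

```python
import re

STOP_WORDS = {"at", "the", "he", "i'm"}     # tiny stand-in list, not Spark's default

def tokenize(text, pattern=r'\s+|[,.!?"]'):
    """Lowercase, split on the separator pattern, and drop empty tokens."""
    return [t for t in re.split(pattern, text.lower()) if t]

def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]

def ngrams(tokens, n=2):
    """Join every run of n consecutive tokens with a space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

text = "Hello, guys! Look at the trees."
tokens = tokenize(text)
bigrams = ngrams(remove_stop_words(tokens))
print(tokens)     # ['hello', 'guys', 'look', 'at', 'the', 'trees']
print(bigrams)    # ['hello guys', 'guys look', 'look trees']
```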

## pyspark.ml.tuning

### ParamGridBuilder(), CrossValidator(), TrainValidationSplit()

```python
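from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

lr = LogisticRegression()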

grid = ParamGridBuilder() \
     .baseOn({lr.labelCol: 'l'}) \
     .baseOn([lr.predictionCol, 'p']) \
     .addGrid(lr.regParam, [1.0, 2.0]) \
     .addGrid(lr.maxIter, [1, 5]) \
     .build()

evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

```

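TrainValidationSplit() evaluates each combination of parameters once, on a single train/validation split, whereas CrossValidator evaluates each combination $k$ times, once per fold. It is therefore cheaper, but its estimates are less reliable when the training set is small.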
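
The cost difference can be counted directly. The plain-Python sketch below expands the same grid as above with `itertools.product` and counts model fits, assuming k = 3 folds and ignoring the final refit on the full training set; the variable names are made up for illustration.

```python
from itertools import product

grid_spec = {"regParam": [1.0, 2.0], "maxIter": [1, 5]}

# The grid is the Cartesian product of the value lists.
param_maps = [dict(zip(grid_spec, combo)) for combo in product(*grid_spec.values())]

k = 3                                   # number of folds (assumed)
fits_cv = len(param_maps) * k           # one fit per combination per fold
fits_tvs = len(param_maps)              # one fit per combination

print(param_maps)
print(fits_cv, fits_tvs)                # 12 4
```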

## Models

### pyspark.ml.classification

* LogisticRegression

* DecisionTreeClassifier, RandomForestClassifier

* GBTClassifier

* NaiveBayes

* MultilayerPerceptronClassifier

* OneVsRest


### pyspark.ml.regression

* AFTSurvivalRegression

* DecisionTreeRegressor

* GBTRegressor

* GeneralizedLinearRegression

* IsotonicRegression

* LinearRegression

* RandomForestRegressor


### pyspark.ml.clustering

* KMeans, BisectingKMeans

* GaussianMixture

* LDA

    Latent Dirichlet Allocation (LDA) is a topic model designed for text documents.

```python
from pyspark.ml.clustering import LDA

# Assume data is a DataFrame consisting of strings (documents) and data.columns is ['documents'].

pipeline = Pipeline(stages=[
    ft.RegexTokenizer(inputCol='documents', outputCol='tokens', pattern=r'\s+|[,.?!"]'),
    ft.StopWordsRemover(inputCol='tokens', outputCol='tokens_stopRemoved'),
    ft.CountVectorizer(inputCol='tokens_stopRemoved', outputCol='tokens_indexed'),
    LDA(k=5, optimizer='online', featuresCol='tokens_indexed')])

results = pipeline.fit(data).transform(data)
results.columns
['documents', 'tokens', 'tokens_stopRemoved', 'tokens_indexed', 'topicDistribution']

# Each row of column 'topicDistribution' holds the topic probabilities of one document.
# To see the most likely topic of each document:
results.rdd.map(lambda row: int(row['topicDistribution'].toArray().argmax())).collect()
```