![image.png](attachment:image.png)

![image.png](attachment:image.png)

![image.png](attachment:image.png)


# Apache Spark MLlib


Machine Learning Library (MLlib) Guide

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

* ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
* Featurization: feature extraction, transformation, dimensionality reduction, and selection
* Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
* Persistence: saving and loading algorithms, models, and Pipelines
* Utilities: linear algebra, statistics, data handling, etc.


Basic Statistics

* Correlation
* Hypothesis testing
    * ChiSquareTest
* Summarizer

# Correlation

Calculating the correlation between two series of data is a common operation in Statistics. In spark.ml we provide the flexibility to calculate pairwise correlations among many series. The supported correlation methods are currently Pearson’s and Spearman’s correlation.

In [1]:
from pyspark.sql import SparkSession

# Create or get existing SparkSession
spark = SparkSession.builder \
    .appName("CorrelationExample") \
    .getOrCreate()


In [2]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
df = spark.createDataFrame(data, ["features"])
df.head(10)

[Row(features=SparseVector(4, {0: 1.0, 3: -2.0})),
 Row(features=DenseVector([4.0, 5.0, 0.0, 3.0])),
 Row(features=DenseVector([6.0, 7.0, 0.0, 8.0])),
 Row(features=SparseVector(4, {0: 9.0, 3: 1.0}))]

# Dense Vector

Stores all values explicitly, even if many of them are zero.

Backed by a simple array of floats/doubles.

# Sparse Vector

Stores only the non-zero values and their indices, which saves memory when most entries are zero.

Internally represented as (size, indices, values).
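As a minimal pure-Python sketch of the two layouts (not Spark's implementation; the helper names are hypothetical), the functions below convert between a dense list and the `(size, indices, values)` triple, using the first two rows of the DataFrame above:

```python
def sparse_from_dense(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values


def dense_from_sparse(size, indices, values):
    """Expand (size, indices, values) back into a full list with explicit zeros."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Row 1 above: SparseVector(4, {0: 1.0, 3: -2.0})
print(dense_from_sparse(4, [0, 3], [1.0, -2.0]))    # [1.0, 0.0, 0.0, -2.0]
# Row 2 above: DenseVector([4.0, 5.0, 0.0, 3.0])
print(sparse_from_dense([4.0, 5.0, 0.0, 3.0]))      # (4, [0, 1, 3], [4.0, 5.0, 3.0])
```

Only two of the four entries in row 1 are stored, which is where the memory saving comes from when vectors are mostly zeros.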



In [3]:
r1 = Correlation.corr(df, "features").head()


print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()


print("Spearman correlation matrix:\n" + str(r2[0]))

Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


# Question: Explain the difference between Pearson and Spearman correlation

**Pearson correlation** measures the linear relationship between two datasets. It assesses how well the observed data fits a linear equation. The correlation coefficient ranges from -1 to +1, where
* +1 indicates a perfect positive linear relationship (the points lie exactly on a line with positive slope: as one variable increases, the other increases linearly).
* -1 indicates a perfect negative linear relationship (the points lie exactly on a line with negative slope: as one variable increases, the other decreases linearly).
* 0 indicates no linear relationship.

**Spearman correlation**, on the other hand, assesses the monotonic relationship between two datasets: how well the relationship can be described by a monotonic function (one that is either entirely non-increasing or entirely non-decreasing). Instead of the actual data values, Spearman correlation uses their ranks; it is the Pearson correlation of the ranks. This makes it less sensitive to outliers, and it reaches ±1 for any perfectly monotonic relationship, even a non-linear one, where Pearson would stay below 1 in absolute value. The Spearman coefficient also ranges from -1 to +1, with analogous interpretations of the strength and direction of the monotonic relationship.

In summary:

* Pearson measures linear relationships.
* Spearman measures monotonic relationships (based on ranks).
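Because Spearman is just Pearson applied to ranks, a short pure-Python sketch makes the difference concrete. The helper names are hypothetical and this is not Spark's implementation; ties get average ranks, a common convention assumed here. On columns 0 and 3 of the feature vectors above it should match Spark's matrices, and it also explains the `nan` entries: column 2 is all zeros, so its variance is 0 and the correlation is undefined.

```python
import math


def pearson(xs, ys):
    """Pearson correlation; returns nan when either series is constant (zero variance)."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    syy = sum((y - my) ** 2 for y in ys)
    denom = math.sqrt(sxx * syy)
    return cov / denom if denom > 0 else float("nan")


def ranks(values):
    """1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    result = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        for k in range(i, j + 1):
            result[order[k]] = (i + j) / 2 + 1
        i = j + 1
    return result


def spearman(xs, ys):
    """Spearman correlation: the Pearson correlation of the ranks."""
    return pearson(ranks(xs), ranks(ys))


# Columns 0, 2 and 3 of the four feature vectors in the DataFrame above
col0 = [1.0, 4.0, 6.0, 9.0]
col2 = [0.0, 0.0, 0.0, 0.0]
col3 = [-2.0, 3.0, 8.0, 1.0]
print(round(pearson(col0, col3), 8))   # 0.40047142, as in Spark's Pearson matrix
print(round(spearman(col0, col3), 8))  # 0.4, as in Spark's Spearman matrix
print(pearson(col0, col2))             # nan: column 2 is constant

# A monotonic but non-linear relationship
xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [x ** 3 for x in xs]
print(round(pearson(xs, ys), 4))       # 0.9431: the points are not on a straight line
print(spearman(xs, ys))                # 1.0: the ranks agree perfectly
```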

# Hypothesis testing

Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically significant, that is, whether it is unlikely to have occurred by chance alone. spark.ml currently supports Pearson’s chi-squared ($\chi^2$) tests for independence.
## ChiSquareTest

ChiSquareTest conducts Pearson’s independence test for every feature against the label. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared statistic is computed. All label and feature values must be categorical.

In [4]:
from pyspark.ml.stat import ChiSquareTest


data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])

r = ChiSquareTest.test(df, "features", "label").head()



print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

pValues: [0.6872892787909721,0.6822703303362126]
degreesOfFreedom: [2, 3]
statistics: [0.75,1.5]
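To see where Spark's numbers come from, here is a minimal pure-Python sketch (hypothetical helper names, not Spark's code) that builds the contingency counts for each feature, computes the χ² statistic and degrees of freedom, and turns the statistic into a p-value with the closed-form chi-squared survival function for integer degrees of freedom. On the six rows above it should reproduce `statistics` [0.75, 1.5], `degreesOfFreedom` [2, 3] and the printed `pValues`.

```python
import math
from collections import Counter


def chi2_sf(x, df):
    """P(X >= x) for a chi-squared variable with integer df >= 1, via the recurrence
    Q(x; k + 2) = Q(x; k) + (x/2)^(k/2) * exp(-x/2) / Gamma(k/2 + 1)."""
    if df % 2 == 1:
        q, k = math.erfc(math.sqrt(x / 2)), 1
    else:
        q, k = math.exp(-x / 2), 2
    while k < df:
        q += (x / 2) ** (k / 2) * math.exp(-x / 2) / math.gamma(k / 2 + 1)
        k += 2
    return q


def chi_square_independence(feature_values, labels):
    """Pearson's chi-squared independence test between one categorical feature and the label."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))   # contingency counts
    f_totals = Counter(feature_values)
    l_totals = Counter(labels)
    stat = 0.0
    for f, f_count in f_totals.items():
        for l, l_count in l_totals.items():
            expected = f_count * l_count / n            # N * P(feature=f) * P(label=l)
            stat += (observed[(f, l)] - expected) ** 2 / expected
    dof = (len(f_totals) - 1) * (len(l_totals) - 1)
    return chi2_sf(stat, dof), dof, stat


# Same data as the ChiSquareTest cell above
labels = [0.0, 0.0, 1.0, 0.0, 0.0, 1.0]
features = [(0.5, 10.0), (1.5, 20.0), (1.5, 30.0), (3.5, 30.0), (3.5, 40.0), (3.5, 40.0)]
for j in range(2):
    p, dof, stat = chi_square_independence([row[j] for row in features], labels)
    print(f"feature {j}: pValue={p:.6f}, dof={dof}, statistic={stat:.2f}")
# feature 0: pValue=0.687289, dof=2, statistic=0.75
# feature 1: pValue=0.682270, dof=3, statistic=1.50
```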


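If two categorical variables A and B are independent, then

$$
P(A=i, B=j) = P(A=i)\,P(B=j),
$$

so with $N$ observations the expected count in cell $(i, j)$ is $E_{ij} = N \cdot P(A=i) \cdot P(B=j)$.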


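**Worked example.** Consider $N = 80$ observations of two categorical variables, gender (Male/Female) and preference (Likes/Doesn't). The observed counts are:

| | Likes | Doesn't | Total |
|---|---|---|---|
| Male | 30 | 10 | 40 |
| Female | 20 | 20 | 40 |
| Total | 50 | 30 | 80 |

Marginal probabilities:

$$
P(\text{Male}) = \tfrac{40}{80} = 0.5, \quad
P(\text{Female}) = \tfrac{40}{80} = 0.5
$$
$$
P(\text{Likes}) = \tfrac{50}{80} = 0.625, \quad
P(\text{Doesn't}) = \tfrac{30}{80} = 0.375
$$

Expected counts under independence, $E = N \cdot P(\text{row}) \cdot P(\text{column})$:

$$
\begin{align*}
E(\text{Male, Likes}) &= 80 \cdot 0.5 \cdot 0.625 = 25 \\
E(\text{Male, Doesn't}) &= 80 \cdot 0.5 \cdot 0.375 = 15 \\
E(\text{Female, Likes}) &= 80 \cdot 0.5 \cdot 0.625 = 25 \\
E(\text{Female, Doesn't}) &= 80 \cdot 0.5 \cdot 0.375 = 15
\end{align*}
$$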

### Chi-Square Test Calculation

We compare observed $(O)$ and expected $(E)$ frequencies using:

$$
\chi^2 = \sum \frac{(O - E)^2}{E}
$$

For each cell, we compare the observed value with the expected value:
$$
\begin{align*}
\frac{(30-25)^2}{25} &= 1.0 \\
\frac{(10-15)^2}{15} &= 1.67 \\
\frac{(20-25)^2}{25} &= 1.0 \\
\frac{(20-15)^2}{15} &= 1.67
\end{align*}
$$


$$
\chi^2 = 1.0 + 1.67 + 1.0 + 1.67 \approx 5.33
$$
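The same calculation for the 2×2 example, as a short Python sketch (variable names are illustrative). It also computes the degrees of freedom and the p-value, relying on the fact that for 1 degree of freedom the chi-squared upper tail is $\operatorname{erfc}(\sqrt{x/2})$:

```python
import math

# Observed counts (rows: Male, Female; columns: Likes, Doesn't)
observed = [[30, 10],
            [20, 20]]

n = sum(sum(row) for row in observed)                   # 80
row_totals = [sum(row) for row in observed]             # [40, 40]
col_totals = [sum(col) for col in zip(*observed)]       # [50, 30]

# Expected counts under independence: E_ij = N * P(row i) * P(col j) = row_i * col_j / N
expected = [[r * c / n for c in col_totals] for r in row_totals]

chi2 = sum((o - e) ** 2 / e
           for o_row, e_row in zip(observed, expected)
           for o, e in zip(o_row, e_row))
dof = (len(observed) - 1) * (len(observed[0]) - 1)      # (2-1) * (2-1) = 1

# Upper tail of the chi-squared distribution with 1 degree of freedom
p_value = math.erfc(math.sqrt(chi2 / 2))

print(expected)               # [[25.0, 15.0], [25.0, 15.0]]
print(round(chi2, 2), dof)    # 5.33 1
print(round(p_value, 3))      # 0.021
```

With $p \approx 0.021 < 0.05$, independence would be rejected at the 5% significance level.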

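* χ² = 0 → observed counts match the expected counts exactly (no association at all in the sample).
* χ² small → consistent with independence (the differences look like sampling noise).
* χ² large → strong evidence that the variables are **not** independent.

"Small" and "large" are judged against the chi-squared distribution with $(r-1)(c-1)$ degrees of freedom for an $r \times c$ table, which is what the p-value summarizes.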

In ML feature selection (Spark, scikit-learn, etc.), you run a χ² test between each categorical feature and the target (label):

* Features with low p-values are likely informative.
* Features with high p-values look like noise and can be dropped.

# Summarizer

In [5]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                            Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))])

# create a summarizer for multiple metrics: "mean", "count" and "std"
summarizer = Summarizer.metrics("mean", "count", "std")
# Available metrics: the column-wise max, min, mean, sum, variance, std, count,
# number of nonzeros (numNonZeros), normL1 and normL2.
# compute statistics for multiple metrics with weight
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)
# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)


+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], 1, [0.0,0.0,0.0]}  |
+-----------------------------------+

+---------------------------------------------------------------+
|aggregate_metrics(features, 1.0)                               |
+---------------------------------------------------------------+
|{[1.0,1.5,2.0], 2, [0.0,0.7071067811865476,1.4142135623730951]}|
+---------------------------------------------------------------+
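The printed numbers can be reproduced with a small pure-Python sketch. Two behaviours are assumptions inferred from the output above rather than taken from Spark's documentation: rows with weight 0 are ignored (hence `count` 1 and mean `[1.0,1.0,1.0]` in the weighted case), and the weighted variance divides by $W - \sum w^2 / W$, which equals $n-1$ when all weights are 1 and is 0 when a single row remains (reported as std 0). The function name `weighted_summary` is hypothetical.

```python
import math


def weighted_summary(rows):
    """Column-wise mean, count and std of (weight, vector) rows.
    Assumptions (matching the output above): zero-weight rows are skipped, and the
    variance uses the reliability-weight correction W - sum(w^2) / W."""
    rows = [(w, v) for w, v in rows if w != 0.0]
    if not rows:
        raise ValueError("no rows with non-zero weight")
    count = len(rows)
    total_w = sum(w for w, _ in rows)
    total_w2 = sum(w * w for w, _ in rows)
    dim = len(rows[0][1])
    mean = [sum(w * v[j] for w, v in rows) / total_w for j in range(dim)]
    denom = total_w - total_w2 / total_w
    std = []
    for j in range(dim):
        m2 = sum(w * (v[j] - mean[j]) ** 2 for w, v in rows)
        std.append(math.sqrt(m2 / denom) if denom > 0 else 0.0)
    return mean, count, std


data = [(1.0, [1.0, 1.0, 1.0]), (0.0, [1.0, 2.0, 3.0])]
print(weighted_summary(data))                          # ([1.0, 1.0, 1.0], 1, [0.0, 0.0, 0.0])
print(weighted_summary([(1.0, v) for _, v in data]))   # mean [1.0, 1.5, 2.0], count 2, std [0.0, 0.707..., 1.414...]
```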



![image.png](attachment:image.png)

# PIPELINES
MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow.
The API is largely inspired by scikit-learn, so it will feel very familiar.

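## Main classes
* DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.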

* Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame.  E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

* Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

* Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

* Parameter: All Transformers and Estimators now share a common API for specifying parameters.



# Pipeline components
## Transformers

* A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
* A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.

## Estimators

* Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
* For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.


# Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. E.g., a simple text document processing workflow might include several stages:

1. Split each document’s text into words.
2. Convert each document’s words into a numerical feature vector.
3. Learn a prediction model using the feature vectors and labels.

MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) to be run in a specific order. We will use this simple workflow as a running example in this section.


![image.png](attachment:image.png)

The first two (Tokenizer and HashingTF) are Transformers (blue), and the third (LogisticRegression) is an Estimator (red). The bottom row represents data flowing through the pipeline, where cylinders indicate DataFrames. The Pipeline.fit() method is called on the original DataFrame, which has raw text documents and labels. The Tokenizer.transform() method splits the raw text documents into words, adding a new column with words to the DataFrame. The HashingTF.transform() method converts the words column into feature vectors, adding a new column with those vectors to the DataFrame.
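The Transformer/Estimator/Pipeline contract can be illustrated with a toy, pure-Python sketch. This is not Spark's code: a "DataFrame" here is a list of dicts, and all class names (`ToyTokenizer`, `ToyCountVectorizer`, `ToyPipeline`, and so on) are hypothetical. To keep it short, the Estimator stage learns a vocabulary instead of a classifier, standing in for LogisticRegression in the figure.

```python
class ToyTokenizer:
    """Transformer: transform() lowercases a text column and splits it on whitespace."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def transform(self, rows):
        return [{**r, self.output_col: r[self.input_col].lower().split()} for r in rows]


class ToyCountVectorizerModel:
    """Model (hence a Transformer): maps a list of words to counts over a learned vocabulary."""
    def __init__(self, input_col, output_col, vocabulary):
        self.input_col, self.output_col, self.vocabulary = input_col, output_col, vocabulary

    def transform(self, rows):
        return [{**r, self.output_col: [r[self.input_col].count(w) for w in self.vocabulary]}
                for r in rows]


class ToyCountVectorizer:
    """Estimator: fit() learns a vocabulary from the data and returns a Model."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        vocabulary = sorted({w for r in rows for w in r[self.input_col]})
        return ToyCountVectorizerModel(self.input_col, self.output_col, vocabulary)


class ToyPipelineModel:
    """The fitted pipeline is itself a Transformer: it applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    """Estimator: Transformers are used as-is; Estimators are fit on the data flowing through."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator: fit it to obtain a Model
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # pass the transformed data to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


training = [{"text": "a b spark"}, {"text": "b d"}]
pipeline = ToyPipeline([ToyTokenizer("text", "words"), ToyCountVectorizer("words", "features")])
model = pipeline.fit(training)
print(model.stages[1].vocabulary)                                   # ['a', 'b', 'd', 'spark']
print(model.transform([{"text": "Spark spark b"}])[0]["features"])  # [0, 1, 0, 2]
```

As in the figure, `fit()` on the pipeline returns a new object (the fitted pipeline) that is used as a Transformer on new data.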

In [6]:
from pyspark.ml.classification import LogisticRegression
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
#print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)



In [None]:
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
#print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
#print(model2.extractParamMap())



Model 1 was fit using parameters: 
Model 2 was fit using parameters: 


In [None]:
# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.05707304993572537,0.9429269500642746], prediction=1.0
features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238521956443227,0.07614780435567725], prediction=0.0
features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972780286187774,0.8902721971381222], prediction=1.0


![image.png](attachment:image.png)

# There are many models available: choose at least three and try fitting them

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )

(4, spark i j k) --> prob=[0.6292098489668484,0.3707901510331516], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566158,0.8658765165743384], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000


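## On row 4
Even though "spark" appears, the other tokens ("i", "j", "k") never occur in training, so (barring hash collisions) their features get effectively zero weight and add nothing. The prediction rests on a single occurrence of "spark" plus the intercept, which is not enough to push the probability above 0.5. In row 6, "spark" appears twice, which doubles its contribution.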

Tokenization and TF-IDF in more detail:

In [10]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+
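The IDF step can be sketched in pure Python. The sketch assumes the smoothed formula from Spark's documentation, $\text{IDF}(t) = \log\frac{m + 1}{\text{DF}(t) + 1}$ for $m$ documents, and keys terms by the word itself instead of a hashed index (with `numFeatures=20`, Spark may merge colliding words into one bucket). The helper name `tf_idf` is hypothetical.

```python
import math
from collections import Counter


def tf_idf(docs):
    """TF-IDF over tokenized docs with a smoothed IDF:
    idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))  # documents containing t
    idf = {t: math.log((m + 1) / (d + 1)) for t, d in doc_freq.items()}
    return [{t: count * idf[t] for t, count in Counter(doc).items()} for doc in docs]


docs = [s.lower().split() for s in [
    "Hi I heard about Spark",
    "I wish Java could use case classes",
    "Logistic regression models are neat",
]]
weights = tf_idf(docs)
print(round(weights[0]["spark"], 4))  # 0.6931: in 1 of 3 docs, log(4/2)
print(round(weights[0]["i"], 4))      # 0.2877: in 2 of 3 docs, log(4/3)
```

A term that appears in every document gets $\log\frac{m+1}{m+1} = 0$, so very common words are down-weighted to nothing.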



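# Question: What does a Tokenizer do, and how does it do it? What about the hasher (HashingTF)?
## Hint: hashing
A hash function maps every string to an integer, $h: \{\text{all strings}\} \to \mathbb{Z}$; HashingTF uses the non-negative remainder $h(\text{term}) \bmod \text{numFeatures}$ as the term's index.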

**Tokenizer**

A `Tokenizer` is a feature transformer that breaks a text document into individual words, or "tokens". Spark's `Tokenizer` converts the text to lowercase and splits it on whitespace; for splitting on punctuation or other patterns, Spark provides `RegexTokenizer`.

*   **How it works:** The `Tokenizer` takes a column of text (strings) as input and outputs a new column containing arrays of strings, where each string is a lowercased word from the original text. For example, the sentence "Hi I heard about Spark" is tokenized into `["hi", "i", "heard", "about", "spark"]`.

**HashingTF**

`HashingTF` (Hashing Term Frequency) is another feature transformer that converts a collection of text documents (represented as arrays of tokens) into numerical feature vectors. It uses the hashing trick to map terms (words) to indices in a fixed-size vector.

*   **How it works:** Instead of building a vocabulary of all unique words, which can be memory-intensive for large datasets, `HashingTF` applies a hash function (MurmurHash 3 in Spark) to each word and uses the result modulo `numFeatures` as an index in the feature vector. The value at that index is incremented each time the word appears in the document, so each element holds the total count of all words that hash to that index.

    The number of features in the output vector is determined by the `numFeatures` parameter. A larger `numFeatures` reduces the likelihood of hash collisions (different words mapping to the same index).

In summary:

1.  **Tokenizer:** Splits text into words.
2.  **HashingTF:** Converts words into numerical feature vectors using a hash function and term frequency.
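Both steps fit in a few lines of pure Python. This is a sketch, not Spark's code: `tokenize` roughly mimics `Tokenizer` (lowercase, split on whitespace), and `term_index` uses MD5 from `hashlib` as a stand-in hash (Spark's `HashingTF` uses MurmurHash 3), so the bucket indices will not match Spark's. Python's built-in `hash()` is avoided because it is randomized per process for strings. All function names are hypothetical.

```python
import hashlib


def tokenize(text):
    """Roughly mimics Spark's Tokenizer: lowercase, then split on whitespace."""
    return text.lower().split()


def term_index(term, num_features):
    """Hashing trick: map a term to a bucket in [0, num_features).
    MD5 is a stand-in here; Spark's HashingTF uses MurmurHash 3, so indices differ."""
    digest = hashlib.md5(term.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "little") % num_features


def hashing_tf(tokens, num_features):
    """Sparse term-frequency vector as {index: count}; colliding terms share a bucket."""
    vec = {}
    for t in tokens:
        i = term_index(t, num_features)
        vec[i] = vec.get(i, 0) + 1
    return vec


tokens = tokenize("Hi I heard about Spark spark")
print(tokens)               # ['hi', 'i', 'heard', 'about', 'spark', 'spark']
vec = hashing_tf(tokens, num_features=20)
print(sum(vec.values()))    # 6: every token is counted in some bucket
```

No vocabulary is stored: the same term always lands in the same bucket, which is why `HashingTF` is a plain Transformer rather than an Estimator.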

# Word2Vec

Word2Vec is an Estimator which takes sequences of words representing documents and trains a Word2VecModel. The model maps each word to a unique fixed-size vector. The Word2VecModel transforms each document into a vector using the average of all words in the document; this vector can then be used as features for prediction, document similarity calculations, etc. Please refer to the MLlib user guide on Word2Vec for more details.
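The averaging step described above can be sketched as follows. The 3-dimensional word vectors are made up for illustration (a real model learns them), the function name is hypothetical, and every word is assumed to be in the vocabulary:

```python
def average_vector(words, embeddings, dim):
    """Document vector = element-wise mean of its word vectors.
    Assumes every word has an entry in `embeddings`; an empty document maps to zeros."""
    if not words:
        return [0.0] * dim
    vectors = [embeddings[w] for w in words]
    return [sum(col) / len(vectors) for col in zip(*vectors)]


# Hypothetical 3-dimensional word vectors
embeddings = {
    "spark": [0.5, 0.0, 1.0],
    "is": [0.0, 0.5, 0.0],
    "fast": [1.0, 1.0, 0.5],
}
print(average_vector(["spark", "is", "fast"], embeddings, dim=3))  # [0.5, 0.5, 0.5]
```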



In [11]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [-0.021449678391218186,0.016573484241962432,0.030733881890773775]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.05426883537854467,-0.059132851128067286,0.06164071549262319]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.01564687564969063,0.0912977852858603,-0.056992095476016406]



# Model selection (a.k.a. hyperparameter tuning)

An important task in ML is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LogisticRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps.
Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately.

## MLlib supports model selection using tools such as CrossValidator and TrainValidationSplit.
They require:

*    Estimator: algorithm or Pipeline to tune
*    Set of ParamMaps: parameters to choose from, sometimes called a “parameter grid” to search over
*    Evaluator: metric to measure how well a fitted Model does on held-out test data

## At a high level, these model selection tools work as follows:

*    They split the input data into separate training and test datasets.
*    For each (training, test) pair, they iterate through the set of ParamMaps. For each ParamMap, they fit the Estimator using those parameters, get the fitted Model, and evaluate the Model’s performance using the Evaluator.
*    They select the Model produced by the best-performing set of parameters.


# Cross-Validation

CrossValidator begins by splitting the dataset into a set of folds which are used as separate training and test datasets. E.g., with $k=3$ folds, CrossValidator will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular ParamMap, CrossValidator computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs.

After identifying the best ParamMap, CrossValidator finally re-fits the Estimator using the best ParamMap and the entire dataset.
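The k-fold procedure can be sketched end to end in pure Python. Everything here is a hypothetical stand-in: a toy k-nearest-neighbours regressor plays the Estimator, its number of neighbours is the only grid parameter, and mean squared error (lower is better) plays the Evaluator, whereas `BinaryClassificationEvaluator` defaults to area under ROC (higher is better). Folds are assigned deterministically by index here; Spark's CrossValidator assigns rows to folds randomly, controlled by its `seed` param.

```python
from statistics import mean


def k_fold_pairs(n, k):
    """Split indices 0..n-1 into k folds; each (train, test) pair holds out one fold."""
    folds = [list(range(i, n, k)) for i in range(k)]
    return [([j for f, fold in enumerate(folds) if f != i for j in fold], folds[i])
            for i in range(k)]


def fit_knn(xs, ys, n_neighbors):
    """Toy Estimator: 'fitting' stores the data; the model averages the nearest labels."""
    data = list(zip(xs, ys))

    def model(x):
        nearest = sorted(data, key=lambda p: abs(p[0] - x))[:n_neighbors]
        return mean(y for _, y in nearest)
    return model


def mse(model, xs, ys):
    """Toy Evaluator: mean squared error (lower is better)."""
    return mean((model(x) - y) ** 2 for x, y in zip(xs, ys))


def cross_validate(xs, ys, param_grid, num_folds=3):
    pairs = k_fold_pairs(len(xs), num_folds)
    avg_metrics = {}
    for n_neighbors in param_grid:                 # one ParamMap per grid value
        scores = []
        for train, test in pairs:                  # fit on k-1 folds, evaluate on the held-out fold
            model = fit_knn([xs[i] for i in train], [ys[i] for i in train], n_neighbors)
            scores.append(mse(model, [xs[i] for i in test], [ys[i] for i in test]))
        avg_metrics[n_neighbors] = mean(scores)    # average metric across the folds
    best = min(avg_metrics, key=avg_metrics.get)
    best_model = fit_knn(xs, ys, best)             # re-fit on the entire dataset
    return best, avg_metrics, best_model


xs = [float(i) for i in range(12)]
ys = [2.0 * x + (1.0 if i % 2 else -1.0) for i, x in enumerate(xs)]
best, avg_metrics, best_model = cross_validate(xs, ys, param_grid=[1, 2, 4])
print(best, avg_metrics)
```

With 12 rows and 3 folds, each model is trained on 8 rows (2/3) and evaluated on 4 (1/3), matching the description above.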

In [12]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "mapreduce spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)

Row(id=4, text='spark i j k', probability=DenseVector([0.2665, 0.7335]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9204, 0.0796]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.4438, 0.5562]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.8587, 0.1413]), prediction=0.0)


# Deep Learning
Spark can also be used for deep learning: MLlib itself includes a multilayer perceptron classifier (`MultilayerPerceptronClassifier`), and Spark integrates with the most common deep-learning frameworks.

![image.png](attachment:image.png)

In [13]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(modelType="multinomial", labelCol="label", featuresCol="features")

pipeline_nb = Pipeline(stages=[tokenizer, hashingTF, nb])

paramGrid_nb = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .build()

crossval_nb = CrossValidator(estimator=pipeline_nb,
                             estimatorParamMaps=paramGrid_nb,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=2)

cvModel_nb = crossval_nb.fit(training)

prediction_nb = cvModel_nb.transform(test)
print("=== Naive Bayes ===")
prediction_nb.select("id", "text", "probability", "prediction").show(truncate=False)

=== Naive Bayes ===
+---+---------------+----------------------------------------+----------+
|id |text           |probability                             |prediction|
+---+---------------+----------------------------------------+----------+
|4  |spark i j k    |[0.12522710941200788,0.8747728905879921]|1.0       |
|5  |l m n          |[0.7685811612217593,0.23141883877824074]|0.0       |
|6  |mapreduce spark|[0.15136226034308783,0.8486377396569121]|1.0       |
|7  |apache hadoop  |[0.8652459541740107,0.13475404582598927]|0.0       |
+---+---------------+----------------------------------------+----------+



In [14]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

pipeline_rf = Pipeline(stages=[tokenizer, hashingTF, rf])

paramGrid_rf = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

crossval_rf = CrossValidator(estimator=pipeline_rf,
                             estimatorParamMaps=paramGrid_rf,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=2)

cvModel_rf = crossval_rf.fit(training)

prediction_rf = cvModel_rf.transform(test)
print("=== Random Forest ===")
prediction_rf.select("id", "text", "probability", "prediction").show(truncate=False)

=== Random Forest ===
+---+---------------+---------------+----------+
|id |text           |probability    |prediction|
+---+---------------+---------------+----------+
|4  |spark i j k    |[0.0125,0.9875]|1.0       |
|5  |l m n          |[1.0,0.0]      |0.0       |
|6  |mapreduce spark|[0.2125,0.7875]|1.0       |
|7  |apache hadoop  |[1.0,0.0]      |0.0       |
+---+---------------+---------------+----------+



In [15]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=20)

pipeline_gbt = Pipeline(stages=[tokenizer, hashingTF, gbt])

paramGrid_gbt = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(gbt.maxDepth, [3, 5]) \
    .build()

crossval_gbt = CrossValidator(estimator=pipeline_gbt,
                              estimatorParamMaps=paramGrid_gbt,
                              evaluator=BinaryClassificationEvaluator(),
                              numFolds=2)

cvModel_gbt = crossval_gbt.fit(training)

prediction_gbt = cvModel_gbt.transform(test)
print("=== Gradient-Boosted Trees ===")
prediction_gbt.select("id", "text", "probability", "prediction").show(truncate=False)

=== Gradient-Boosted Trees ===
+---+---------------+-----------------------------------------+----------+
|id |text           |probability                              |prediction|
+---+---------------+-----------------------------------------+----------+
|4  |spark i j k    |[0.04364652142729318,0.9563534785727068] |1.0       |
|5  |l m n          |[0.9563534785727067,0.043646521427293306]|0.0       |
|6  |mapreduce spark|[0.04364652142729318,0.9563534785727068] |1.0       |
|7  |apache hadoop  |[0.9563534785727067,0.043646521427293306]|0.0       |
+---+---------------+-----------------------------------------+----------+

