## Pipelines
A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. For Transformer stages, the transform() method is called on the DataFrame. For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.

https://spark.apache.org/docs/3.2.1/ml-pipeline.html
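
The stage-by-stage behaviour described above can be sketched in plain Python. This is only an illustration of the control flow, not the pyspark.ml implementation: the classes `Lowercase`, `VocabularyEstimator`, `VocabularyModel` and the helpers `fit_pipeline` and `transform_pipeline` are hypothetical names invented for this sketch, and plain lists of strings stand in for DataFrames.

```python
# Toy stand-ins for illustration only; these are NOT the pyspark.ml classes.

class Lowercase:
    """Transformer-like stage: it only knows how to transform()."""

    def transform(self, rows):
        return [r.lower() for r in rows]


class VocabularyModel:
    """Fitted, Transformer-like object returned by VocabularyEstimator.fit()."""

    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        # Keep only the words that were seen while fitting.
        return [[w for w in r.split() if w in self.vocab] for r in rows]


class VocabularyEstimator:
    """Estimator-like stage: fit() learns from the data and returns a Transformer."""

    def fit(self, rows):
        return VocabularyModel({w for r in rows for w in r.split()})


def fit_pipeline(stages, rows):
    """Run the stages in order and return the list of fitted transformers."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            # Estimator: fit it first, keep the resulting transformer.
            stage = stage.fit(rows)
        fitted.append(stage)
        # Pass the transformed data on to the next stage.
        rows = stage.transform(rows)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order, like a fitted pipeline would."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows


fitted = fit_pipeline([Lowercase(), VocabularyEstimator()],
                      ["Spark Hadoop", "spark ML"])
result = transform_pipeline(fitted, ["Apache SPARK", "hadoop ml flink"])
print(result)  # [['spark'], ['hadoop', 'ml']]
```

The list returned by `fit_pipeline` plays the role of the PipelineModel: it contains only transformer-like objects, so it can be applied to new data without refitting.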


In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cloudanum').getOrCreate()

Import the libraries.

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

Prepare training documents from a list of (id, text, label) tuples.

In [9]:
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

In [10]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
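
To give a rough idea of what the first two stages do to a document, here is a plain-Python sketch of tokenizing followed by the hashing trick. It is an approximation under stated assumptions: the helper `hashing_tf` is hypothetical, CRC32 is used as a stand-in hash (Spark's HashingTF uses MurmurHash3, so the indices will differ), and `num_features=16` is chosen only to keep the output small.

```python
import zlib


def hashing_tf(words, num_features=16):
    """Map a list of tokens to a sparse {index: count} term-frequency dict."""
    counts = {}
    for word in words:
        # Assumption: CRC32 stands in for the hash function used by Spark.
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] = counts.get(index, 0.0) + 1.0
    return counts


# Tokenizer-like step: lowercase the text and split it on whitespace.
tokens = "spark hadoop spark".lower().split()

# HashingTF-like step: each token is hashed to a bucket and counted.
vector = hashing_tf(tokens)
print(tokens)
print(vector)
```

Because no vocabulary is stored, two different words can land in the same bucket; a larger number of features makes such collisions less likely.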

Fit the pipeline to training documents.

In [11]:
model = pipeline.fit(training)

Prepare test documents, which are unlabeled (id, text) tuples.

In [12]:
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

Make predictions on test documents and print columns of interest.

In [13]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    # Use a separate name so the `prediction` DataFrame is not overwritten.
    rid, text, prob, pred = row  # type: ignore
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), pred   # type: ignore
        )
    )

(4, spark i j k) --> prob=[0.6292098489668488,0.37079015103315116], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566147,0.8658765165743385], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000
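
The relationship between the probability vector and the prediction in the output above can be checked by hand. The sketch below assumes the classifier's default decision threshold of 0.5 for binary problems; `predict_label` is a hypothetical helper written for this illustration, and the probabilities are copied from the printed output.

```python
# (id, probability vector) pairs copied from the printed output above.
outputs = [
    (4, [0.6292098489668488, 0.37079015103315116]),
    (5, [0.984770006762304, 0.015229993237696027]),
    (6, [0.13412348342566147, 0.8658765165743385]),
    (7, [0.9955732114398529, 0.00442678856014711]),
]


def predict_label(probability, threshold=0.5):
    """Return 1.0 when the class-1 probability exceeds the threshold, else 0.0."""
    return 1.0 if probability[1] > threshold else 0.0


labels = {rid: predict_label(prob) for rid, prob in outputs}
print(labels)  # {4: 0.0, 5: 0.0, 6: 1.0, 7: 0.0}
```

Only document 6 is labelled 1.0: the word "spark" appears twice in it, which pushes the class-1 probability above the threshold. Document 4 contains "spark" once, but its class-1 probability of about 0.37 stays below 0.5.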


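## To Do
Using the training data below, first build and fit a pipeline, then use the fitted model to label the unlabelled data. Note that the text column in both DataFrames is named "keywords" rather than "text", so the Tokenizer's inputCol must be set accordingly.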

In [None]:
training = spark.createDataFrame([
    (0, "cat dog horse", 1.0),
    (1, "door headphone clock", 0.0),
    (2, "dog duck horse", 1.0),
    (3, "wall blue cell", 0.0)
], ["id", "keywords", "label"])

In [14]:
unlabelled = spark.createDataFrame([
    (0, "cat horse"),
    (1, "door headphone clock"),
    (2, "dog duck horse"),
    (3, "wall cell")
], ["id", "keywords"])