[FLINK-28224] Add document for algorithms introduced before 2.1 release
yunfengzhou-hub committed Jun 30, 2022
1 parent 91e7464 commit 0df7f62
Showing 32 changed files with 2,559 additions and 322 deletions.
2 changes: 1 addition & 1 deletion docs/content/docs/operators/classification/_index.md
@@ -1,7 +1,7 @@
---
title: Classification
bookCollapseSection: true
-weight: 2
+weight: 1
aliases:
- /operators/feature/
---
173 changes: 102 additions & 71 deletions docs/content/docs/operators/classification/knn.md
@@ -23,33 +23,33 @@ specific language governing permissions and limitations
under the License.
-->

-# KNN
+## KNN

K Nearest Neighbor(KNN) is a classification algorithm. The basic assumption of
-KNN is that if most of the nearest K neighbors of the provided sample belongs to
-the same label, then it is highly probabl that the provided sample also belongs
+KNN is that if most of the nearest K neighbors of the provided sample belong to
+the same label, then it is highly probable that the provided sample also belongs
to that label.
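
To make this assumption concrete, below is a minimal, self-contained sketch of the brute-force majority vote that a KNN classifier performs for a single query point. It is only an illustration: the `KnnVoteSketch` class, its method names, and the plain-array data layout are hypothetical and are not part of Flink ML, whose `Knn` estimator and `KnnModel` are documented below.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the KNN majority vote; not part of the Flink ML API. */
public class KnnVoteSketch {

    public static void main(String[] args) {
        double[][] trainFeatures = {{2.0, 3.0}, {2.1, 3.1}, {200.1, 300.1}, {200.2, 300.2}};
        double[] trainLabels = {1.0, 1.0, 2.0, 2.0};

        // With k = 3, the three training points closest to (2.05, 3.05) carry label 1.0 twice
        // and label 2.0 once, so the majority vote predicts 1.0.
        System.out.println(predict(trainFeatures, trainLabels, new double[] {2.05, 3.05}, 3));
    }

    /** Predicts a label for {@code query} by majority vote among its k nearest training points. */
    static double predict(double[][] trainFeatures, double[] trainLabels, double[] query, int k) {
        // Rank every training point by its distance to the query point.
        Integer[] order = new Integer[trainFeatures.length];
        for (int i = 0; i < order.length; i++) {
            order[i] = i;
        }
        Arrays.sort(
                order,
                (a, b) ->
                        Double.compare(
                                squaredDistance(trainFeatures[a], query),
                                squaredDistance(trainFeatures[b], query)));

        // Count the labels carried by the k nearest neighbors.
        Map<Double, Integer> votes = new HashMap<>();
        for (int i = 0; i < Math.min(k, order.length); i++) {
            votes.merge(trainLabels[order[i]], 1, Integer::sum);
        }

        // The label with the most votes is the prediction.
        double bestLabel = Double.NaN;
        int bestCount = -1;
        for (Map.Entry<Double, Integer> e : votes.entrySet()) {
            if (e.getValue() > bestCount) {
                bestCount = e.getValue();
                bestLabel = e.getKey();
            }
        }
        return bestLabel;
    }

    /** Squared Euclidean distance; the square root is omitted since it does not change the ranking. */
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return sum;
    }
}
```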

-## Input Columns
+### Input Columns

| Param name | Type | Default | Description |
| :---------- | :------ | :----------- | :--------------- |
| featuresCol | Vector | `"features"` | Feature vector |
| labelCol | Integer | `"label"` | Label to predict |

-## Output Columns
+### Output Columns

| Param name | Type | Default | Description |
| :------------ | :------ | :------------- | :-------------- |
| predictionCol | Integer | `"prediction"` | Predicted label |

-## Parameters
+### Parameters

-Below are parameters required by `KnnModel`.
+Below are the parameters required by `KnnModel`.

| Key | Default | Type | Required | Description |
-| ------------- | -------------- | ------- | -------- | -------------------------------- |
-| K | `5` | Integer | no | The number of nearest neighbors. |
+|---------------| -------------- | ------- | -------- | -------------------------------- |
+| k | `5` | Integer | no | The number of nearest neighbors. |
| featuresCol | `"features"` | String | no | Features column name. |
| predictionCol | `"prediction"` | String | no | Prediction column name. |

@@ -59,83 +59,108 @@ Below are parameters required by `KnnModel`.
| -------- | --------- | ------ | -------- | ------------------ |
| labelCol | `"label"` | String | no | Label column name. |
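
As an illustration of the parameters above, the following hypothetical snippet configures a `Knn` estimator through its chained setters; the chosen values and column names are examples only and must match the schema of the training table.

```java
import org.apache.flink.ml.classification.knn.Knn;

public class KnnParamsSketch {
    public static void main(String[] args) {
        // Set k plus the input/output column names listed in the tables above.
        Knn knn =
                new Knn()
                        .setK(3)
                        .setFeaturesCol("features")
                        .setLabelCol("label")
                        .setPredictionCol("prediction");

        // Each parameter can be read back through its getter.
        System.out.println("k = " + knn.getK());
        System.out.println("prediction column = " + knn.getPredictionCol());
    }
}
```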

-## Examples
+### Examples

-{{< tabs knn >}}
+{{< tabs examples >}}

{{< tab "Java">}}
```java
+import org.apache.flink.ml.classification.knn.Knn;
+import org.apache.flink.ml.classification.knn.KnnModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a Knn model and uses it for classification. */
+public class KnnExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.0, 3.0), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(200.1, 300.1), 2.0),
+                        Row.of(Vectors.dense(200.2, 300.2), 2.0),
+                        Row.of(Vectors.dense(200.3, 300.3), 2.0),
+                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
+                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
+                        Row.of(Vectors.dense(200.6, 300.6), 2.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.8, 3.2), 3.0),
+                        Row.of(Vectors.dense(300., 3.2), 4.0),
+                        Row.of(Vectors.dense(2.2, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.4, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(4.0, 4.1), 5.0), Row.of(Vectors.dense(300, 42), 2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features", "label");
+
+        // Creates a Knn object and initializes its parameters.
+        Knn knn = new Knn().setK(4);
+
+        // Trains the Knn Model.
+        KnnModel knnModel = knn.fit(trainTable);
+
+        // Uses the Knn Model for predictions.
+        Table outputTable = knnModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(knn.getFeaturesCol());
+            double expectedResult = (Double) row.getField(knn.getLabelCol());
+            double predictionResult = (Double) row.getField(knn.getPredictionCol());
+            System.out.printf(
+                    "Features: %-15s \tExpected Result: %s \tPrediction Result: %s\n",
+                    features, expectedResult, predictionResult);
+        }
+    }
+}

-List<Row> trainRows =
-        new ArrayList<>(
-                Arrays.asList(
-                        Row.of(Vectors.dense(2.0, 3.0), 1.0),
-                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
-                        Row.of(Vectors.dense(200.1, 300.1), 2.0),
-                        Row.of(Vectors.dense(200.2, 300.2), 2.0),
-                        Row.of(Vectors.dense(200.3, 300.3), 2.0),
-                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
-                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
-                        Row.of(Vectors.dense(200.6, 300.6), 2.0),
-                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
-                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
-                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
-                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
-                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
-                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
-                        Row.of(Vectors.dense(2.8, 3.2), 3.0),
-                        Row.of(Vectors.dense(300., 3.2), 4.0),
-                        Row.of(Vectors.dense(2.2, 3.2), 1.0),
-                        Row.of(Vectors.dense(2.4, 3.2), 5.0),
-                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
-                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
-                        Row.of(Vectors.dense(2.1, 3.1), 1.0)));
-List<Row> predictRows =
-        new ArrayList<>(
-                Arrays.asList(
-                        Row.of(Vectors.dense(4.0, 4.1), 5.0),
-                        Row.of(Vectors.dense(300, 42), 2.0)));
-Schema schema =
-        Schema.newBuilder()
-                .column("f0", DataTypes.of(DenseVector.class))
-                .column("f1", DataTypes.DOUBLE())
-                .build();
-
-DataStream<Row> dataStream = env.fromCollection(trainRows);
-Table trainData = tEnv.fromDataStream(dataStream, schema).as("features", "label");
-DataStream<Row> predDataStream = env.fromCollection(predictRows);
-Table predictData = tEnv.fromDataStream(predDataStream, schema).as("features", "label");
-
-Knn knn = new Knn();
-KnnModel knnModel = knn.fit(trainData);
-Table output = knnModel.transform(predictData)[0];
-
-output.execute().print();
```
{{< /tab>}}

{{< tab "Python">}}
```python
# Simple program that trains a Knn model and uses it for classification.
#
# Before executing this program, please make sure you have followed Flink ML's
# quick start guideline to set up Flink ML and Flink environment. The guideline
# can be found at
#
# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment

from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.classification.knn import KNN
+from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# load flink ml jar
env.add_jars("file:///{path}/statefun-flink-core-3.1.0.jar", "file:///{path}/flink-ml-uber-{version}.jar")

# create a StreamTableEnvironment
t_env = StreamTableEnvironment.create(env)

# generate input training and prediction data
train_data = t_env.from_data_stream(
    env.from_collection([
        (Vectors.dense([2.0, 3.0]), 1.0),
@@ -173,19 +198,25 @@ predict_data = t_env.from_data_stream(
            ['features', 'label'],
            [DenseVectorTypeInfo(), Types.DOUBLE()])))

-knn = KNN()
+# create a knn object and initialize its parameters
+knn = KNN().set_k(4)

# train the knn model
model = knn.fit(train_data)

# use the knn model for predictions
output = model.transform(predict_data)[0]
-output.execute().print()
-
-# output
-# +----+--------------------------------+--------------------------------+--------------------------------+
-# | op | features | label | prediction |
-# +----+--------------------------------+--------------------------------+--------------------------------+
-# | +I | [4.0, 4.1] | 5.0 | 5.0 |
-# | +I | [300.0, 42.0] | 2.0 | 2.0 |
-# +----+--------------------------------+--------------------------------+--------------------------------+

+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(knn.get_features_col())]
+    expected_result = result[field_names.index(knn.get_label_col())]
+    actual_result = result[field_names.index(knn.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tActual Result: ' + str(actual_result))
```
{{< /tab>}}

{{< /tabs>}}
