[FLINK-26158] Update java/connected-components example to use playground ingress/egress
tillrohrmann committed Feb 16, 2022
1 parent fdb0e78 commit 7f68b7a299f9d68a5966059220ab152e158a069e
Showing 9 changed files with 224 additions and 221 deletions.
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
- Functions service that runs your functions and exposes them through an HTTP endpoint.
- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
well as function state storage in a consistent and fault-tolerant manner.
- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
and you can also extend to connect with other systems.

To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory)) algorithm on top of Stateful Functions.
The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
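
To make the idea concrete, here is a minimal, self-contained sketch of minimum-label propagation in plain Java. It is not the `ConnectedComponentsFn` from this example: it assumes integer vertex ids and a symmetric adjacency list, replaces StateFun messaging with an in-memory queue, and leaves out the step where a vertex with a smaller id replies to the sender. The class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class MinLabelPropagationSketch {

  /**
   * Returns, for every vertex in the adjacency map, the smallest vertex id
   * reachable from it. Assumes every edge is listed in both directions.
   */
  static Map<Integer, Integer> componentIds(Map<Integer, List<Integer>> adjacency) {
    Map<Integer, Integer> componentId = new TreeMap<>();
    // Each queued message is {target vertex, candidate component id}.
    Deque<int[]> inbox = new ArrayDeque<>();

    // Every vertex starts in its own component and announces that to its neighbours.
    for (Map.Entry<Integer, List<Integer>> entry : adjacency.entrySet()) {
      int vertex = entry.getKey();
      componentId.put(vertex, vertex);
      for (int neighbour : entry.getValue()) {
        inbox.add(new int[] {neighbour, vertex});
      }
    }

    // A vertex accepts a smaller candidate and forwards it to its own neighbours.
    while (!inbox.isEmpty()) {
      int[] message = inbox.poll();
      int target = message[0];
      int candidate = message[1];
      Integer current = componentId.get(target);
      if (current == null) {
        continue; // message for a vertex that was never added
      }
      if (candidate < current) {
        componentId.put(target, candidate);
        for (int neighbour : adjacency.get(target)) {
          inbox.add(new int[] {neighbour, candidate});
        }
      }
    }
    return componentId;
  }

  public static void main(String[] args) {
    Map<Integer, List<Integer>> graph =
        Map.of(
            1, List.of(2, 3),
            2, List.of(1, 4),
            3, List.of(1),
            4, List.of(2),
            5, List.of(6),
            6, List.of(5));
    // prints {1=1, 2=1, 3=1, 4=1, 5=5, 6=5}
    System.out.println(componentIds(graph));
  }
}
```

Vertices 1 to 4 mirror the sample graph used in this README; vertices 5 and 6 are added only to show a second component.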
@@ -21,7 +19,6 @@ Changes of the component id of a vertex are being output via an egress.
- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
our functions service, hosting the `ConnectedComponentsFn` behind an HTTP endpoint. Check out the source code under
`src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
- `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
$ docker-compose build
```

This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
This pulls all the necessary Docker images (StateFun), and also builds the functions service image. This can
take a few minutes as it also needs to build the function's Java project.

After the build completes, start running all the services:
@@ -51,12 +48,33 @@ $ docker-compose up

## Play around!

You can take a look at what messages are being sent to the Kafka egress:
The connected components application allows you to do the following actions:

* Add a new vertex to the graph by sending a `Vertex` message to the `vertex` function

In order to send messages to the Stateful Functions application you can run:

```
$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "1", "neighbours": ["2", "3"]}' localhost:8090/connected-components.fns/vertex/1
$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "2", "neighbours": ["1", "4"]}' localhost:8090/connected-components.fns/vertex/2
$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "3", "neighbours": ["1"]}' localhost:8090/connected-components.fns/vertex/3
$ curl -X PUT -H "Content-Type: application/vnd.connected-components.types/vertex" -d '{"vertex_id": "4", "neighbours": ["2"]}' localhost:8090/connected-components.fns/vertex/4
```

You can take a look at what messages are being sent to the Playground egress:

```
$ docker-compose exec kafka rpk topic consume connected-component-changes
$ curl -X GET localhost:8091/connected-component-changes
```
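
The same calls can be made from Java. The following is a rough sketch using the JDK's built-in `java.net.http` client (Java 11+). The URLs, ports and content type are taken from the `curl` commands above; the class and method names are hypothetical, and the program only contacts the playground when started with `--send`, so it assumes the services from `docker-compose up` are running in that case.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class PlaygroundClientSketch {

  // Ports as published by the statefun service in docker-compose.yml.
  static final String INGRESS_URL = "http://localhost:8090";
  static final String EGRESS_URL = "http://localhost:8091";

  /** Builds the PUT request that delivers a Vertex message to the vertex function. */
  static HttpRequest vertexRequest(String vertexId, String vertexJson) {
    return HttpRequest.newBuilder()
        .uri(URI.create(INGRESS_URL + "/connected-components.fns/vertex/" + vertexId))
        .header("Content-Type", "application/vnd.connected-components.types/vertex")
        .PUT(HttpRequest.BodyPublishers.ofString(vertexJson))
        .build();
  }

  /** Builds the GET request that reads from the given egress topic. */
  static HttpRequest egressRequest(String topic) {
    return HttpRequest.newBuilder().uri(URI.create(EGRESS_URL + "/" + topic)).GET().build();
  }

  public static void main(String[] args) throws Exception {
    HttpRequest put = vertexRequest("1", "{\"vertex_id\": \"1\", \"neighbours\": [\"2\", \"3\"]}");
    HttpRequest get = egressRequest("connected-component-changes");
    System.out.println(put.method() + " " + put.uri());
    System.out.println(get.method() + " " + get.uri());

    // Only talk to the playground when explicitly asked to.
    if (args.length > 0 && args[0].equals("--send")) {
      HttpClient client = HttpClient.newHttpClient();
      HttpResponse<String> putResponse = client.send(put, HttpResponse.BodyHandlers.ofString());
      System.out.println("ingress status: " + putResponse.statusCode());
      HttpResponse<String> getResponse = client.send(get, HttpResponse.BodyHandlers.ofString());
      System.out.println("egress body: " + getResponse.body());
    }
  }
}
```

Keeping request construction separate from sending makes it easy to inspect the request without a running playground.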

### Messages

All messages are expected to be encoded as JSON:

* `Vertex`: `{"vertex_id": "1", "neighbours": ["2", "3"]}`, `vertex_id` is the id of the `vertex` function

## What's next?

You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
functions. Some ideas you can try out:
- Enable the connected component computation for graphs with undirected edges
@@ -35,51 +35,12 @@ services:
###############################################################

statefun:
image: apache/flink-statefun-playground:3.2.0
image: apache/flink-statefun-playground:3.2.0-1.0
ports:
- "8081:8081"
- "8090:8090"
- "8091:8091"
depends_on:
- kafka
- connected-components-functions
volumes:
- ./module.yaml:/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

kafka:
image: docker.vectorized.io/vectorized/redpanda:v21.8.1
command:
- redpanda start
- --smp 1
- --memory 512M
- --overprovisioned
- --set redpanda.default_topic_replications=1
- --set redpanda.auto_create_topics_enabled=true
- --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
- --pandaproxy-addr 0.0.0.0:8089
- --advertise-pandaproxy-addr kafka:8089
hostname: kafka
ports:
- "8089:8089"
- "9092:9092"
- "9094:9094"

###############################################################
# Simple Kafka JSON producer to simulate ingress events
###############################################################

vertices-producer:
image: ververica/statefun-playground-producer:latest
depends_on:
- kafka
- statefun
environment:
APP_PATH: /mnt/vertices.txt
APP_KAFKA_HOST: kafka:9092
APP_KAFKA_TOPIC: vertices
APP_JSON_PATH: vertex_id
volumes:
- ./vertices.txt:/mnt/vertices.txt
@@ -20,22 +20,12 @@ spec:
transport:
type: io.statefun.transports.v1/async
---
kind: io.statefun.kafka.v1/ingress
kind: io.statefun.playground.v1/ingress
spec:
id: connected-components.io/vertices
address: kafka:9092
consumerGroupId: connected-components
startupPosition:
type: earliest
topics:
- topic: vertices
valueType: connected-components.types/vertex
targets:
- connected-components.fns/vertex
port: 8090
---
kind: io.statefun.kafka.v1/egress
kind: io.statefun.playground.v1/egress
spec:
id: connected-components.io/connected-component-changes
address: kafka:9092
deliverySemantic:
type: at-least-once
port: 8091
topics:
- connected-component-changes
@@ -1,5 +1,12 @@
package org.apache.flink.statefun.playground.java.connectedcomponents;

import static org.apache.flink.statefun.playground.java.connectedcomponents.types.Types.EGRESS_RECORD_JSON_TYPE;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.EgressRecord;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.Types;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.Vertex;
import org.apache.flink.statefun.playground.java.connectedcomponents.types.VertexComponentChange;
@@ -8,127 +15,138 @@
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.apache.flink.statefun.sdk.java.message.MessageBuilder;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* A stateful function that computes the connected component for a stream of vertices.
*
* <p>The underlying algorithm is a form of label propagation and works by recording for every vertex its component id.
* Whenever a vertex is created or its component id changes, it will send this update to all of its neighbours.
* Every neighbour will compare the broadcast component id with its own id. If the id is lower than its own, then
* it will accept this component id and broadcast this change to its neighbours. If the own component id is smaller,
* then it answers to the broadcaster by sending its own component id.
* <p>The underlying algorithm is a form of label propagation and works by recording for every
* vertex its component id. Whenever a vertex is created or its component id changes, it will send
* this update to all of its neighbours. Every neighbour will compare the broadcast component id
* with its own id. If the id is lower than its own, then it will accept this component id and
* broadcast this change to its neighbours. If the own component id is smaller, then it answers to
* the broadcaster by sending its own component id.
*
* <p>That way, the minimum component id of each connected component will be broadcast throughout the whole
* connected component. Eventually, every vertex will have heard of the minimum component id and have accepted
* it.
* <p>That way, the minimum component id of each connected component will be broadcast throughout
* the whole connected component. Eventually, every vertex will have heard of the minimum component
* id and have accepted it.
*
* <p>Every component id change will be output to the {@link #KAFKA_EGRESS} as a connected component change.
* <p>Every component id change will be output to the {@link #PLAYGROUND_EGRESS} as a connected
* component change.
*
* @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation algorithm</a>
* @see <a href="https://en.wikipedia.org/wiki/Label_propagation_algorithm">Label propagation
* algorithm</a>
*/
final class ConnectedComponentsFn implements StatefulFunction {

/**
* The current component id of a vertex.
*/
private static final ValueSpec<Integer> COMPONENT_ID = ValueSpec.named("componentId").withIntType();
/** The current component id of a vertex. */
private static final ValueSpec<Integer> COMPONENT_ID =
ValueSpec.named("componentId").withIntType();

/**
* List of known neighbours of a vertex.
*/
private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE = ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);
/** List of known neighbours of a vertex. */
private static final ValueSpec<Set<Integer>> NEIGHBOURS_VALUE =
ValueSpec.named("neighbours").withCustomType(Types.NEIGHBOURS_TYPE);

static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
static final StatefulFunctionSpec SPEC = StatefulFunctionSpec.builder(TYPE_NAME)
.withSupplier(ConnectedComponentsFn::new)
.withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
.build();
static final TypeName TYPE_NAME = TypeName.typeNameOf("connected-components.fns", "vertex");
static final StatefulFunctionSpec SPEC =
StatefulFunctionSpec.builder(TYPE_NAME)
.withSupplier(ConnectedComponentsFn::new)
.withValueSpecs(COMPONENT_ID, NEIGHBOURS_VALUE)
.build();

static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("connected-components.io", "connected-component-changes");
static final TypeName PLAYGROUND_EGRESS = TypeName.typeNameOf("io.statefun.playground", "egress");

@Override
public CompletableFuture<Void> apply(Context context, Message message) {
// initialize a new vertex
if (message.is(Types.VERTEX_INIT_TYPE)) {
final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);
@Override
public CompletableFuture<Void> apply(Context context, Message message) {
// initialize a new vertex
if (message.is(Types.VERTEX_INIT_TYPE)) {
final Vertex vertex = message.as(Types.VERTEX_INIT_TYPE);

int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
final Set<Integer> currentNeighbours = getCurrentNeighbours(context);
int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);
final Set<Integer> currentNeighbours = getCurrentNeighbours(context);

if (currentComponentId > vertex.getVertexId()) {
updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
currentComponentId = vertex.getVertexId();
}
if (currentComponentId > vertex.getVertexId()) {
updateComponentId(context, vertex.getVertexId(), vertex.getVertexId());
currentComponentId = vertex.getVertexId();
}

final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
neighbourDiff.removeAll(currentNeighbours);
final HashSet<Integer> neighbourDiff = new HashSet<>(vertex.getNeighbours());
neighbourDiff.removeAll(currentNeighbours);

broadcastVertexConnectedComponentChange(context, vertex.getVertexId(), neighbourDiff, currentComponentId);
broadcastVertexConnectedComponentChange(
context, vertex.getVertexId(), neighbourDiff, currentComponentId);

// update the neighbours
neighbourDiff.addAll(currentNeighbours);
context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
}
// a neighbour's component id has changed
else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
final VertexComponentChange vertexComponentChange = message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
final Set<Integer> currentNeighbours = getCurrentNeighbours(context);

// only process the message if we can reach the source --> connected components with directed edges
if (currentNeighbours.contains(vertexComponentChange.getSource())) {
final int componentIdCandidate = vertexComponentChange.getComponentId();
final int currentComponentId = context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);

if (currentComponentId < componentIdCandidate) {
sendVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), vertexComponentChange.getSource(), currentComponentId);
} else if (currentComponentId > componentIdCandidate) {
updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
currentNeighbours.remove(vertexComponentChange.getSource());
broadcastVertexConnectedComponentChange(context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
}
}
// update the neighbours
neighbourDiff.addAll(currentNeighbours);
context.storage().set(NEIGHBOURS_VALUE, neighbourDiff);
}
// a neighbour's component id has changed
else if (message.is(Types.VERTEX_COMPONENT_CHANGE_TYPE)) {
final VertexComponentChange vertexComponentChange =
message.as(Types.VERTEX_COMPONENT_CHANGE_TYPE);
final Set<Integer> currentNeighbours = getCurrentNeighbours(context);

// only process the message if we can reach the source --> connected components with directed
// edges
if (currentNeighbours.contains(vertexComponentChange.getSource())) {
final int componentIdCandidate = vertexComponentChange.getComponentId();
final int currentComponentId =
context.storage().get(COMPONENT_ID).orElse(Integer.MAX_VALUE);

if (currentComponentId < componentIdCandidate) {
sendVertexConnectedComponentChange(
context,
vertexComponentChange.getTarget(),
vertexComponentChange.getSource(),
currentComponentId);
} else if (currentComponentId > componentIdCandidate) {
updateComponentId(context, vertexComponentChange.getTarget(), componentIdCandidate);
currentNeighbours.remove(vertexComponentChange.getSource());
broadcastVertexConnectedComponentChange(
context, vertexComponentChange.getTarget(), currentNeighbours, componentIdCandidate);
}

return context.done();
}
}

private Set<Integer> getCurrentNeighbours(Context context) {
return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
}
return context.done();
}

private void broadcastVertexConnectedComponentChange(Context context, int source, Iterable<Integer> neighbours, int componentId) {
for (Integer neighbour : neighbours) {
sendVertexConnectedComponentChange(context, source, neighbour, componentId);
}
}
private Set<Integer> getCurrentNeighbours(Context context) {
return context.storage().get(NEIGHBOURS_VALUE).orElse(Collections.emptySet());
}

private void sendVertexConnectedComponentChange(Context context, int source, int target, int currentComponentId) {
final VertexComponentChange vertexComponentChange = VertexComponentChange.create(source, target, currentComponentId);
context.send(MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
.withCustomType(
Types.VERTEX_COMPONENT_CHANGE_TYPE,
vertexComponentChange)
.build());
private void broadcastVertexConnectedComponentChange(
Context context, int source, Iterable<Integer> neighbours, int componentId) {
for (Integer neighbour : neighbours) {
sendVertexConnectedComponentChange(context, source, neighbour, componentId);
}
}

private void sendVertexConnectedComponentChange(
Context context, int source, int target, int currentComponentId) {
final VertexComponentChange vertexComponentChange =
VertexComponentChange.create(source, target, currentComponentId);
context.send(
MessageBuilder.forAddress(TYPE_NAME, String.valueOf(target))
.withCustomType(Types.VERTEX_COMPONENT_CHANGE_TYPE, vertexComponentChange)
.build());
}

private void updateComponentId(Context context, int vertexId, int componentId) {
context.storage().set(COMPONENT_ID, componentId);
outputConnectedComponentChange(context, vertexId, componentId);
}
private void updateComponentId(Context context, int vertexId, int componentId) {
context.storage().set(COMPONENT_ID, componentId);
outputConnectedComponentChange(context, vertexId, componentId);
}

private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
context.send(KafkaEgressMessage.forEgress(KAFKA_EGRESS)
.withTopic("connected-component-changes")
.withUtf8Key(String.valueOf(vertexId))
.withUtf8Value(String.format("Vertex %s belongs to component %s.", vertexId, componentId))
private void outputConnectedComponentChange(Context context, int vertexId, int componentId) {
context.send(
EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
.withCustomType(
EGRESS_RECORD_JSON_TYPE,
new EgressRecord(
"connected-component-changes",
String.format("Vertex %s belongs to component %s.", vertexId, componentId)))
.build());
}
}
}
