Skip to content
Permalink
Browse files
[FLINK-25899] Add connected components example
  • Loading branch information
tillrohrmann committed Feb 2, 2022
1 parent 66cbd1f commit dc40776d380540094fa721bfe734c1ca926be9f9
Showing 12 changed files with 725 additions and 0 deletions.
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build the functions code ...
FROM maven:3.6.3-jdk-11 AS builder
COPY src /usr/src/app/src
COPY pom.xml /usr/src/app
RUN mvn -f /usr/src/app/pom.xml clean package

# ... and run the web server!
FROM openjdk:8
WORKDIR /
COPY --from=builder /usr/src/app/target/connected-components-functions-app*jar-with-dependencies.jar connected-components-functions-app.jar
EXPOSE 1108
CMD java -jar connected-components-functions-app.jar
@@ -0,0 +1,74 @@
# Connected Components Example with Docker Compose

This example is intended as a follow-up after completion of the [Java SDK Showcase Tutorial](../showcase). If you're
already familiar with the Java SDK fundamentals and would like to get a better understanding of how a realistic StateFun
application looks like, then you're in the right place! Otherwise, we highly suggest taking a look at the Showcase
tutorial first.

This example works with Docker Compose, and runs a few services that build up an end-to-end StateFun application:
- Functions service that runs your functions and expose them through an HTTP endpoint.
- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
well as function state storage in a consistent and fault-tolerant manner.
- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
and you can also extend to connect with other systems.

To motivate this example, we'll implement a [connected components](https://en.wikipedia.org/wiki/Component_(graph_theory) algorithm on top of Stateful Functions.
The program has one function - a `ConnectedComponentsFn` that consumes `Vertex` JSON events from an ingress and communicates with its neighbours to find the minimal component id.
Changes of the component id of a vertex are being output via an egress.

## Directory structure

- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
our functions service, hosting the `ConnectedComponentsFn` behind a HTTP endpoint. Check out the source code under
`src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
- `vertices.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
- `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/io-module/overview/) which the application will use.
- `docker-compose.yml`: Docker Compose file to spin up everything.

## Prerequisites

- Docker
- Docker Compose

## Running the example

First, lets build the example. From this directory, execute:

```
$ docker-compose build
```

This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
take a few minutes as it also needs to build the function's Java project.

Afterward the build completes, start running all the services:

```
$ docker-compose up
```

## Play around!

You can take a look at what messages are being sent to the Kafka egress:

```
$ docker-compose exec kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic connected-component-changes \
--from-beginning
```

You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
functions. Some ideas you can try out:
- Enable the connected component computation for graphs with undirected edges
- Make the neighbour set changeable

After you've finished changing the function code, you can do a hot redeploy of your functions service:

```
$ docker-compose up -d --build connected-components-functions
```

This rebuilds the functions service image with the updated code, and restarts the service with the new image.
@@ -0,0 +1,110 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
version: "2.1"

services:

###############################################################
# Functions service
###############################################################

connected-components-functions:
build:
dockerfile: Dockerfile
context: .
expose:
- "1108"

###############################################################
# StateFun runtime
###############################################################

statefun-manager:
image: apache/flink-statefun:3.2.0-java11
expose:
- "6123"
ports:
- "8081:8081"
environment:
ROLE: master
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/connected-components/module.yaml

statefun-worker:
image: apache/flink-statefun:3.2.0-java11
expose:
- "6121"
- "6122"
depends_on:
- statefun-manager
- kafka
- connected-components-functions
links:
- "statefun-manager:statefun-manager"
- "kafka:kafka"
- "connected-components-functions:connected-components-functions"
environment:
ROLE: worker
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/connected-components/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

zookeeper:
image: confluentinc/cp-zookeeper:5.4.3
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:5.4.3
ports:
- "9092:9092"
depends_on:
- zookeeper
links:
- "zookeeper:zookeeper"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

###############################################################
# Simple Kafka JSON producer to simulate ingress events
###############################################################

vertices-producer:
image: ververica/statefun-playground-producer:latest
depends_on:
- kafka
- statefun-worker
links:
- "kafka:kafka"
environment:
APP_PATH: /mnt/vertices.txt
APP_KAFKA_HOST: kafka:9092
APP_KAFKA_TOPIC: vertices
APP_JSON_PATH: vertex_id
volumes:
- ./vertices.txt:/mnt/vertices.txt
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

kind: io.statefun.endpoints.v2/http
spec:
functions: connected-components.fns/*
urlPathTemplate: http://connected-components-functions:1108/
transport:
type: io.statefun.transports.v1/async
---
kind: io.statefun.kafka.v1/ingress
spec:
id: connected-components.io/vertices
address: kafka:9092
consumerGroupId: connected-components
startupPosition:
type: earliest
topics:
- topic: vertices
valueType: connected-components.types/vertex
targets:
- connected-components.fns/vertex
---
kind: io.statefun.kafka.v1/egress
spec:
id: connected-components.io/connected-component-changes
address: kafka:9092
deliverySemantic:
type: at-least-once
@@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.flink</groupId>
<artifactId>connected-components-functions-app</artifactId>
<version>3.2.0</version>
<packaging>jar</packaging>

<properties>
<statefun.version>3.2.0</statefun.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<!-- StateFun Java SDK -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-sdk-java</artifactId>
<version>${statefun.version}</version>
</dependency>

<!-- For custom type serialization (JSON) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.2</version>
</dependency>

<!-- Undertow web server -->
<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-core</artifactId>
<version>1.4.18.Final</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Build a fat executable jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.apache.flink.statefun.playground.java.connectedcomponents.ConnectedComponentsAppServer</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Java code style -->
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>1.20.0</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.7</version>
<style>GOOGLE</style>
</googleJavaFormat>
<removeUnusedImports/>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<phase>verify</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

0 comments on commit dc40776

Please sign in to comment.