Skip to content
Permalink
Browse files
[FLINK-26158] Update java/greeter example to use playground ingress/e…
…gress
  • Loading branch information
tillrohrmann committed Feb 16, 2022
1 parent c5335e8 commit fdb0e787c7b6e7d547fa279e076cc992836500fd
Showing 7 changed files with 73 additions and 147 deletions.
@@ -9,8 +9,6 @@ This example works with Docker Compose, and runs a few services that build up an
- Functions service that runs your functions and expose them through an HTTP endpoint.
- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
well as function state storage in a consistent and fault-tolerant manner.
- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
and you can also extend to connect with other systems.

To motivate this example, we'll implement a simple user greeter application, which has two functions - a `UserFn` that
expects `UserLogin` JSON events from an ingress and keeps in state storage information about users, and a `GreetingsFn`
@@ -21,7 +19,6 @@ that accepts user information to generate personalized greeting messages that ar
- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
our functions service, hosting the `UserFn` and `UserLogin` behind a HTTP endpoint. Check out the source code under
`src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
- `user-logins.txt`: A file with multiple JSON objects per line; this is used as test events produced to our application ingress.
- `module.yaml`: The [Module Specification](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/deployment/module/) file to be mounted to the StateFun runtime process containers. This
configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
well as definitions of [Ingresses and Egresses](https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.2/docs/io-module/overview/) which the application will use.
@@ -40,7 +37,7 @@ First, lets build the example. From this directory, execute:
$ docker-compose build
```

This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
This pulls all the necessary Statefun Docker image, and also builds the functions service image. This can
take a few minutes as it also needs to build the function's Java project.

Afterward the build completes, start running all the services:
@@ -51,12 +48,30 @@ $ docker-compose up

## Play around!

You can take a look at what messages are being sent to the Kafka egress:
The greeter application allows you to do the following actions:

* Create a greeting for a user via sending a `UserLogin` message to the `user` function

In order to send messages to the Stateful Functions application you can run:

```
$ curl -X PUT -H "Content-Type: application/vnd.greeter.types/UserLogin" -d '{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}' localhost:8090/greeter.fns/user/1
```

You can take a look at what messages are being sent to the Playground egress:

```
$ docker-compose exec kafka rpk topic consume greetings
$ curl -X GET localhost:8091/greetings
```

### Messages

The messages are expected to be encoded as JSON.

* `UserLogin`: `{"user_id": "1", "user_name": "Joe", "login_type": "WEB"}`, `user_id` is the id of the `user` function

## What's next?

You can also try modifying the function code in the `src/main/java` directory, and do a zero-downtime upgrade of the
functions. Some ideas you can try out:
- Add some more state to be persisted by the `UserFn`. For example, let it additionally keep track of the user's previous login location.
@@ -35,51 +35,12 @@ services:
###############################################################

statefun:
image: apache/flink-statefun-playground:3.2.0
image: apache/flink-statefun-playground:3.2.0-1.0
ports:
- "8081:8081"
- "8090:8090"
- "8091:8091"
depends_on:
- kafka
- greeter-functions
volumes:
- ./module.yaml:/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

kafka:
image: docker.vectorized.io/vectorized/redpanda:v21.8.1
command:
- redpanda start
- --smp 1
- --memory 512M
- --overprovisioned
- --set redpanda.default_topic_replications=1
- --set redpanda.auto_create_topics_enabled=true
- --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
- --pandaproxy-addr 0.0.0.0:8089
- --advertise-pandaproxy-addr kafka:8089
hostname: kafka
ports:
- "8089:8089"
- "9092:9092"
- "9094:9094"

###############################################################
# Simple Kafka JSON producer to simulate ingress events
###############################################################

user-logins-producer:
image: ververica/statefun-playground-producer:latest
depends_on:
- kafka
- statefun
environment:
APP_PATH: /mnt/user-logins.txt
APP_KAFKA_HOST: kafka:9092
APP_KAFKA_TOPIC: user-logins
APP_JSON_PATH: user_id
volumes:
- ./user-logins.txt:/mnt/user-logins.txt
@@ -20,22 +20,12 @@ spec:
transport:
type: io.statefun.transports.v1/async
---
kind: io.statefun.kafka.v1/ingress
kind: io.statefun.playground.v1/ingress
spec:
id: greeter.io/user-logins
address: kafka:9092
consumerGroupId: greeter
startupPosition:
type: earliest
topics:
- topic: user-logins
valueType: greeter.types/org.apache.flink.statefun.playground.java.greeter.types.UserLogin
targets:
- greeter.fns/user
port: 8090
---
kind: io.statefun.kafka.v1/egress
kind: io.statefun.playground.v1/egress
spec:
id: greeter.io/user-greetings
address: kafka:9092
deliverySemantic:
type: at-least-once
port: 8091
topics:
- greetings
@@ -18,44 +18,44 @@

package org.apache.flink.statefun.playground.java.greeter;

import static org.apache.flink.statefun.playground.java.greeter.types.Types.EGRESS_RECORD_JSON_TYPE;
import static org.apache.flink.statefun.playground.java.greeter.types.Types.USER_PROFILE_PROTOBUF_TYPE;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.playground.java.greeter.types.EgressRecord;
import org.apache.flink.statefun.playground.java.greeter.types.generated.UserProfile;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.io.KafkaEgressMessage;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;

/**
* A simple function that computes personalized greetings messages based on a given {@link
* UserProfile}. Then, it sends the greetings message back to the user via an egress Kafka topic.
*/
final class GreetingsFn implements StatefulFunction {

private static final String[] GREETINGS_TEMPLATES =
new String[] {"Welcome %s!", "Nice to see you again %s.", "Third time is a charm %s!"};

static final TypeName TYPENAME = TypeName.typeNameOf("greeter.fns", "greetings");
static final StatefulFunctionSpec SPEC =
StatefulFunctionSpec.builder(TYPENAME).withSupplier(GreetingsFn::new).build();

private static final TypeName KAFKA_EGRESS = TypeName.typeNameOf("greeter.io", "user-greetings");
private static final TypeName PLAYGROUND_EGRESS =
TypeName.typeNameOf("io.statefun.playground", "egress");

@Override
public CompletableFuture<Void> apply(Context context, Message message) {
if (message.is(USER_PROFILE_PROTOBUF_TYPE)) {
final UserProfile profile = message.as(USER_PROFILE_PROTOBUF_TYPE);
final String greetings = createGreetingsMessage(profile);
final EgressRecord egressRecord = new EgressRecord("greetings", greetings);

final String userId = context.self().id();
context.send(
KafkaEgressMessage.forEgress(KAFKA_EGRESS)
.withTopic("greetings")
.withUtf8Key(userId)
.withUtf8Value(greetings)
EgressMessageBuilder.forEgress(PLAYGROUND_EGRESS)
.withCustomType(EGRESS_RECORD_JSON_TYPE, egressRecord)
.build());
}
return context.done();
@@ -0,0 +1,28 @@
package org.apache.flink.statefun.playground.java.greeter.types;

import com.fasterxml.jackson.annotation.JsonProperty;

public class EgressRecord {
@JsonProperty("topic")
private String topic;

@JsonProperty("payload")
private String payload;

public EgressRecord() {
this(null, null);
}

public EgressRecord(String topic, String payload) {
this.topic = topic;
this.payload = payload;
}

public String getTopic() {
return topic;
}

public String getPayload() {
return payload;
}
}
@@ -15,7 +15,7 @@ private Types() {}

public static final Type<UserLogin> USER_LOGIN_JSON_TYPE =
SimpleType.simpleImmutableTypeFrom(
TypeName.typeNameOf(TYPES_NAMESPACE, UserLogin.class.getName()),
TypeName.typeNameOf(TYPES_NAMESPACE, "UserLogin"),
JSON_OBJ_MAPPER::writeValueAsBytes,
bytes -> JSON_OBJ_MAPPER.readValue(bytes, UserLogin.class));

@@ -24,4 +24,10 @@ private Types() {}
TypeName.typeNameOf(TYPES_NAMESPACE, UserProfile.getDescriptor().getFullName()),
UserProfile::toByteArray,
UserProfile::parseFrom);

public static final Type<EgressRecord> EGRESS_RECORD_JSON_TYPE =
SimpleType.simpleImmutableTypeFrom(
TypeName.typeNameOf("io.statefun.playground", "EgressRecord"),
JSON_OBJ_MAPPER::writeValueAsBytes,
bytes -> JSON_OBJ_MAPPER.readValue(bytes, EgressRecord.class));
}

This file was deleted.

0 comments on commit fdb0e78

Please sign in to comment.