Skip to content
Permalink
Browse files
[FLINK-26158] Update go/greeter to use playground ingress/egress
  • Loading branch information
tillrohrmann committed Feb 16, 2022
1 parent 2815d55 commit 56e897f3d3a25ebef2fb34dc6f9b4dc50e073e76
Showing 5 changed files with 51 additions and 81 deletions.
@@ -5,43 +5,56 @@ This is a simple example of a stateful functions application implemented in `Go`
In this example, we imagine a service that computes personalized greetings.
Our service, consist out of the following components:

* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
to the `person` stateful function. Messages produced to this topic has the following
schema `{ "name" : "bob"}`.
* `playground ingress` - Ingestion point for messages. Messages are sent to the specified target function.

* `person` - This function is triggered by the ingress defined above.
This function keeps track of the number of visits, and triggers the next functions:

* `greeter` - This function, computes a personalized greeting, based on the name and the number
of visits of that user. The output of that computation is forward to a Kafka egress defined below.

* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.
* `playground egress` - Queryable endpoint that collects the emitted greetings in the `greetings` topic. The greeting is `utf-8` encoded.


![Flow](arch.png "Flow")

## Running the example

```
docker-compose build
docker-compose up
$ docker-compose build
$ docker-compose up
```

To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:
## Play around!

The greeter application allows you to do the following actions:

* Create a greeting for a user via sending a `GreetRequest` message to the `person` function

In order to send messages to the Stateful Functions application you can run:

```
docker-compose exec kafka rpk topic consume greetings
$ curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d '{"name": "Bob"}' localhost:8090/example/person/Bob
```

Try adding few more input lines to [input-example.json](input-example.json), and restart
the producer service.
To consume the customized greeting, as they appear in the `greetings` playground topic, run in a separate terminal:

```
docker-compose restart producer
$ curl -X GET localhost:8091/greetings
```

### Messages

The messages are expected to be encoded as JSON.

* `GreetRequest`: `{"name": "Bob"}`, `name` is the id of the `person` function

## What's next?

Feeling curious? add the following print to the `person` function at [greeter.go](greeter.go):
```fmt.Printf("Hello there %d!", ctx.Self().Id)```.
```
fmt.Printf("Hello there %d!", ctx.Self().Id)
```

Then, rebuild and restart only the `functions` service.

@@ -34,52 +34,12 @@ services:
###############################################################

statefun:
image: apache/flink-statefun-playground:3.2.0
image: apache/flink-statefun-playground:3.2.0-1.0
ports:
- "8081:8081"
- "8090:8090"
- "8091:8091"
depends_on:
- kafka
- functions
volumes:
- ./module.yaml:/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

kafka:
image: docker.vectorized.io/vectorized/redpanda:v21.8.1
command:
- redpanda start
- --smp 1
- --memory 512M
- --overprovisioned
- --set redpanda.default_topic_replications=1
- --set redpanda.auto_create_topics_enabled=true
- --kafka-addr INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:9092
- --advertise-kafka-addr INSIDE://kafka:9094,OUTSIDE://kafka:9092
- --pandaproxy-addr 0.0.0.0:8089
- --advertise-pandaproxy-addr kafka:8089
hostname: kafka
ports:
- "8089:8089"
- "9092:9092"
- "9094:9094"

###############################################################
# Simple Kafka JSON producer to simulate ingress events
###############################################################

producer:
image: ververica/statefun-playground-producer:latest
depends_on:
- kafka
- statefun
environment:
APP_PATH: /mnt/input-example.json
APP_KAFKA_HOST: kafka:9092
APP_KAFKA_TOPIC: names
APP_JSON_PATH: name
APP_DELAY_SECONDS: 1
volumes:
- ./input-example.json:/mnt/input-example.json
@@ -27,11 +27,17 @@ type GreetRequest struct {
Visits int32 `json:"visits"`
}

type EgressRecord struct {
Topic string `json:"topic"`
Payload string `json:"payload"`
}

var (
PersonTypeName = statefun.TypeNameFrom("example/person")
GreeterTypeName = statefun.TypeNameFrom("example/greeter")
KafkaEgressTypeName = statefun.TypeNameFrom("example/greets")
PlaygroundEgressTypeName = statefun.TypeNameFrom("io.statefun.playground/egress")
GreetRequestType = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest"))
EgressRecordType = statefun.MakeJsonType(statefun.TypeNameFrom("io.statefun.playground/EgressRecord"))
)

type Person struct {
@@ -78,11 +84,15 @@ func Greeter(ctx statefun.Context, message statefun.Message) error {

greeting := computeGreeting(request.Name, request.Visits)

ctx.SendEgress(statefun.KafkaEgressBuilder{
Target: KafkaEgressTypeName,
Topic: "greetings",
Key: request.Name,
Value: []byte(greeting),
egressRecord := EgressRecord {
Topic: "greetings",
Payload: greeting,
}

ctx.SendEgress(statefun.GenericEgressBuilder{
Target: PlaygroundEgressTypeName,
Value: egressRecord,
ValueType: EgressRecordType,
})

return nil

This file was deleted.

@@ -16,26 +16,15 @@ kind: io.statefun.endpoints.v2/http
spec:
functions: example/*
urlPathTemplate: http://functions:8000/statefun
transport:
transport:
type: io.statefun.transports.v1/async
---
kind: io.statefun.kafka.v1/ingress
kind: io.statefun.playground.v1/ingress
spec:
id: example/names
address: kafka:9092
consumerGroupId: my-group-id
startupPosition:
type: earliest
topics:
- topic: names
valueType: example/GreetRequest
targets:
- example/person
port: 8090
---
kind: io.statefun.kafka.v1/egress
kind: io.statefun.playground.v1/egress
spec:
id: example/greets
address: kafka:9092
deliverySemantic:
type: exactly-once
transactionTimeout: 15min
port: 8091
topics:
- greetings

0 comments on commit 56e897f

Please sign in to comment.