Skip to content
Permalink
Browse files
[FLINK-23951] Golang Greeter
  • Loading branch information
sjwiesman committed Aug 27, 2021
1 parent 7b78cff commit ac4a9c2e350672ce07f43f56a8ef1a41605c4d81
Showing 10 changed files with 399 additions and 0 deletions.
@@ -0,0 +1,3 @@
.idea
venv/
checkpoint-dir/
@@ -0,0 +1,34 @@

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM golang:1.16-alpine
RUN apk add --no-cache git

RUN mkdir -p /app
WORKDIR /app

COPY go.mod ./
COPY go.sum ./
RUN go mod download

COPY *.go ./

RUN go build -o /greeter

EXPOSE 8000

CMD [ "/greeter" ]
@@ -0,0 +1,55 @@
# The Greeter Example

This is a simple example of a stateful functions application implemented in `Go`.

In this example, we imagine a service that computes personalized greetings.
Our service, consist out of the following components:

* `kafka ingress` - This component forwards messages produced to the `names` kafka topic,
to the `person` stateful function. Messages produced to this topic has the following
schema `{ "name" : "bob"}`.

* `person` - This function is triggered by the ingress defined above.
This function keeps track of the number of visits, and triggers the next functions:

* `greeter` - This function, computes a personalized greeting, based on the name and the number
of visits of that user. The output of that computation is forward to a Kafka egress defined below.

* `kafka egress` - This wraps a Kafka producer that emits `utf-8` greetings to the `greetings` Kafka topic.


![Flow](arch.png "Flow")

## Running the example

```
docker-compose build
docker-compose up
```

To observe the customized greeting, as they appear in the `greetings` Kafka topic, run in a separate terminal:

```
docker-compose exec kafka kafka-console-consumer \
--bootstrap-server kafka:9092 \
--isolation-level read_committed \
--from-beginning \
--topic greetings
```

Try adding few more input lines to [input-example.json](input-example.json), and restart
the producer service.

```
docker-compose restart producer
```

Feeling curious? add the following print to the `person` function at [greeter.go](greeter.go):
```fmt.Printf("Hello there %d!", ctx.Self().Id)```.
Then, rebuild and restart only the `functions` service.
```
docker-compose build functions
docker-compose up functions
```
BIN +34.5 KB go/greeter/arch.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
@@ -0,0 +1,101 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
version: "2.1"

services:

###############################################################
# Functions service
###############################################################

functions:
build:
context: ./
expose:
- "8000"

###############################################################
# StateFun runtime
###############################################################

statefun-manager:
image: apache/flink-statefun:3.1.0
expose:
- "6123"
ports:
- "8081:8081"
environment:
ROLE: master
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/greeter/module.yaml

statefun-worker:
image: apache/flink-statefun:3.1.0
expose:
- "6121"
- "6122"
depends_on:
- statefun-manager
- kafka
environment:
ROLE: worker
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/greeter/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

zookeeper:
image: confluentinc/cp-zookeeper:5.4.3
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:5.4.3
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

###############################################################
# Simple Kafka JSON producer to simulate ingress events
###############################################################

producer:
image: ververica/statefun-playground-producer:latest
depends_on:
- kafka
- statefun-worker
environment:
APP_PATH: /mnt/input-example.json
APP_KAFKA_HOST: kafka:9092
APP_KAFKA_TOPIC: names
APP_JSON_PATH: name
APP_DELAY_SECONDS: 1
volumes:
- ./input-example.json:/mnt/input-example.json
@@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module statefun.io/greeter

go 1.16

require github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0
@@ -0,0 +1,20 @@
github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4=
github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
"net/http"
)

type GreetRequest struct {
Name string `json:"name"`
Visits int32 `json:"visits"`
}

var (
PersonTypeName = statefun.TypeNameFrom("example/person")
GreeterTypeName = statefun.TypeNameFrom("example/greeter")
KafkaEgressTypeName = statefun.TypeNameFrom("example/greets")
GreetRequestType = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest"))
)

type Person struct {
Visits statefun.ValueSpec
}

func (p *Person) Invoke(ctx statefun.Context, message statefun.Message) error {
// update the visit count.
var visits int32
ctx.Storage().Get(p.Visits, &visits)

visits += 1

fmt.Printf("seen %d", visits)
ctx.Storage().Set(p.Visits, visits)

// enrich the request with the number of visits.
var request GreetRequest
if err := message.As(GreetRequestType, &request); err != nil {
return fmt.Errorf("failed to deserialize greet reqeuest: %w", err)
}
request.Visits = visits

// next, we will forward a message to a special greeter function,
// that will compute a personalized greeting based on the number
// of visits that this person has been seen.
ctx.Send(statefun.MessageBuilder{
Target: statefun.Address{
FunctionType: GreeterTypeName,
Id: request.Name,
},
Value: request,
ValueType: GreetRequestType,
})

return nil
}

func Greeter(ctx statefun.Context, message statefun.Message) error {
var request GreetRequest
if err := message.As(GreetRequestType, &request); err != nil {
return fmt.Errorf("failed to deserialize greet reqeuest: %w", err)
}

greeting := computeGreeting(request.Name, request.Visits)

ctx.SendEgress(statefun.KafkaEgressBuilder{
Target: KafkaEgressTypeName,
Topic: "greetings",
Key: request.Name,
Value: []byte(greeting),
})

return nil
}

func computeGreeting(name string, visits int32) string {
templates := []string{"", "Welcome %s", "Nice to see you again %s", "Third time is the charm %s"}
if visits < int32(len(templates)) {
return fmt.Sprintf(templates[visits], name)
}

return fmt.Sprintf("Nice to see you for the %d-th time %s!", visits, name)
}

func main() {

builder := statefun.StatefulFunctionsBuilder()

person := &Person{
Visits: statefun.ValueSpec{
Name: "visits",
ValueType: statefun.Int32Type,
},
}
_ = builder.WithSpec(statefun.StatefulFunctionSpec{
FunctionType: PersonTypeName,
States: []statefun.ValueSpec{person.Visits},
Function: person,
})

_ = builder.WithSpec(statefun.StatefulFunctionSpec{
FunctionType: GreeterTypeName,
Function: statefun.StatefulFunctionPointer(Greeter),
})

http.Handle("/statefun", builder.AsHandler())
_ = http.ListenAndServe(":8000", nil)
}
@@ -0,0 +1,2 @@
{"name" : "Bob"}
{"name" : "Joe"}

0 comments on commit ac4a9c2

Please sign in to comment.