Skip to content
Permalink
Browse files
[FLINK-23951] Golang Showcase
This closes #12
  • Loading branch information
sjwiesman committed Aug 27, 2021
1 parent ac4a9c2 commit f09617426baf77e9d8354fdb779f7ccead524bf0
Showing 16 changed files with 1,065 additions and 0 deletions.
@@ -0,0 +1,64 @@
# StateFun GoLang SDK Showcase

This project is intended for new StateFun users that would like to start implementing their StateFun application functions using GoLang.
The tutorial is streamlined and split into a few parts which we recommend to go through a specific order, as lay out below.
Each part is demonstrated with some code snippets plus comments to guide you through the SDK fundamentals.

## Prerequisites

- golang
- docker-compose

## Tutorial Sections

### Type System

This function demonstrates StateFun's type system using the GoLang SDK.


### Messaging Primitives

[This function](pkg/showcase/part1/types.go) demonstrates how to send messages to other functions.

### Sending messages to egresses

To let your StateFun application interact with the outside world, functions may write messages
to egresses. [This function](pkg/showcase/part3/egress.go) demonstrates sending messages to an Apache Kafka or AWS Kinesis
egress, which is currently our most commonly used egresses that are natively supported by
StateFun.

### Function state storage

Consistent state is at the core of stateful functions. [This function](pkg/showcase/part4/storage.go)
demonstrates interacting with function state.

### Asynchronous operations

[This function](pkg/showcase/part5/asyncops.go) demonstrates performing asynchronous operations during a function invocation. It
is a common scenario for functions to have external dependencies in order for it to complete its
work, such as fetching enrichment information from remote databases.

### Serving

[This function](pkg/showcase/part6/serving.go) builds a full stateful functions application
and shows how they are exposed and deployed in the real world. Run this function locally
along with the stateful functions runtime!

To actually start serving run from one terminal:
```bash
$ cd pkg/showcase/part6
$ go build
$ ./part6
```

And from another:
```docker-compose up```
# Next Steps
The setup you executed in the last part of this tutorial is not how you'd normally deploy StateFun processes
and functions. It's a rather simplified setup to allow you to explore the interaction between
functions and the StateFun processes by setting debugger breakpoints in the function code in your IDE.
We recommend now to take a look at a slightly more realistic setup, using Docker Compose, in the
[Greeter Docker Compose Example](../greeter).
@@ -0,0 +1,97 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
version: "2.1"

services:

###############################################################
# StateFun runtime
###############################################################

statefun-manager:
image: apache/flink-statefun:3.1.0
expose:
- "6123"
ports:
- "8081:8081"
environment:
ROLE: master
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/greeter/module.yaml

statefun-worker:
image: apache/flink-statefun:3.1.0
expose:
- "6121"
- "6122"
environment:
ROLE: worker
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/greeter/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

zookeeper:
image: confluentinc/cp-zookeeper:5.4.3
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:5.4.3
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

###############################################################
# Forward a port 8000 from the host's machine
###############################################################

host-machine:
image: qoomon/docker-host@sha256:e0f021dd77c7c26d37b825ab2cbf73cd0a77ca993417da80a14192cb041937b0
cap_add: [ 'NET_ADMIN', 'NET_RAW' ]
mem_limit: 8M
restart: on-failure
environment:
PORTS: 8000

###############################################################
# Simple Kafka JSON producer to simulate ingress events
###############################################################

producer:
image: ververica/statefun-playground-producer:latest
environment:
APP_PATH: /mnt/input-example.json
APP_KAFKA_HOST: kafka:9092
APP_KAFKA_TOPIC: logins
APP_JSON_PATH: user_id
APP_DELAY_SECONDS: 1
volumes:
- ./input-example.json:/mnt/input-example.json
@@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module statefun.io/showcase

go 1.16

require github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0
@@ -0,0 +1,20 @@
github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0 h1:uE56xfgn4c/ytXppcW/NQUNxtPM8NpvkbU/VFMuaXN4=
github.com/apache/flink-statefun/statefun-sdk-go/v3 v3.1.0/go.mod h1:uHiPJsi71a161NMH/ISkkSPIXenkcG9A2m+uhT8UlJ4=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
@@ -0,0 +1,2 @@
{"user_id" : "bid", "user_name": "Bob", "login_type": "web"}
{"user_id" : "jid", "user_name": "Joe", "login_type": "mobile"}
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

kind: io.statefun.endpoints.v2/http
spec:
functions: showcase.fns/*
urlPathTemplate: http://host-machine:8000/statefun
maxNumBatchRequests: 10000
---
kind: io.statefun.kafka.v1/ingress
spec:
id: showcase.io/names
address: kafka:9092
consumerGroupId: my-group-id
startupPosition:
type: earliest
topics:
- topic: logins
valueType: showcase.types/userlogin
targets:
- showcase.fns/user
---
kind: io.statefun.kafka.v1/egress
spec:
id: showcase.io/greets
address: kafka:9092
deliverySemantic:
type: exactly-once
transactionTimeout: 15min
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package part1

import (
"fmt"
"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
)

// Types
// Showcase Part 1: Type System
// ============================
// This function demonstrates StateFun's type system using the GoLang SDK.
//
// Core Type abstraction
// =====================
// The core abstraction used by StateFun's type system is the Type interface, which
// consists of a few things that StateFun uses to handle messages and state values:
//
// A TypeName to identify the type.
// A TypeSerializer for serializing and deserializing instances of the type.
//
// Cross-language primitive types
// ==============================
// StateFun's type system has cross-language support for common primitive types, such as boolean,
// integer, long, etc. These primitive types have built-in Types implemented for them
// already, with predefined typenames.
//
// This is of course all transparent for the user, so you don't need to worry about it. Functions
// implemented in various languages (e.g. Java or Python) can message each other by directly sending
// supported primitive values as message arguments. Moreover, the type system is used for state
// values as well; so, you can expect that a function can safely read previous state after
// reimplementing it in a different language. We'll cover more on state storage access in later
// parts of the showcase series.
//
// Common custom types (e.g. JSON or Protobuf)
// ===========================================
// The type system is also very easily extensible to support custom message types, such as JSON
// or Protobuf messages. This is just a matter of implementing your own Type with a custom
// typename and serializer.
//
// StateFun makes this super easy by providing builder utilities to help you create a simple
// Type. Take a look at showcase_custom_types.py for few recommended ways to quickly create a StateFun Type
// for your JSON or Protobuf messages.
func Types(_ statefun.Context, message statefun.Message) error {
// All values, including messages and storage values, are handled via StateFun's type system.
// StateFun ships built-in primitive types that handles de-/serialization of messages across
// functions:
if message.IsBool() {
fmt.Printf("I've got a message with a boolean %v", message.AsBool())
} else if message.IsInt32() {
fmt.Printf("I've got a message with an int32 %v", message.AsInt32())
} else if message.IsInt64() {
fmt.Printf("I've got a message with an int64 %v", message.AsInt64())
} else if message.IsFloat32() {
fmt.Printf("I've got a message with a float32 %v", message.AsFloat32())
} else if message.IsFloat64() {
fmt.Printf("I've got a message with a float64 %v", message.AsFloat64())
} else if message.IsString() {
fmt.Printf("I've got a message with a string %v", message.AsString())
} else if message.Is(UserLoginType) {
var login UserLogin
if err := message.As(UserLoginType, &login); err != nil {
return fmt.Errorf("failed to deserialize user login: %w", err)
}
fmt.Printf("I've got a message with a login event %s", login)
}

return nil
}

0 comments on commit f096174

Please sign in to comment.