Skip to content
Permalink
Browse files
[FLINK-21826] Add Java Shopping Cart example.
This closes #2.
  • Loading branch information
igalshilman authored and tzulitai committed Mar 25, 2021
1 parent 4404121 commit 4503d9dade1b76f6754ca20961668c9a43c42e1d
Showing 14 changed files with 1,191 additions and 0 deletions.
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Build the functions code ...
FROM maven:3.6.3-jdk-11 AS builder
# TODO remove these commented lines and the jar; this is needed now only because we don't have the latest Java SDK published to Maven central yet
COPY statefun-sdk-java-3.0-SNAPSHOT.jar /usr/src/app/
RUN mvn install:install-file \
-Dfile=/usr/src/app/statefun-sdk-java-3.0-SNAPSHOT.jar \
-DgroupId=org.apache.flink \
-DartifactId=statefun-sdk-java \
-Dversion=3.0-SNAPSHOT \
-Dpackaging=jar \
-DgeneratePom=true
COPY pom.xml /usr/src/app/
# Build dependencies and cache this layer
RUN mvn -f /usr/src/app dependency:go-offline package -B
COPY src /usr/src/app/src
RUN mvn -f /usr/src/app/pom.xml clean package

# ... and run the web server!
FROM openjdk:8
WORKDIR /
COPY --from=builder /usr/src/app/target/shopping-cart*jar-with-dependencies.jar shopping-cart.jar
EXPOSE 1108
CMD java -jar shopping-cart.jar
@@ -0,0 +1,76 @@
# Shopping Cart Example with Docker Compose

This example demonstrates interaction between two stateful functions - one responsible for managing the users' shopping carts (`UserShoppingCartFn`), and the other responsible for managing the stock (`StockFn`). It is intended to showcase a somewhat more complex business logic where consistent state guarantees span multiple interacting stateful functions. You can think about them as two microservices that 'magically' always stay in consistent state with respect to each other and the output, without having to synchronize them or reconciliate their state in case of failures. This example uses an egress in exactly-once mode. This means that the receipt is produced to the output only if the internal fault-tolerate state of the functions got consistently updated according to the checkout request (requires `read_committed` consumer isolation level).

If you are new to stateful functions, we recommend you to first look at a more simple example, the [Greeter Example](../greeter).

## Directory structure

- `src/`, `pom.xml` and `Dockerfile`: These files and directories are the contents of a Java Maven project which builds
our functions service, hosting the `UserShoppingCartFn` and `StockFn` behind a HTTP endpoint. Check out the source code under
`src/main/java`. The `Dockerfile` is used to build a Docker image for our functions service.
- `module.yaml`: The [Module Specification]() file to be mounted to the StateFun runtime process containers. This
configures a few things for a StateFun application, such as the service endpoints of the application's functions, as
well as definitions of [Ingresses and Egresses]() which the application will use.
- `docker-compose.yml`: Docker Compose file to spin up everything.
- `playthrough`: utilities for automatically playing through the interactions scenarios.

## Prerequisites

- Docker
- Docker Compose

## Running the example

This example works with Docker Compose, and runs a few services that build up an end-to-end StateFun application:
- Functions service that runs your functions and expose them through an HTTP endpoint.
- StateFun runtime processes (a manager plus workers) that will handle ingress, egress, and inter-function messages as
well as function state storage in a consistent and fault-tolerant manner.
- Apache Kafka broker for the application ingress and egress. StateFun currently natively supports AWS Kinesis as well,
and you can also extend to connect with other systems.

To build the example, execute:

```
cd java/shopping-cart
docker-compose build
```

This pulls all the necessary Docker images (StateFun and Kafka), and also builds the functions service image. This can
take a few minutes as it also needs to build the function's Java project.

Afterward the build completes, start running all the services:

```
docker-compose up
```

## Play around!

The `playground` folder contains scenario(s) and utilities which allow you to easily execute a set of steps that emulate interactions with the stateful functions.

In order to run a scenario, execute:
```
cd java/shopping-cart/playthrough
./scenario_1.sh
```

It will send a series of messages, results of which you can observe in the logs of the `shopping-cart-functions` component:
```
docker-compose logs -f shopping-cart-functions
```
Note: `Caller: Optional.empty` in the logs corresponds to the messages that came via an ingress rather than from another stateful function.

To see the results produced to the egress:
```
docker-compose exec kafka bash -c '/usr/bin/kafka-console-consumer --topic receipts --bootstrap-server kafka:9092'
```

If you want to modify the code, you can do a hot redeploy of your functions service:
```
docker-compose up -d --build shopping-cart-functions
```
This rebuilds the functions service image with the updated code, and restarts the service with the new image.



@@ -0,0 +1,91 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
version: "2.1"

services:

###############################################################
# Functions service
###############################################################

shopping-cart-functions:
build:
dockerfile: Dockerfile
context: .
expose:
- "1108"

###############################################################
# StateFun runtime
###############################################################

statefun-manager:
image: flink-statefun:3.0-SNAPSHOT
expose:
- "6123"
ports:
- "8081:8081"
environment:
ROLE: master
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/shopping-cart/module.yaml

statefun-worker:
image: flink-statefun:3.0-SNAPSHOT
expose:
- "6121"
- "6122"
depends_on:
- statefun-manager
- kafka
- shopping-cart-functions
links:
- "statefun-manager:statefun-manager"
- "kafka:kafka"
- "shopping-cart-functions:shopping-cart-functions"
environment:
ROLE: worker
MASTER_HOST: statefun-manager
volumes:
- ./module.yaml:/opt/statefun/modules/shopping-cart/module.yaml

###############################################################
# Kafka for ingress and egress
###############################################################

zookeeper:
image: confluentinc/cp-zookeeper:5.4.3
environment:
ZOOKEEPER_CLIENT_PORT: "2181"
ports:
- "2181:2181"

kafka:
image: confluentinc/cp-kafka:5.4.3
ports:
- "9092:9092"
depends_on:
- zookeeper
links:
- "zookeeper:zookeeper"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: "3.0"
module:
meta:
type: remote
spec:
endpoints:
- endpoint:
meta:
kind: http
spec:
functions: com.example/*
urlPathTemplate: http://shopping-cart-functions:1108/
ingresses:
# user-shopping-cart:
# TODO: add RemoveFromCart
- ingress:
meta:
type: io.statefun.kafka/ingress
id: example.com/add-to-cart
spec:
address: kafka:9092
consumerGroupId: my-group-id
topics:
- topic: add-to-cart
valueType: com.example/AddToCart
targets:
- com.example/user-shopping-cart
- ingress:
meta:
type: io.statefun.kafka/ingress
id: example.com/clear-cart
spec:
address: kafka:9092
consumerGroupId: my-group-id
topics:
- topic: clear-cart
valueType: com.example/ClearCart
targets:
- com.example/user-shopping-cart
- ingress:
meta:
type: io.statefun.kafka/ingress
id: example.com/checkout
spec:
address: kafka:9092
consumerGroupId: my-group-id
topics:
- topic: checkout
valueType: com.example/Checkout
targets:
- com.example/user-shopping-cart
- ingress:
meta:
type: io.statefun.kafka/ingress
id: com.example/restock-items
spec:
address: kafka:9092
consumerGroupId: my-group-id
topics:
- topic: restock-items
valueType: com.example/RestockItem
targets:
- com.example/stock
egresses:
- egress:
meta:
type: io.statefun.kafka/egress
id: com.example/receipts
spec:
address: kafka:9092
deliverySemantic:
type: exactly-once
transactionTimeoutMillis: 100000
@@ -0,0 +1,37 @@
#!/bin/bash

source $(dirname "$0")/utils.sh

######## Scenario 1:
# 1) add socks to stock (via StockFn)
# 2) put socks for userId "1" into the shopping cart (via UserShoppingCartFn)
# 3) checkout (via UserShoppingCartFn)
#--------------------------------
# 1)
key="socks" # itemId
json=$(cat <<JSON
{"itemId":"socks","quantity":50}
JSON
)
ingress_topic="restock-items" # StockFn
send_to_kafka $key $json $ingress_topic
sleep 1
#--------------------------------
# 2)
key="1" # userId
json=$(cat <<JSON
{"userId":"1","quantity":3,"itemId":"socks"}
JSON
)
ingress_topic="add-to-cart" # UserShoppingCartFn
send_to_kafka $key $json $ingress_topic
sleep 1
#--------------------------------
# 3)
key="1" # userId
json=$(cat <<JSON
{"userId":"1"}
JSON
)
ingress_topic="checkout" # UserShoppingCartFn
send_to_kafka $key $json $ingress_topic
@@ -0,0 +1,14 @@
#!/bin/bash

# Sends messages to Kafka within docker-compose setup.
# Parameters:
# - param1: message key
# - param2: message payload
# - param3: Kafka topic
send_to_kafka () {
local key=$1
local payload=$2
local topic=$3
echo "Sending \"$payload\" with key \"$key\" to \"$topic\" topic"
docker-compose exec kafka bash -c "echo '$key: $payload' | /usr/bin/kafka-console-producer --topic $topic --broker-list kafka:9092 --property 'parse.key=true' --property 'key.separator=:'"
}

0 comments on commit 4503d9d

Please sign in to comment.