Skip to content

Commit

Permalink
Added exactly-once delivery example (#228)
Browse files Browse the repository at this point in the history
* Added exactly-once delivery example

* added README for example

* readme cosmetics

* readme cosmetics

* readme cosmetics

* Update README.md

* update deps

* README cosmetics

Co-authored-by: Miłosz Smółka <milosz.smolka@gmail.com>
  • Loading branch information
roblaszczak and m110 committed Apr 5, 2021
1 parent eed1a09 commit 6b8695d
Show file tree
Hide file tree
Showing 10 changed files with 497 additions and 0 deletions.
@@ -0,0 +1,44 @@
# Exactly-once delivery counter

Is exactly-once delivery impossible? Well, it depends a lot on the definition of exactly-once delivery.
When we assume we want to avoid the situation when a message is delivered more than once when our broker or worker died -- it's possible.
I'll say more, it's even possible with Watermill!

![](./at-least-once-delivery.jpg)

*At-least once delivery - this is not what we want!*

There are just two constraints:
1. you need to use a Pub/Sub implementation that does support exactly-once delivery (only [MySQL/PostgreSQL](https://github.com/ThreeDotsLabs/watermill-sql) for now),
2. writes need to go to the same DB.

In practice, our model is pretty similar to how does it work with Kafka exactly-once delivery. If you want to know more details, you can check [their article](https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/).

In our example, we use a MySQL database to implement a **simple counter**. It can be triggered by calling the `http://localhost:8080/count/{counterUUID}` endpoint.
Calling this endpoint will publish a message to MySQL via our [Pub/Sub implementation](https://github.com/ThreeDotsLabs/watermill-sql).
The endpoint is provided by [server/main.go](server/main.go).

Later, the message is consumed by [worker/main.go](worker/main.go). The only responsibility of the worker is to update the counter in the MySQL database.
**Counter update is done in the same transaction as message consumption.**

Normally, we would need to de-duplicate messages.
But thanks to that fact and [A.C.I.D](https://en.wikipedia.org/wiki/ACID) even if server, worker or network failure happens during processing our data will stay consistent.

![](./architecture.jpg)

*Watermill's exactly-once delivery*

To check if the created code works, I've created a small `run.go` program, that sends 10k requests to the server and verifies if the count at the end is equal to 10k.
But to not make it too easy, I'm restarting the worker and MySQL a couple of times. I also forgot about graceful shutdown in my worker. ;-)

The biggest downside of this approach is performance. Due to [our benchmark](https://github.com/ThreeDotsLabs/watermill-benchmark#sql-mysql), MySQL subscriber can consume up to 154 messages per second.
Fortunately, it's still 13,305,600 messages per day. It's more than enough for a lot of systems.

## Running

docker-compose up

go run run.go

*Please note that `run.go` needs to be executed by a user having privileges to manage Docker.
It's due to the fact that `run.go` is restarting containers.*
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
@@ -0,0 +1,32 @@
version: '3'
services:
server:
image: golang:1.15
restart: unless-stopped
ports:
- 8080:8080
volumes:
- ./server:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app
command: 'go run .'

worker:
image: golang:1.15
restart: unless-stopped
volumes:
- ./worker:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app
command: 'go run .'

mysql:
image: mysql:8.0
restart: unless-stopped
ports:
- 3306:3306
environment:
MYSQL_DATABASE: example
MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
volumes:
- ./schema.sql:/docker-entrypoint-initdb.d/schema.sql
169 changes: 169 additions & 0 deletions _examples/real-world-examples/exactly-once-delivery-counter/run.go
@@ -0,0 +1,169 @@
package main

import (
stdSQL "database/sql"
"fmt"
"net/http"
"os/exec"
"sync"
"time"

"github.com/cheggaaa/pb/v3"
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
)

const messagesCount = 5000

// at these messages we will restart MySQL
var restartMySQLAt = map[int]struct{}{
50: {},
1000: {},
1500: {},
3000: {},
}

// at these messages we will restart counter worker
var restartWorkerAt = map[int]struct{}{
100: {},
1500: {},
1600: {},
3000: {},
}

const senderGoroutines = 5

func main() {
db := createDB()
counterUUID := uuid.New().String()

wg := &sync.WaitGroup{}
wg.Add(messagesCount)

bar := pb.StartNew(messagesCount)

// sending value to sendCounter counter HTTP call
sendCounter := make(chan struct{}, 0)
go func() {
for i := 0; i < messagesCount; i++ {
sendCounter <- struct{}{}

// let's challenge exactly-once delivery a bit
// normally it should trigger re-delivery of the message
if _, ok := restartMySQLAt[i]; ok {
restartMySQL()
}
if _, ok := restartWorkerAt[i]; ok {
restartWorker()
}
}
close(sendCounter)
}()

for i := 0; i < senderGoroutines; i++ {
go func() {
for range sendCounter {
sendCountRequest(counterUUID)
wg.Done()
bar.Increment()
}
}()
}

wg.Wait()
bar.Finish()

timeout := time.Now().Add(time.Second * 30)

fmt.Println("checking counter with DB, expected count:", messagesCount)

matchedOnce := true

for {
if time.Now().After(timeout) {
fmt.Println("timeout")
break
}

dbCounterValue, err := getDbCounterValue(db, counterUUID)
if err != nil {
fmt.Println("err:", err)
continue
}

fmt.Println("db counter value", dbCounterValue)
if dbCounterValue == messagesCount {
if !matchedOnce {
// let's ensure that nothing new will arrive
matchedOnce = true
time.Sleep(time.Second * 2)
continue
} else {
fmt.Println("expected counter value is matching DB value")
break
}
}

time.Sleep(time.Second)
}
}

func getDbCounterValue(db *stdSQL.DB, counterUUID string) (int, error) {
var dbCounterValue int
row := db.QueryRow("SELECT value from counter WHERE id = ?", counterUUID)

if err := row.Scan(&dbCounterValue); err != nil {
return 0, err
}

return dbCounterValue, nil
}

func restartWorker() {
fmt.Println("restarting worker")
err := exec.Command("docker-compose", "restart", "worker").Run()
if err != nil {
fmt.Println("restarting worker failed", err)
}
}

func restartMySQL() {
fmt.Println("restarting mysql")
err := exec.Command("docker-compose", "restart", "mysql").Run()
if err != nil {
fmt.Println("restarting mysql failed", err)
}
}

func sendCountRequest(counterUUID string) {
for {
resp, err := http.Post("http://localhost:8080/count/"+counterUUID, "", nil)
if err != nil {
continue
}

if resp.StatusCode == http.StatusNoContent {
break
}
}
}

func createDB() *stdSQL.DB {
conf := mysql.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "localhost"
conf.DBName = "example"

db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}

err = db.Ping()
if err != nil {
panic(err)
}

return db
}
@@ -0,0 +1,4 @@
CREATE TABLE counter (
id VARCHAR(36) NOT NULL UNIQUE,
value int NOT NULL
);
@@ -0,0 +1,10 @@
module exactly-once-delivery

go 1.13

require (
github.com/ThreeDotsLabs/watermill v1.1.1
github.com/ThreeDotsLabs/watermill-sql v1.3.5
github.com/go-chi/chi/v5 v5.0.2
github.com/go-sql-driver/mysql v1.6.0
)
@@ -0,0 +1,83 @@
package main

import (
stdSQL "database/sql"
"encoding/json"
"log"
"net/http"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-sql/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
driver "github.com/go-sql-driver/mysql"
)

const topic = "counter"

func main() {
db := createDB()
logger := watermill.NewStdLogger(false, false)

r := chi.NewRouter()
r.Use(middleware.Recoverer)
r.Use(middleware.Logger)

publisher, err := sql.NewPublisher(
db,
sql.PublisherConfig{
SchemaAdapter: sql.DefaultMySQLSchema{},
},
logger,
)
if err != nil {
panic(err)
}

r.Post("/count/{counterUUID}", func(w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(messagePayload{
CounterUUID: chi.URLParam(r, "counterUUID"),
})
if err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

msg := message.NewMessage(watermill.NewUUID(), payload)

if err := publisher.Publish(topic, msg); err != nil {
log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
})

http.ListenAndServe(":8080", r)
}

type messagePayload struct {
CounterUUID string `json:"counter_uuid"`
}

func createDB() *stdSQL.DB {
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = "mysql"
conf.DBName = "example"

db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
panic(err)
}

err = db.Ping()
if err != nil {
panic(err)
}

return db
}
@@ -0,0 +1,10 @@
module exactly-once-delivery

go 1.13

require (
github.com/ThreeDotsLabs/watermill v1.1.1
github.com/ThreeDotsLabs/watermill-sql v1.3.5
github.com/go-sql-driver/mysql v1.6.0
github.com/pkg/errors v0.9.1
)

0 comments on commit 6b8695d

Please sign in to comment.