Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Add --timeout parameter to stop consuming, closes #13 #CMN-122
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed Dec 10, 2020
1 parent a4a08a3 commit 444a6ad
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 4 deletions.
1 change: 1 addition & 0 deletions internal/cmdparser/cmdparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type CmdOptions struct {
Postgres string `long:"postgres" description:"PostgreSQL connection string" env:"DBZ2PG_PGURL"`
Kafka []string `long:"kafka" description:"Kafka connection string" env:"DBZ2PG_KAFKA"`
Topic string `long:"topic" description:"Topic name (or prefix of the topic name) to consume" env:"DBZ2PG_TOPIC" required:"True"`
Timeout int `long:"timeout" default:"10" description:"Idle timeout for consuming kafka messages" env:"DBZ2PG_TIMEOUT"`
}

// Parse will parse command line arguments and initialize pgengine
Expand Down
2 changes: 2 additions & 0 deletions internal/kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ func NewMessage(msg kafka.Message) (*Message, error) {
}
err = message.initKeys()
if err != nil {
Logger.WithError(err).Debug("initKeys failed")
return nil, err
}
err = message.initValues()
if err != nil {
Logger.WithError(err).Debug("initValues failed")
return nil, err
}
return message, nil
Expand Down
6 changes: 5 additions & 1 deletion internal/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/cybertec-postgresql/debezium2postgres/internal/kafka"
)

// Apply function reads messages from `messages` channel and applies changes to the target PostgreSQL database
func Apply(ctx context.Context, connString string, messages <-chan kafka.Message) {
func Apply(ctx context.Context, connString string, idleTimeout time.Duration, messages <-chan kafka.Message) {
conn, err := Connect(context.Background(), connString)
if err != nil {
Logger.Fatalln(err)
Expand All @@ -28,6 +29,9 @@ func Apply(ctx context.Context, connString string, messages <-chan kafka.Message
}
case <-ctx.Done():
return
case <-time.After(idleTimeout):
Logger.Print("Idle timeout exceeded")
return
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/postgres/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestApply(t *testing.T) {
Connect = func(ctx context.Context, connString string) (DBExecutorContext, error) {
return nil, errors.New("bad connection")
}
Apply(ctx, "foo", msgChan)
Apply(ctx, "foo", time.Second, msgChan)

Connect = func(ctx context.Context, connString string) (DBExecutorContext, error) {
return &MockDbExec{
Expand All @@ -51,7 +51,7 @@ func TestApply(t *testing.T) {
},
}, nil
}
Apply(ctx, "foo", msgChan)
Apply(ctx, "foo", time.Second, msgChan)
}

func TestApplyCDCItem(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"os"
"time"

"github.com/cybertec-postgresql/debezium2postgres/internal/cmdparser"
"github.com/cybertec-postgresql/debezium2postgres/internal/kafka"
Expand All @@ -24,5 +25,5 @@ func main() {
// create channel for passing messages to database worker
var msgChannel chan kafka.Message = make(chan kafka.Message, 16)
kafka.Consume(context.Background(), cmdOpts.Kafka, cmdOpts.Topic, msgChannel)
postgres.Apply(context.Background(), cmdOpts.Postgres, msgChannel)
postgres.Apply(context.Background(), cmdOpts.Postgres, time.Duration(cmdOpts.Timeout)*time.Second, msgChannel)
}

0 comments on commit 444a6ad

Please sign in to comment.