Skip to content
This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Commit

Permalink
Merge pull request #12 from cybertec-postgresql/11_new_record_state_e…
Browse files Browse the repository at this point in the history
…xtraction

Implement new record state extraction
  • Loading branch information
pashagolub committed Dec 9, 2020
2 parents 1763b1d + 8b1cb74 commit 3837b7d
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 157 deletions.
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,30 @@ Application to apply CDC log from [Debezium](https://debezium.io/) to the target
- `postgres` - PostgreSQL connection URL

:warning: To connect to `kafka` cluster the `advertised.listeners` option should be configured properly. See more https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/

# Tutorial

```bash
# We will use the topology as defined in https://debezium.io/docs/tutorial/
export DEBEZIUM_VERSION=1.3
export DEBEZIUM_EXTERNAL_IP=10.0.0.105
docker-compose -f docker-compose.yml up

# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @inventory-connector.json

# Check messages from a Debezium topic for table `inventory.customers`
docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers

# Modify records in the database via MySQL client and check the output in Debezium
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

# Run debezium2postgres transformation
debezium2postgres --kafka=$DEBEZIUM_EXTERNAL_IP:9092 --topic=dbserver1.inventory --loglevel=debug --postgres=postgres://user:pwd@10.0.0.105/inventory

# Shut down the cluster
docker-compose -f docker-compose-mysql.yaml down
11 changes: 8 additions & 3 deletions internal/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getTopics(brokers []string) ([]string, error) {
}

// Consume function receives messages from Kafka and sends them to the `messages` channel
func Consume(ctx context.Context, brokers []string, topicPattern string, messages chan<- []byte) {
func Consume(ctx context.Context, brokers []string, topicPattern string, messages chan<- Message) {
Logger.Debug("Starting consuming from kafka...")
topics, err := getTopics(brokers)
if err != nil {
Expand All @@ -59,7 +59,7 @@ func Consume(ctx context.Context, brokers []string, topicPattern string, message
}
}
}
func consumeTopic(ctx context.Context, brokers []string, topic string, messages chan<- []byte) {
func consumeTopic(ctx context.Context, brokers []string, topic string, messages chan<- Message) {
topiclogger := Logger.WithField("topic", topic)
reader := getReader(brokers, topic)
defer reader.Close()
Expand All @@ -71,6 +71,11 @@ func consumeTopic(ctx context.Context, brokers []string, topic string, messages
return
}
topiclogger.WithField("key", string(m.Key)).WithField("value", string(m.Value)).Trace("Message consumed")
messages <- m.Value
msg, err := NewMessage(m)
if err != nil {
topiclogger.Error(err)
continue
}
messages <- *msg
}
}
30 changes: 19 additions & 11 deletions internal/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func TestGetTopics(t *testing.T) {
Logger.Logger.ExitFunc = func(int) {
t.Log("log.Fatal called")
}
Consume(ctx, []string{"foo", "bar"}, "baz", make(chan []byte, 1))
Consume(ctx, []string{"foo", "bar"}, "baz", make(chan Message, 1))

newConsumer = func(addrs []string, config *sarama.Config) (sarama.Consumer, error) {
c := mocks.NewConsumer(t, nil)
c.SetTopicMetadata(map[string][]int32{"foo": {1, 2, 3}})
return c, nil
}
topics, err := getTopics([]string{"foo", "bar"})
Consume(ctx, []string{"foo", "bar"}, "foo", make(chan []byte, 1))
Consume(ctx, []string{"foo", "bar"}, "foo", make(chan Message, 1))
assert.NoError(t, err)
assert.Equal(t, topics, []string{"foo"})
}
Expand All @@ -50,14 +50,17 @@ type mockKafkaReader struct {
}

func (r *mockKafkaReader) ReadMessage(ctx context.Context) (kafka.Message, error) {
if r.ReadMessageHandler != nil {
return r.ReadMessageHandler()
}
if ctx.Err() != nil {
return kafka.Message{}, ctx.Err()
}
if r.ReadMessageHandler != nil {
return r.ReadMessageHandler()
}
time.Sleep(500 * time.Millisecond)
return kafka.Message{}, nil
return kafka.Message{
Value: []byte(`{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"int64","optional":true,"field":"__source_ts_ms"},{"type":"string","optional":true,"field":"__db"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.inventory.customers.Value"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__source_ts_ms":0,"__db":"inventory","__table":"customers","__op":"c","__deleted":"false"}}`),
Key: []byte(`{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}}`),
}, nil
}

func (r *mockKafkaReader) Close() error {
Expand All @@ -68,14 +71,19 @@ func TestConsumeTopic(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

consumeTopic(ctx, []string{"foo", "bar"}, "baz", make(chan []byte, 10))
getReader = func(brokers []string, topic string) kafkaReader {
return &mockKafkaReader{}
}
consumeTopic(ctx, []string{"foo", "bar"}, "baz", make(chan Message, 10))

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()

getReader = func(brokers []string, topic string) kafkaReader {
return &mockKafkaReader{}
return &mockKafkaReader{
ReadMessageHandler: func() (kafka.Message, error) {
time.Sleep(500 * time.Millisecond)
return kafka.Message{}, nil
}}
}
consumeTopic(ctx, []string{"foo", "bar"}, "baz", make(chan []byte, 10))
consumeTopic(ctx, []string{"foo", "bar"}, "baz", make(chan Message, 10))
}
118 changes: 118 additions & 0 deletions internal/kafka/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package kafka

import (
"encoding/json"
"errors"
"strings"

kafka "github.com/segmentio/kafka-go"
)

// cdcField describes a single leaf field entry inside a Debezium
// Kafka Connect schema (type, optionality, and field name).
type cdcField struct {
	Type     string `json:"type"`
	Optional bool   `json:"optional"`
	Field    string `json:"field"`
}

// cdcFields describes a (possibly nested) field entry inside a Debezium
// Kafka Connect schema; unlike cdcField it may itself carry child Fields
// and a struct Name.
type cdcFields struct {
	Type     string     `json:"type"`
	Fields   []cdcField `json:"fields,omitempty"`
	Optional bool       `json:"optional"`
	Name     string     `json:"name,omitempty"`
	Field    string     `json:"field"`
}

// cdcSchema is the top-level "schema" object of a Debezium CDC envelope,
// listing the typed fields of the accompanying payload.
type cdcSchema struct {
	Type     string      `json:"type"`
	Name     string      `json:"name"`
	Fields   []cdcFields `json:"fields"`
	Optional bool        `json:"optional"`
}

// cdcMessage is the envelope of a Debezium record *value*: a schema
// description plus the actual row payload. Payload is a pointer so a JSON
// "payload":null can be distinguished from an absent/empty object.
type cdcMessage struct {
	Schema  *cdcSchema              `json:"schema"`
	Payload *map[string]interface{} `json:"payload"`
}

// cdcKey is the envelope of a Debezium record *key* (the primary-key
// columns). Structurally identical to cdcMessage; kept as a separate type
// to make the key/value distinction explicit at call sites.
type cdcKey struct {
	Schema  *cdcSchema              `json:"schema"`
	Payload *map[string]interface{} `json:"payload"`
}

// Message is a data structure representing kafka messages
type Message struct {
	kafka.Message
	// Op is the Debezium operation code taken from the "__op" system field
	// (e.g. "c" for create — see initValues).
	Op string
	// TableName comes from the "__table" system field.
	TableName string
	// SchemaName comes from the "__schema" system field; may stay empty.
	SchemaName string
	// Keys holds the primary-key columns parsed from the record key payload.
	Keys map[string]interface{}
	// Values holds the non-system row columns parsed from the record value payload.
	Values map[string]interface{}
}

// NewMessage builds a Message from a raw kafka.Message, parsing the record
// key into Keys and the record value into Values (plus the table/schema/op
// metadata). It returns an error when either payload cannot be decoded.
func NewMessage(msg kafka.Message) (*Message, error) {
	m := &Message{
		Message: msg,
		Keys:    map[string]interface{}{},
		Values:  map[string]interface{}{},
	}
	if err := m.initKeys(); err != nil {
		return nil, err
	}
	if err := m.initValues(); err != nil {
		return nil, err
	}
	return m, nil
}

// initKeys parses the Kafka record key (a Debezium key envelope) and stores
// its payload — the primary-key columns — into m.Keys for use in SQL DML
// statements. It returns an error when the key is not valid JSON or when
// the envelope carries a nil payload.
func (m *Message) initKeys() error {
	var key cdcKey
	if err := json.Unmarshal(m.Key, &key); err != nil {
		return err
	}
	if key.Payload == nil {
		// Lowercase per Go error-string convention; "key" prefix
		// disambiguates this from the value-payload error in initValues.
		return errors.New("key payload is nil")
	}
	m.Keys = *key.Payload
	return nil
}

// initValues parses the Kafka record value (a Debezium value envelope) and
// fills in the table name, schema name, operation code and the non-system
// row columns to use in SQL DML statements. System fields injected by the
// Debezium flattening transform are prefixed with "__" and are never copied
// into m.Values. It returns an error when the value is not valid JSON or
// when the envelope carries a nil payload.
func (m *Message) initValues() error {
	var msg cdcMessage
	if err := json.Unmarshal(m.Value, &msg); err != nil {
		return err
	}
	if msg.Payload == nil {
		// Lowercase per Go error-string convention; "value" prefix
		// disambiguates this from the key-payload error in initKeys.
		return errors.New("value payload is nil")
	}
	for k, v := range *msg.Payload {
		if strings.HasPrefix(k, "__") { // system fields
			// Guard the assertion: a non-string system field (e.g. the
			// int64 "__source_ts_ms") must not panic; the original bare
			// v.(string) would have if it ever matched a case below.
			s, ok := v.(string)
			if !ok {
				continue
			}
			switch k {
			case "__schema":
				m.SchemaName = s
			case "__table":
				m.TableName = s
			case "__op":
				m.Op = s
			}
			continue
		}
		m.Values[k] = v
	}
	return nil
}

// QualifiedTablename returns the table name as a double-quoted SQL
// identifier, prefixed with the quoted schema name when one is set.
// Embedded double quotes are escaped by doubling them.
func (m *Message) QualifiedTablename() string {
	quote := func(ident string) string {
		return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
	}
	if m.SchemaName == "" {
		return quote(m.TableName)
	}
	return quote(m.SchemaName) + "." + quote(m.TableName)
}
46 changes: 46 additions & 0 deletions internal/kafka/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package kafka

import (
"testing"

kafka "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

// TestNewMessage exercises NewMessage with a well-formed Debezium CDC record
// and then with progressively corrupted key/value variants. The mutations of
// m are cumulative and order-dependent: each step keeps the breakage of the
// previous one.
func TestNewMessage(t *testing.T) {
	// Well-formed flattened Debezium record: the value carries row columns
	// plus "__"-prefixed system fields, the key carries the primary key.
	m := kafka.Message{
		Value: []byte(`{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"int64","optional":true,"field":"__source_ts_ms"},{"type":"string","optional":true,"field":"__db"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.inventory.customers.Value"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com","__source_ts_ms":0,"__db":"inventory","__schema":"public","__table":"customers","__op":"c","__deleted":"false"}}`),
		Key:   []byte(`{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}}`),
	}
	msg, err := NewMessage(m)
	assert.NoError(t, err)
	assert.NotNil(t, msg)

	// Valid JSON but a null payload: value parsing must report an error.
	m.Value = []byte(`{"schema":null, "payload":null}`)
	msg, err = NewMessage(m)
	assert.Error(t, err, "Corrupted value payload")
	assert.Nil(t, msg)

	// Empty (non-JSON) value: unmarshaling must fail.
	m.Value = []byte{}
	msg, err = NewMessage(m)
	assert.Error(t, err, "Corrupted value")
	assert.Nil(t, msg)

	// Null key payload (keys are parsed before values, so this error
	// surfaces even though the value is still broken from above).
	m.Key = []byte(`{"schema":null, "payload":null}`)
	msg, err = NewMessage(m)
	assert.Error(t, err, "Corrupted key payload")
	assert.Nil(t, msg)

	// Empty (non-JSON) key: unmarshaling must fail.
	m.Key = []byte{}
	msg, err = NewMessage(m)
	assert.Error(t, err, "Corrupted key")
	assert.Nil(t, msg)
}

// TestQualifiedTableName verifies identifier quoting both without and
// with a schema name set.
func TestQualifiedTableName(t *testing.T) {
	msg := Message{TableName: "bar"}
	assert.Equal(t, msg.QualifiedTablename(), `"bar"`, "No schema used")
	msg.SchemaName = "foo"
	assert.Equal(t, msg.QualifiedTablename(), `"foo"."bar"`, "Schema qualified")
}

0 comments on commit 3837b7d

Please sign in to comment.