Skip to content

Commit

Permalink
feat(enterpise): Change data capture (CDC) integration with kafka (#7395
Browse files Browse the repository at this point in the history
)
  • Loading branch information
aman-bansal committed Feb 9, 2021
1 parent 07046c6 commit eb7e5e1
Show file tree
Hide file tree
Showing 19 changed files with 1,551 additions and 439 deletions.
18 changes: 15 additions & 3 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ they form a Raft group and provide synchronous replication.
flag.
Sample flag could look like --audit dir=aa;encrypt_file=/filepath;compress=true`)

flag.String("cdc", "",
`Various change data capture options.
file=/path/to/directory where audit logs will be stored.
kafka=host1,host2 to define comma separated list of host.
sasl-user=username to define sasl username for kafka.
sasl-password=password to define sasl password for kafka.
ca-cert=/path/to/ca/crt/file to define ca cert for tls encryption.
client-cert=/path/to/client/cert/file to define the client certificate for tls encryption.
client-key=/path/to/client/key/file to define the client key for tls encryption.
`)

// TLS configurations
x.RegisterServerTLSFlags(flag)
}
Expand Down Expand Up @@ -618,9 +629,10 @@ func run() {
PIndexCacheSize: pstoreIndexCacheSize,
WalCache: walCache,

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Audit: conf,
MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Audit: conf,
ChangeDataConf: Alpha.Conf.GetString("cdc"),
}

secretFile := Alpha.Conf.GetString("acl_secret_file")
Expand Down
3 changes: 2 additions & 1 deletion ee/enc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
package enc

import (
"io"

"github.com/dgraph-io/dgraph/x"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
)

// Eebuild indicates if this is a Enterprise build.
Expand Down
3 changes: 3 additions & 0 deletions ee/utils_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ func GetEEFeaturesList() []string {
if x.WorkerConfig.Audit {
ee = append(ee, "audit")
}
if worker.Config.ChangeDataConf != "" {
ee = append(ee, "cdc")
}
return ee
}
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/Masterminds/semver/v3 v3.1.0
github.com/Microsoft/go-winio v0.4.15 // indirect
github.com/OneOfOne/xxhash v1.2.5 // indirect
github.com/Shopify/sarama v1.27.2
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v3 v3.0.0-20210205124934-31c061ed3278
Expand All @@ -36,9 +37,9 @@ require (
github.com/golang/geo v0.0.0-20170810003146-31fb0106dc4a
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.3.5
github.com/golang/snappy v0.0.1
github.com/golang/snappy v0.0.2
github.com/google/codesearch v1.0.0
github.com/google/go-cmp v0.5.0
github.com/google/go-cmp v0.5.2
github.com/google/uuid v1.0.0
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9
Expand All @@ -47,6 +48,7 @@ require (
github.com/minio/minio-go/v6 v6.0.55
github.com/mitchellh/panicwrap v1.0.0
github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.2.1
github.com/prometheus/client_golang v0.9.3
Expand All @@ -57,12 +59,12 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
github.com/twpayne/go-geom v1.0.5
go.etcd.io/etcd v0.0.0-20190228193606-a943ad0ee4c9
go.opencensus.io v0.22.5
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
Expand Down
56 changes: 54 additions & 2 deletions go.sum

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ message Proposal {
uint64 index = 10; // Used to store Raft index, in raft.Ready.
uint64 expected_checksum = 11; // Block an operation until membership reaches this checksum.
RestoreRequest restore = 12;
CDCState cdc_state = 13;
}

message CDCState {
uint64 sent_ts = 1;
}

message KVS {
Expand Down
Loading

0 comments on commit eb7e5e1

Please sign in to comment.