Skip to content

Commit

Permalink
[DNM] changefeedccl: implement gocloud.dev/pubsub sink
Browse files Browse the repository at this point in the history
This is a WIP PR to implement a changefeed sink for the generic pubsub API
exposed by gocloud.dev/pubsub. It does not currently deal with lack of ordering
for the nemesis test.

Fixes cockroachdb#36982.

Release note (enterprise change): Add support for gcp pubsub as a CHANGEFEED
sink.
  • Loading branch information
ajwerner committed Dec 10, 2019
1 parent 9a2bb03 commit ee4cd1c
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 3 deletions.
34 changes: 33 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/jackc/pgx"
"github.com/pkg/errors"
"gocloud.dev/pubsub"
)

// TestFeedFactory is an interface to create changefeeds.
Expand Down Expand Up @@ -80,6 +81,93 @@ type TestFeed interface {
Close() error
}

type pubsubFeedFactory struct {
s serverutils.TestServerInterface
db *gosql.DB
flushCh chan struct{}
}

// MakePubsubFeedFactory returns a TestFeedFactory implementation using the cloud
// storage sink.
func MakePubsubFeedFactory(
s serverutils.TestServerInterface, db *gosql.DB, flushCh chan struct{},
) TestFeedFactory {
return &pubsubFeedFactory{s: s, db: db, flushCh: flushCh}
}

type pubsubFeed struct {
jobFeed
sub *pubsub.Subscription
}

func (p *pubsubFeed) Partitions() []string {
return []string{""}
}

func (p *pubsubFeed) Next() (*TestFeedMessage, error) {
m, err := p.sub.Receive(context.TODO())
if err != nil {
return nil, err
}
var tfm *TestFeedMessage
if topic, ok := m.Metadata["topic"]; ok {
k, v, err := extractKeyFromJSONValue(m.Body)
if err != nil {
return nil, err
}
tfm = &TestFeedMessage{
Key: k,
Value: v,
Topic: topic,
}
} else {
tfm = &TestFeedMessage{
Resolved: m.Body,
}
}
m.Ack()
return tfm, nil
}

func (p pubsubFeed) Close() error {
return p.sub.Shutdown(context.TODO())
}

var _ TestFeed = (*pubsubFeed)(nil)

func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (TestFeed, error) {
parsed, err := parser.ParseOne(create)
if err != nil {
return nil, err
}
createStmt := parsed.AST.(*tree.CreateChangefeed)
if createStmt.SinkURI != nil {
return nil, errors.Errorf(`unexpected sink provided: "INTO %s"`, tree.AsString(createStmt.SinkURI))
}
sinkURI := `mem://todo`
createStmt.SinkURI = tree.NewStrVal(sinkURI)
pf := &pubsubFeed{
jobFeed: jobFeed{
db: p.db,
flushCh: p.flushCh,
},
}
if err := p.db.QueryRow(createStmt.String(), args...).Scan(&pf.JobID); err != nil {
return nil, err
}
pf.sub, err = pubsub.OpenSubscription(context.TODO(), sinkURI)
if err != nil {
return nil, err
}
return pf, nil
}

func (p pubsubFeedFactory) Server() serverutils.TestServerInterface {
return p.s
}

var _ TestFeedFactory = (*pubsubFeedFactory)(nil)

type sinklessFeedFactory struct {
s serverutils.TestServerInterface
sink url.URL
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func changefeedPlanHook(
if _, err := getEncoder(details.Opts); err != nil {
return err
}
if isCloudStorageSink(parsedSink) {
if isCloudStorageSink(parsedSink) || isPubsubSink(parsedSink) {
details.Opts[optKeyInValue] = ``
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func TestChangefeedBasics(t *testing.T) {
t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`pubsub`, pubsubTest(testFn))

// NB running TestChangefeedBasics, which includes a DELETE, with
// cloudStorageTest is a regression test for #36994.
Expand Down
33 changes: 33 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,39 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory))
}
}

func pubsubTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()

flushCh := make(chan struct{}, 1)
defer close(flushCh)
knobs := base.TestingKnobs{DistSQL: &execinfra.TestingKnobs{Changefeed: &TestingKnobs{
AfterSinkFlush: func() error {
select {
case flushCh <- struct{}{}:
default:
}
return nil
},
}}}

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
UseDatabase: "d",
Knobs: knobs,
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)
sqlDB.Exec(t, `CREATE DATABASE d`)

f := cdctest.MakePubsubFeedFactory(s, db, flushCh)
testFn(t, db, f)

}
}

func cloudStorageTest(
testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory),
) func(*testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/nemeses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestChangefeedNemeses(t *testing.T) {
t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
t.Run(`cloudstorage`, cloudStorageTest(testFn))
t.Run(`pubsub`, pubsubTest(testFn))
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1, regexp.MustCompile("cdc ux violation"))
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func getSink(
makeSink = func() (Sink, error) {
return makeKafkaSink(cfg, u.Host, targets)
}
case isPubsubSink(u):
makeSink = func() (Sink, error) {
return makePubsubSink(u.String(), opts)
}
case isCloudStorageSink(u):
fileSizeParam := q.Get(sinkParamFileSize)
q.Del(sinkParamFileSize)
Expand Down
88 changes: 88 additions & 0 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package changefeedccl

import (
"context"
"net/url"

"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/mempubsub"
)

func isPubsubSink(u *url.URL) bool {
return u.Scheme == "gcppubsub" || u.Scheme == "mem"
}

type pubsubSink struct {
topic *pubsub.Topic
}

func makePubsubSink(baseURI string, opts map[string]string) (Sink, error) {

switch formatType(opts[optFormat]) {
case optFormatJSON:
default:
return nil, errors.Errorf(`this sink is incompatible with %s=%s`,
optFormat, opts[optFormat])
}

switch envelopeType(opts[optEnvelope]) {
case optEnvelopeWrapped:
default:
return nil, errors.Errorf(`this sink is incompatible with %s=%s`,
optEnvelope, opts[optEnvelope])
}

if _, ok := opts[optKeyInValue]; !ok {
return nil, errors.Errorf(`this sink requires the WITH %s option`, optKeyInValue)
}

ctx := context.TODO()
var err error
topic, err := pubsub.OpenTopic(ctx, baseURI)
if err != nil {
return nil, err
}
return &pubsubSink{
topic: topic,
}, nil
}

func (p *pubsubSink) EmitRow(
ctx context.Context, table *sqlbase.TableDescriptor, key, value []byte, updated hlc.Timestamp,
) error {
msg := pubsub.Message{
Metadata: map[string]string{
"topic": table.Name,
},
Body: value,
}
return errors.Wrap(p.topic.Send(ctx, &msg), "failed to send")
}

func (p *pubsubSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
var noTopic string
payload, err := encoder.EncodeResolvedTimestamp(ctx, noTopic, resolved)
if err != nil {
return err
}
msg := pubsub.Message{
Body: payload,
}
return errors.Wrap(p.topic.Send(ctx, &msg), "failed to emit resolved timestamp")
}

func (p *pubsubSink) Flush(ctx context.Context) error {
return nil
}

func (p *pubsubSink) Close() error {
// return p.topic.Shutdown(context.TODO())
return nil
}

var _ Sink = (*pubsubSink)(nil)
2 changes: 1 addition & 1 deletion vendor
Submodule vendor updated 46 files
+50 −113 github.com/cockroachdb/pebble/compaction.go
+70 −110 github.com/cockroachdb/pebble/compaction_iter.go
+1 −1 github.com/cockroachdb/pebble/go.mod
+12 −5 github.com/cockroachdb/pebble/internal/rangedel/fragmenter.go
+3 −2 github.com/cockroachdb/pebble/pacer.go
+4 −22 github.com/cockroachdb/pebble/sstable/block.go
+6 −0 github.com/cockroachdb/pebble/sstable/filter.go
+1 −7 github.com/cockroachdb/pebble/sstable/reader.go
+29 −22 github.com/cockroachdb/pebble/sstable/writer.go
+56 −0 go.opencensus.io/plugin/ocgrpc/client.go
+107 −0 go.opencensus.io/plugin/ocgrpc/client_metrics.go
+49 −0 go.opencensus.io/plugin/ocgrpc/client_stats_handler.go
+19 −0 go.opencensus.io/plugin/ocgrpc/doc.go
+80 −0 go.opencensus.io/plugin/ocgrpc/server.go
+97 −0 go.opencensus.io/plugin/ocgrpc/server_metrics.go
+63 −0 go.opencensus.io/plugin/ocgrpc/server_stats_handler.go
+208 −0 go.opencensus.io/plugin/ocgrpc/stats_common.go
+107 −0 go.opencensus.io/plugin/ocgrpc/trace_common.go
+23 −0 gocloud.dev/AUTHORS
+48 −0 gocloud.dev/CONTRIBUTORS
+202 −0 gocloud.dev/LICENSE
+91 −0 gocloud.dev/gcerrors/errors.go
+222 −0 gocloud.dev/internal/batcher/batcher.go
+16 −0 gocloud.dev/internal/gcerr/errorcode_string.go
+203 −0 gocloud.dev/internal/gcerr/gcerr.go
+69 −0 gocloud.dev/internal/oc/metrics.go
+81 −0 gocloud.dev/internal/oc/trace.go
+105 −0 gocloud.dev/internal/openurl/openurl.go
+78 −0 gocloud.dev/internal/retry/retry.go
+199 −0 gocloud.dev/pubsub/driver/driver.go
+364 −0 gocloud.dev/pubsub/mempubsub/mem.go
+926 −0 gocloud.dev/pubsub/pubsub.go
+342 −0 golang.org/x/perf/storage/client.go
+27 −0 golang.org/x/xerrors/LICENSE
+22 −0 golang.org/x/xerrors/PATENTS
+2 −0 golang.org/x/xerrors/README
+193 −0 golang.org/x/xerrors/adaptor.go
+1 −0 golang.org/x/xerrors/codereview.cfg
+22 −0 golang.org/x/xerrors/doc.go
+33 −0 golang.org/x/xerrors/errors.go
+187 −0 golang.org/x/xerrors/fmt.go
+34 −0 golang.org/x/xerrors/format.go
+56 −0 golang.org/x/xerrors/frame.go
+3 −0 golang.org/x/xerrors/go.mod
+8 −0 golang.org/x/xerrors/internal/internal.go
+106 −0 golang.org/x/xerrors/wrap.go

0 comments on commit ee4cd1c

Please sign in to comment.