changefeedccl: run CHANGEFEEDs via distsql #28555

Merged 1 commit on Aug 20, 2018
103 changes: 2 additions & 101 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -66,113 +65,15 @@ type emitEntry struct {
resolved *jobspb.ResolvedSpan
}

func runChangefeedFlow(
ctx context.Context,
execCfg *sql.ExecutorConfig,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
metrics *Metrics,
resultsCh chan<- tree.Datums,
progressedFn func(context.Context, jobs.HighWaterProgressedFn) error,
) error {
details, err := validateDetails(details)
if err != nil {
return err
}
var highWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil {
highWater = *h
}
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, highWater)
if err != nil {
return err
}

sink, err := getSink(details.SinkURI, resultsCh)
if err != nil {
return err
}
defer func() {
if err := sink.Close(); err != nil {
log.Warningf(ctx, "failed to close changefeed sink: %+v", err)
}
}()

if _, ok := sink.(*channelSink); !ok {
// For every sink but channelSink (which uses resultsCh to stream the
// changes), we abuse the job's results channel to make CREATE
// CHANGEFEED wait for this before returning to the user to ensure the
// setup went okay. Job resumption doesn't have the same hack, but at
// the moment ignores results and so is currently okay. Return nil
// instead of anything meaningful so that if we start doing anything
// with the results returned by resumed jobs, then it breaks instead of
// returning nonsense.
resultsCh <- tree.Datums(nil)
}

sink = makeMetricsSink(metrics, sink)

// The changefeed flow is intentionally structured as a pull model so it's
// easy to later make it into a DistSQL processor.
//
// TODO(dan): Make this into a DistSQL flow.
buf := makeBuffer()
poller := makePoller(execCfg, details, trackedSpans, highWater, buf)
rowsFn := kvsToRows(execCfg, details, buf.Get)
emitEntriesFn := emitEntries(details, sink, rowsFn)
if err != nil {
return err
}

g := ctxgroup.WithContext(ctx)
g.GoCtx(poller.Run)

metrics.mu.Lock()
metricsID := metrics.mu.id
metrics.mu.id++
metrics.mu.Unlock()
defer func() {
metrics.mu.Lock()
delete(metrics.mu.resolved, metricsID)
metrics.mu.Unlock()
}()

sf := makeSpanFrontier(trackedSpans...)
g.GoCtx(func(ctx context.Context) error {
for {
resolvedSpans, err := emitEntriesFn(ctx)
if err != nil {
return err
}
newFrontier := false
for _, resolvedSpan := range resolvedSpans {
if sf.Forward(resolvedSpan.Span, resolvedSpan.Timestamp) {
newFrontier = true
}
}
if newFrontier {
metrics.mu.Lock()
metrics.mu.resolved[metricsID] = sf.Frontier()
metrics.mu.Unlock()
if err := emitResolvedTimestamp(ctx, details, sink, progressedFn, sf); err != nil {
return err
}
}
}
})

return g.Wait()
}

// kvsToRows gets changed kvs from a closure and converts them into sql rows. It
// returns a closure that may be repeatedly called to advance the changefeed.
// The returned closure is not threadsafe.
func kvsToRows(
execCfg *sql.ExecutorConfig,
leaseManager *sql.LeaseManager,
details jobspb.ChangefeedDetails,
inputFn func(context.Context) (bufferEntry, error),
) func(context.Context) ([]emitEntry, error) {
rfCache := newRowFetcherCache(execCfg.LeaseManager)
rfCache := newRowFetcherCache(leaseManager)

var kvs sqlbase.SpanKVFetcher
appendEmitEntryForKV := func(
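The kvsToRows change above swaps the *sql.ExecutorConfig parameter for the *sql.LeaseManager it actually needs, but keeps the shape its comment describes: a constructor captures an input function plus per-call scratch state and returns a closure the caller invokes repeatedly to advance the changefeed, which is why that closure is not threadsafe. Below is a minimal, self-contained sketch of that shape; the names and types are purely illustrative, not the changefeed's own.

package main

import (
	"context"
	"fmt"
)

// makeStage mirrors the kvsToRows pattern: it captures an input function and
// scratch state, and returns a closure that advances the pipeline one step per
// call. The scratch buffer is reused across calls, which is what makes the
// returned closure unsafe for concurrent use.
func makeStage(inputFn func(context.Context) (int, error)) func(context.Context) ([]string, error) {
	var scratch []string // reused on every call; not threadsafe
	return func(ctx context.Context) ([]string, error) {
		scratch = scratch[:0]
		v, err := inputFn(ctx)
		if err != nil {
			return nil, err
		}
		scratch = append(scratch, fmt.Sprintf("row-%d", v))
		return scratch, nil
	}
}

func main() {
	next := 0
	input := func(ctx context.Context) (int, error) {
		next++
		return next, nil
	}
	stage := makeStage(input)
	for i := 0; i < 3; i++ {
		rows, _ := stage(context.Background())
		fmt.Println(rows) // [row-1], [row-2], [row-3]
	}
}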
207 changes: 207 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -0,0 +1,207 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
distsqlrun.NewChangeAggregatorProcessor = newChangeAggregatorProcessor
distsqlrun.NewChangeFrontierProcessor = newChangeFrontierProcessor
}

const (
changeAggregatorProcName = `changeagg`
changeFrontierProcName = `changefntr`
)

var changefeedResultTypes = []sqlbase.ColumnType{
{SemanticType: sqlbase.ColumnType_BYTES}, // resolved span
{SemanticType: sqlbase.ColumnType_STRING}, // topic
{SemanticType: sqlbase.ColumnType_BYTES}, // key
{SemanticType: sqlbase.ColumnType_BYTES}, // value
}

// distChangefeedFlow plans and runs a distributed changefeed.
//
// One or more ChangeAggregator processors watch table data for changes. These
// transform the changed kvs into changed rows and either emit them to a sink
// (such as kafka) or, if there is no sink, forward them in columns 1,2,3 (where
// they will be eventually returned directly via pgwire). In either case,
// periodically a span will become resolved as of some timestamp, meaning that
// no new rows will ever be emitted at or below that timestamp. These span-level
// resolved timestamps are emitted as a marshaled `jobspb.ResolvedSpan` proto in
// column 0.
//
// The flow will always have exactly one ChangeFrontier processor which all the
// ChangeAggregators feed into. It collects all span-level resolved timestamps
// and aggregates them into a changefeed-level resolved timestamp, which is the
// minimum of the span-level resolved timestamps. This changefeed-level resolved
// timestamp is emitted into the changefeed sink (or returned to the gateway if
// there is no sink) whenever it advances. ChangeFrontier also updates the
// progress of the changefeed's corresponding system job.
//
// TODO(dan): The logic here is the planning for the non-enterprise version of
// changefeeds, which is one ChangeAggregator processor feeding into one
// ChangeFrontier processor with both on the gateway node. Also implement the
// planning logic for the enterprise version, which places ChangeAggregator
// processors on the leaseholder for the spans they're watching.
func distChangefeedFlow(
ctx context.Context,
phs sql.PlanHookState,
jobID int64,
details jobspb.ChangefeedDetails,
progress jobspb.Progress,
resultsCh chan<- tree.Datums,
) error {
var err error
details, err = validateDetails(details)
if err != nil {
return err
}

var highWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil {
highWater = *h
}

execCfg := phs.ExecCfg()
trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, details.Targets, highWater)
if err != nil {
return err
}

// TODO(dan): Merge these with the span-level resolved timestamps from the
// job progress.
var watches []distsqlrun.ChangeAggregatorSpec_Watch
for _, span := range trackedSpans {
watches = append(watches, distsqlrun.ChangeAggregatorSpec_Watch{
Span: span,
InitialResolved: highWater,
})
}

gatewayNodeID := execCfg.NodeID.Get()
changeAggregatorProcs := []distsqlplan.Processor{{
Node: gatewayNodeID,
Spec: distsqlrun.ProcessorSpec{
Core: distsqlrun.ProcessorCoreUnion{
ChangeAggregator: &distsqlrun.ChangeAggregatorSpec{
Watches: watches,
Feed: details,
},
},
Output: []distsqlrun.OutputRouterSpec{{Type: distsqlrun.OutputRouterSpec_PASS_THROUGH}},
},
}}
changeFrontierSpec := distsqlrun.ChangeFrontierSpec{
TrackedSpans: trackedSpans,
Feed: details,
JobID: jobID,
}

var p sql.PhysicalPlan

stageID := p.NewStageID()
p.ResultRouters = make([]distsqlplan.ProcessorIdx, len(changeAggregatorProcs))
for i, proc := range changeAggregatorProcs {
proc.Spec.StageID = stageID
pIdx := p.AddProcessor(proc)
p.ResultRouters[i] = pIdx
}

p.AddSingleGroupStage(
gatewayNodeID,
distsqlrun.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
distsqlrun.PostProcessSpec{},
changefeedResultTypes,
)

p.ResultTypes = changefeedResultTypes
p.PlanToStreamColMap = []int{1, 2, 3}

// Changefeed flows handle transactional consistency themselves.
var noTxn *client.Txn

dsp := phs.DistSQLPlanner()
evalCtx := phs.ExtendedEvalContext()
planCtx := dsp.NewPlanningCtx(ctx, evalCtx, noTxn)
dsp.FinalizePlan(&planCtx, &p)

resultRows := makeChangefeedResultWriter(resultsCh)
recv := sql.MakeDistSQLReceiver(
ctx,
resultRows,
tree.Rows,
execCfg.RangeDescriptorCache,
execCfg.LeaseHolderCache,
noTxn,
func(ts hlc.Timestamp) {},
evalCtx.Tracing,
)

var finishedSetupFn func()
if details.SinkURI != `` {
// We abuse the job's results channel to make CREATE CHANGEFEED wait for
// this before returning to the user to ensure the setup went okay. Job
// resumption doesn't have the same hack, but at the moment ignores
// results and so is currently okay. Return nil instead of anything
// meaningful so that if we start doing anything with the results
// returned by resumed jobs, then it breaks instead of returning
// nonsense.
finishedSetupFn = func() { resultsCh <- tree.Datums(nil) }
}

dsp.Run(&planCtx, noTxn, &p, recv, evalCtx, finishedSetupFn)
return resultRows.Err()
}

// changefeedResultWriter implements the `distsqlrun.resultWriter` that sends
// the received rows back over the given channel.
type changefeedResultWriter struct {
rowsCh chan<- tree.Datums
rowsAffected int
err error
}

func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter {
return &changefeedResultWriter{rowsCh: rowsCh}
}

func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
// Copy the row because it's not guaranteed to exist after this function
// returns.
row = append(tree.Datums(nil), row...)

select {
case <-ctx.Done():
return ctx.Err()
case w.rowsCh <- row:
return nil
}
}
func (w *changefeedResultWriter) IncrementRowsAffected(n int) {
w.rowsAffected += n
}
func (w *changefeedResultWriter) SetError(err error) {
w.err = err
}
func (w *changefeedResultWriter) Err() error {
return w.err
}