Skip to content

Commit

Permalink
Merge #69262 #69470
Browse files Browse the repository at this point in the history
69262: streamingccl: unrevert graceful cutover of stream ingestion r=pbardea a=adityamaru

In #68918 we reverted
a change that taught stream ingestion processors to wait for a cutover
on losing connection to the client. This was the first part of introducing
the concept of generations to c2c streaming. The change was reverted due
to a leaked goroutine during stress testing.

This change does not alter any of the core logic but simply makes the test
more reliable by adding a `Streaming` testing knob. This allows us to intercept
when the stream ingestion processor receives an Event and perform the necessary
testing.

Fixes: #68701
Fixes: #68795

Release justification (non-production code changes): Revert a revert
of previously checked in logic by fixing the testing infrastructure
that was leaking the goroutine.

69470: sql: Fix SHOW ZONE CONFIGURATIONS with very long constraints r=arulajmani a=ajstorm

Previously, in the presence of very long constraints fields, SHOW ZONE
CONFIGURATIONS would output the constraints with `\n` characters mixed in. This
was because the yaml.v2 library had an 80-character line limit. We recently
pulled some commits into our fork of the yaml library that make the line length
configurable. With that change, we can now configure the line length to be
unlimited when we're showing the zone configuration, and get around the
ugliness of the `\n` characters.

Release note (sql change): Fixes a bug in SHOW ZONE CONFIGURATIONS where long
constraints fields may show `\n` characters.

Release justification: Low risk change to existing functionality.

Co-authored-by: Anne Zhu <anne.zhu@cockroachlabs.com>
Co-authored-by: Adam Storm <storm@cockroachlabs.com>
  • Loading branch information
3 people committed Aug 29, 2021
3 parents 885b3f9 + 80f7325 + cc2279d commit 8213411
Show file tree
Hide file tree
Showing 17 changed files with 261 additions and 33 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4419,8 +4419,8 @@ def go_deps():
build_file_proto_mode = "disable_global",
importpath = "gopkg.in/yaml.v2",
replace = "github.com/cockroachdb/yaml",
sum = "h1:EqoCicA1pbWWDGniFxhTElh2hvui7E7tEvuBNJSDn3A=",
version = "v0.0.0-20180705215940-0e2822948641",
sum = "h1:vVVz+IAeHhYPxGW9EC8j6HR7uZl/1wSP0Wijaxs4frw=",
version = "v0.0.0-20210825132133-2d6955c8edbc",
)
go_repository(
name = "in_gopkg_yaml_v3",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ replace honnef.co/go/tools => honnef.co/go/tools v0.0.1-2020.1.6

replace vitess.io/vitess => github.com/cockroachdb/vitess v0.0.0-20210218160543-54524729cc82

replace gopkg.in/yaml.v2 => github.com/cockroachdb/yaml v0.0.0-20180705215940-0e2822948641
replace gopkg.in/yaml.v2 => github.com/cockroachdb/yaml v0.0.0-20210825132133-2d6955c8edbc

replace github.com/knz/go-libedit => github.com/otan-cockroach/go-libedit v1.10.2-0.20201030151939-7cced08450e7

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ github.com/cockroachdb/ttycolor v0.0.0-20210717002733-a2a538deeb8c h1:S2vg+TZySZ
github.com/cockroachdb/ttycolor v0.0.0-20210717002733-a2a538deeb8c/go.mod h1:NltwFG0VBANi1jHKpn5KL9AbsHTE+8fPaAHT0TzL20k=
github.com/cockroachdb/vitess v0.0.0-20210218160543-54524729cc82 h1:8htEd1lLILqfjKardWfKKGgXVCs0WmcgEj9cXnmcuos=
github.com/cockroachdb/vitess v0.0.0-20210218160543-54524729cc82/go.mod h1:+bhevpN4bd6bstiRREkJDaMWZR3lTe5ypydTtXgf7GU=
github.com/cockroachdb/yaml v0.0.0-20180705215940-0e2822948641 h1:EqoCicA1pbWWDGniFxhTElh2hvui7E7tEvuBNJSDn3A=
github.com/cockroachdb/yaml v0.0.0-20180705215940-0e2822948641/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
github.com/cockroachdb/yaml v0.0.0-20210825132133-2d6955c8edbc h1:vVVz+IAeHhYPxGW9EC8j6HR7uZl/1wSP0Wijaxs4frw=
github.com/cockroachdb/yaml v0.0.0-20210825132133-2d6955c8edbc/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA=
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TestingKnobs struct {
TenantTestingKnobs ModuleTestingKnobs
JobsTestingKnobs ModuleTestingKnobs
BackupRestore ModuleTestingKnobs
Streaming ModuleTestingKnobs
MigrationManager ModuleTestingKnobs
IndexUsageStatsKnobs ModuleTestingKnobs
SQLStatsKnobs ModuleTestingKnobs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# LogicTest: multiregion-3node-3superlongregions

query TTTT
SHOW REGIONS
----
veryveryveryveryveryveryverylongregion1 {} {} {}
veryveryveryveryveryveryverylongregion2 {} {} {}
veryveryveryveryveryveryverylongregion3 {} {} {}

statement ok
SELECT crdb_internal.validate_multi_region_zone_configs()

statement ok
CREATE DATABASE "mr-zone-configs" primary region "veryveryveryveryveryveryverylongregion1" regions "veryveryveryveryveryveryverylongregion2","veryveryveryveryveryveryverylongregion3"

statement ok
use "mr-zone-configs"

statement ok
SELECT crdb_internal.validate_multi_region_zone_configs()

query TT
SHOW ZONE CONFIGURATION FOR DATABASE "mr-zone-configs"
----
DATABASE "mr-zone-configs" ALTER DATABASE "mr-zone-configs" CONFIGURE ZONE USING
range_min_bytes = 134217728,
range_max_bytes = 536870912,
gc.ttlseconds = 90000,
num_replicas = 5,
num_voters = 3,
constraints = '{+region=veryveryveryveryveryveryverylongregion1: 1, +region=veryveryveryveryveryveryverylongregion2: 1, +region=veryveryveryveryveryveryverylongregion3: 1}',
voter_constraints = '[+region=veryveryveryveryveryveryverylongregion1]',
lease_preferences = '[[+region=veryveryveryveryveryveryverylongregion1]]'
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ func MakeKVEvent(kv roachpb.KeyValue) Event {
func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event {
	// Build the concrete checkpoint value first, then hand it back through the
	// Event interface.
	var ev Event = checkpointEvent{resolvedTimestamp: resolvedTimestamp}
	return ev
}

// MakeGenerationEvent creates a GenerationEvent.
func MakeGenerationEvent() Event {
	// The generation event is built from an empty generationEvent literal, so
	// no fields are populated here.
	var ev Event = generationEvent{}
	return ev
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package streamclient
import (
"context"
gosql "database/sql"
"database/sql/driver"
"fmt"
"strconv"

Expand Down Expand Up @@ -124,7 +125,15 @@ func (m *sinklessReplicationClient) ConsumePartition(
}
}
if err := rows.Err(); err != nil {
errCh <- err
if errors.Is(err, driver.ErrBadConn) {
select {
case eventCh <- streamingccl.MakeGenerationEvent():
case <-ctx.Done():
errCh <- ctx.Err()
}
} else {
errCh <- err
}
return
}
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,17 @@ INSERT INTO d.t2 VALUES (2);
feed.ObserveResolved(ctx, secondObserved.Value.Timestamp)
cancelIngestion()
})

t.Run("stream-address-disconnects", func(t *testing.T) {
clientCtx, cancelIngestion := context.WithCancel(ctx)
eventCh, errCh, err := client.ConsumePartition(clientCtx, pa, startTime)
require.NoError(t, err)
feedSource := &channelFeedSource{eventCh: eventCh, errCh: errCh}
feed := streamingtest.MakeReplicationFeed(t, feedSource)

h.SysServer.Stopper().Stop(clientCtx)

require.True(t, feed.ObserveGeneration(clientCtx))
cancelIngestion()
})
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",
Expand Down
64 changes: 50 additions & 14 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -95,14 +97,6 @@ type streamIngestionProcessor struct {
// and have attempted to flush them with `internalDrained`.
internalDrained bool

// ingestionErr stores any error that is returned from the worker goroutine so
// that it can be forwarded through the DistSQL flow.
ingestionErr error

// pollingErr stores any error that is returned from the poller checking for a
// cutover signal so that it can be forwarded through the DistSQL flow.
pollingErr error

// pollingWaitGroup registers the polling goroutine and waits for it to return
// when the processor is being drained.
pollingWaitGroup sync.WaitGroup
Expand All @@ -117,6 +111,20 @@ type streamIngestionProcessor struct {
// closePoller is used to shutdown the poller that checks the job for a
// cutover signal.
closePoller chan struct{}

// mu is used to provide thread-safe read-write operations to ingestionErr
// and pollingErr.
mu struct {
syncutil.Mutex

// ingestionErr stores any error that is returned from the worker goroutine so
// that it can be forwarded through the DistSQL flow.
ingestionErr error

// pollingErr stores any error that is returned from the poller checking for a
// cutover signal so that it can be forwarded through the DistSQL flow.
pollingErr error
}
}

// partitionEvent augments a normal event with the partition it came from.
Expand Down Expand Up @@ -190,7 +198,9 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
defer sip.pollingWaitGroup.Done()
err := sip.checkForCutoverSignal(ctx, sip.closePoller)
if err != nil {
sip.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
sip.mu.Lock()
sip.mu.pollingErr = errors.Wrap(err, "error while polling job for cutover signal")
sip.mu.Unlock()
}
}()

Expand Down Expand Up @@ -220,8 +230,11 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
return nil, sip.DrainHelper()
}

if sip.pollingErr != nil {
sip.MoveToDraining(sip.pollingErr)
sip.mu.Lock()
err := sip.mu.pollingErr
sip.mu.Unlock()
if err != nil {
sip.MoveToDraining(err)
return nil, sip.DrainHelper()
}

Expand All @@ -243,8 +256,11 @@ func (sip *streamIngestionProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pr
return row, nil
}

if sip.ingestionErr != nil {
sip.MoveToDraining(sip.ingestionErr)
sip.mu.Lock()
err = sip.mu.ingestionErr
sip.mu.Unlock()
if err != nil {
sip.MoveToDraining(err)
return nil, sip.DrainHelper()
}

Expand Down Expand Up @@ -372,7 +388,10 @@ func (sip *streamIngestionProcessor) merge(
})
}
go func() {
sip.ingestionErr = g.Wait()
err := g.Wait()
sip.mu.Lock()
defer sip.mu.Unlock()
sip.mu.ingestionErr = err
close(merged)
}()

Expand Down Expand Up @@ -405,6 +424,14 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
return sip.flush()
}

if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil {
if streamingKnobs.RunAfterReceivingEvent != nil {
streamingKnobs.RunAfterReceivingEvent(sip.Ctx)
}
}
}

switch event.Type() {
case streamingccl.KVEvent:
if err := sip.bufferKV(event); err != nil {
Expand All @@ -426,6 +453,15 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
}

return sip.flush()
case streamingccl.GenerationEvent:
log.Info(sip.Ctx, "GenerationEvent received")
select {
case <-sip.cutoverCh:
sip.internalDrained = true
return nil, nil
case <-sip.Ctx.Done():
return nil, sip.Ctx.Err()
}
default:
return nil, errors.Newf("unknown streaming event type %v", event.Type())
}
Expand Down
Loading

0 comments on commit 8213411

Please sign in to comment.