From da56d30341e9d90663999538cebb292c72661a31 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Fri, 6 Dec 2019 14:07:29 -0800 Subject: [PATCH] changefeedccl: adjust backport for release-19.1 The cloud storage sink uses the original filename format, so unlike release-19.2, we also need to pad schemaID. Release note: None --- pkg/ccl/changefeedccl/cdctest/nemeses.go | 1 - pkg/ccl/changefeedccl/sink_cloudstorage.go | 8 ++++---- pkg/ccl/changefeedccl/sink_cloudstorage_test.go | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index a5468cb0ecd1..78792ea0898b 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -228,7 +228,6 @@ type nemeses struct { rowCount int maxTestColumnCount int eventMix map[fsm.Event]int - mixTotal int usingPoller bool v *CountValidator diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index e6b423a4faf9..7f3eff97f291 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -324,10 +324,10 @@ func (s *cloudStorageSink) flushFile( fileID := s.fileID s.fileID++ // Pad file ID to maintain lexical ordering among files from the same sink. - // Note that we use `-` here to delimit the filename because we want - // `%d.RESOLVED` files to lexicographically succeed data files that have the - // same timestamp. This works because ascii `-` < ascii '.'. - filename := fmt.Sprintf(`%s-%s-%d-%d-%d-%08d%s`, s.dataFileTs, + // Ditto for schema ID. Note that we use `-` here to delimit the filename + // because we want `%d.RESOLVED` files to lexicographically succeed data files + // that have the same timestamp. This works because ascii `-` < ascii '.'. + filename := fmt.Sprintf(`%s-%s-%08d-%d-%d-%08d%s`, s.dataFileTs, key.Topic, key.SchemaID, s.nodeID, s.sinkID, fileID, s.ext) return s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())) } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index 2c05ac1c20f2..c6652d8166f1 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -85,7 +85,7 @@ func TestCloudStorageSink(t *testing.T) { require.NoError(t, s.Flush(ctx)) dataFile, err := ioutil.ReadFile(filepath.Join( - dir, sinkDir, `1970-01-01`, `197001010000000000000000000000000-t1-0-1-7-00000000.ndjson`)) + dir, sinkDir, `1970-01-01`, `197001010000000000000000000000000-t1-00000000-1-7-00000000.ndjson`)) require.NoError(t, err) require.Equal(t, "v1\n", string(dataFile))