changefeedccl: adjust backport for release-19.1
The cloud storage sink uses the original filename format, so unlike
release-19.2, we also need to pad schemaID.

Release note: None
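
To illustrate why the schema ID needs the %08d padding, here is a minimal sketch (not part of the commit; the filenames are hypothetical stand-ins for the sink's timestamp-topic-schemaID-nodeID-sinkID-fileID format). Without padding, the lexical order of filenames diverges from the numeric order of schema IDs:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// Hypothetical filenames with schema IDs 10 and 2, everything else equal.
	unpadded := []string{
		"ts-t1-10-1-7-00000000.ndjson",
		"ts-t1-2-1-7-00000000.ndjson",
	}
	padded := []string{
		"ts-t1-00000010-1-7-00000000.ndjson",
		"ts-t1-00000002-1-7-00000000.ndjson",
	}
	sort.Strings(unpadded)
	sort.Strings(padded)
	fmt.Println(unpadded) // schema 10 sorts before schema 2: numeric order broken
	fmt.Println(padded)   // schema 2 sorts before schema 10: numeric order preserved
}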
danhhz committed Dec 17, 2019
1 parent d976707 commit da56d30
Showing 3 changed files with 5 additions and 6 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -228,7 +228,6 @@ type nemeses struct {
 	rowCount           int
 	maxTestColumnCount int
 	eventMix           map[fsm.Event]int
-	mixTotal           int
 	usingPoller        bool
 
 	v *CountValidator
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -324,10 +324,10 @@ func (s *cloudStorageSink) flushFile(
 	fileID := s.fileID
 	s.fileID++
 	// Pad file ID to maintain lexical ordering among files from the same sink.
-	// Note that we use `-` here to delimit the filename because we want
-	// `%d.RESOLVED` files to lexicographically succeed data files that have the
-	// same timestamp. This works because ascii `-` < ascii '.'.
-	filename := fmt.Sprintf(`%s-%s-%d-%d-%d-%08d%s`, s.dataFileTs,
+	// Ditto for schema ID. Note that we use `-` here to delimit the filename
+	// because we want `%d.RESOLVED` files to lexicographically succeed data files
+	// that have the same timestamp. This works because ascii `-` < ascii '.'.
+	filename := fmt.Sprintf(`%s-%s-%08d-%d-%d-%08d%s`, s.dataFileTs,
 		key.Topic, key.SchemaID, s.nodeID, s.sinkID, fileID, s.ext)
 	return s.es.WriteFile(ctx, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes()))
 }
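
As an aside on the delimiter choice explained in the comment above, a small illustrative sketch (not part of the commit; the .RESOLVED filename is an assumption based on the `%d.RESOLVED` pattern the comment mentions). Because ASCII '-' (45) sorts before '.' (46), a resolved-timestamp file lexicographically succeeds data files that share its timestamp prefix:

package main

import (
	"fmt"
	"sort"
)

func main() {
	ts := "197001010000000000000000000000000"
	names := []string{
		ts + ".RESOLVED",                        // illustrative resolved-timestamp file
		ts + "-t1-00000000-1-7-00000000.ndjson", // data file name from the test below
	}
	sort.Strings(names)
	// ASCII '-' (45) < '.' (46), so the data file sorts first and the
	// RESOLVED file comes after every data file with the same timestamp.
	fmt.Println(names)
}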
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -85,7 +85,7 @@ func TestCloudStorageSink(t *testing.T) {
 	require.NoError(t, s.Flush(ctx))
 
 	dataFile, err := ioutil.ReadFile(filepath.Join(
-		dir, sinkDir, `1970-01-01`, `197001010000000000000000000000000-t1-0-1-7-00000000.ndjson`))
+		dir, sinkDir, `1970-01-01`, `197001010000000000000000000000000-t1-00000000-1-7-00000000.ndjson`))
 	require.NoError(t, err)
 	require.Equal(t, "v1\n", string(dataFile))
 
