Skip to content

Commit

Permalink
Merge #123833
Browse files Browse the repository at this point in the history
123833: streamingccl: small replicating split fixups  r=kev-cao a=msbutler

See commits.

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
  • Loading branch information
craig[bot] and msbutler committed May 9, 2024
2 parents 482c9bc + 2fad9f3 commit cd17692
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,6 @@ func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error {
if !ingestSplitEvent.Get(&sip.EvalCtx.Settings.SV) {
return nil
}

kvDB := sip.FlowCtx.Cfg.DB.KV()
rekey, ok, err := sip.rekey(*key)
if err != nil {
Expand All @@ -865,7 +864,7 @@ func (sip *streamIngestionProcessor) handleSplitEvent(key *roachpb.Key) error {
if !ok {
return nil
}
log.Infof(ctx, "replicating split at %s", rekey)
log.Infof(ctx, "replicating split at %s", roachpb.Key(rekey).String())
expiration := kvDB.Clock().Now().AddDuration(time.Hour)
return kvDB.AdminSplit(ctx, rekey, expiration)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,15 +302,15 @@ func (s *eventStream) onDeleteRange(ctx context.Context, delRange *kvpb.RangeFee
s.setErr(s.maybeFlushBatch(ctx))
}
func (s *eventStream) onMetadata(ctx context.Context, metadata *kvpb.RangeFeedMetadata) {
if s.addMu != nil {
// Split points can be sent concurrently during the initial scan.
s.addMu.Lock()
defer s.addMu.Unlock()
}
log.VInfof(ctx, 2, "received metadata event: %s, fromManualSplit: %t, parent start key %s", metadata.Span, metadata.FromManualSplit, metadata.ParentStartKey)
if metadata.FromManualSplit && !metadata.Span.Key.Equal(metadata.ParentStartKey) {
// Only send new manual split keys (i.e. a child rangefeed start key that
// differs from the parent start key)
if s.addMu != nil {
// Split points can be sent concurrently during the initial scan.
s.addMu.Lock()
defer s.addMu.Unlock()
}
s.seb.addSplitPoint(metadata.Span.Key)
s.setErr(s.maybeFlushBatch(ctx))
}
Expand Down

0 comments on commit cd17692

Please sign in to comment.