Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/rangefeed: reduce size of event struct from 200 bytes to 72 bytes #88308

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type FeedBudget struct {
closed bool
}
// Maximum amount of memory to use by feed. We use separate limit here to
// avoid creating BytesMontior with a limit per feed.
// avoid creating BytesMonitor with a limit per feed.
limit int64
// Channel to notify that memory was returned to the budget.
replenishC chan interface{}
Expand Down
115 changes: 67 additions & 48 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,37 @@ func putPooledEvent(ev *event) {
// to be informed of. It is used so that all events can be sent over the same
// channel, which is necessary to prevent reordering.
type event struct {
ops []enginepb.MVCCLogicalOp
ct hlc.Timestamp
sst []byte
sstSpan roachpb.Span
sstWTS hlc.Timestamp
initRTS bool
syncC chan struct{}
// This setting is used in conjunction with syncC in tests in order to ensure
// that all registrations have fully finished outputting their buffers. This
// has to be done by the processor in order to avoid race conditions with the
// Event variants. Only one set.
ops opsEvent
ct ctEvent
initRTS initRTSEvent
sst *sstEvent
sync *syncEvent
// Budget allocated to process the event.
alloc *SharedBudgetAllocation
}

// opsEvent is the event variant that carries a batch of MVCC logical ops.
// consumeEvent treats the variant as set when the slice is non-nil and hands
// the ops to consumeLogicalOps along with the event's budget allocation.
type opsEvent []enginepb.MVCCLogicalOp

// ctEvent is the event variant that carries a closed timestamp. It is built by
// ForwardClosedTS and consumed by forwardClosedTS. consumeEvent treats the
// variant as set when the timestamp is non-empty.
type ctEvent struct {
// Embedded so that Timestamp methods such as IsEmpty can be called directly
// on the variant.
hlc.Timestamp
}

// initRTSEvent is the event variant that asks the processor to initialize its
// resolved timestamp. When it is true, consumeEvent calls initResolvedTS.
type initRTSEvent bool

// sstEvent is the event variant that carries an ingested SSTable. The event
// struct holds it by pointer, and a nil pointer means the variant is not set.
//
// NOTE: ConsumeSSTable constructs this with an unkeyed composite literal, so
// the field order below must not change without updating that call site.
type sstEvent struct {
// data is the raw SSTable contents; its length is what
// calculateDateEventSize charges for this variant.
data []byte
// span is the key span covered by the SSTable.
span roachpb.Span
// ts is the write timestamp, published as WriteTS by publishSSTable.
ts hlc.Timestamp
}

type syncEvent struct {
c chan struct{}
// This setting is used in conjunction with c in tests in order to ensure that
// all registrations have fully finished outputting their buffers. This has to
// be done by the processor in order to avoid race conditions with the
// registry. Should be used only in tests.
testRegCatchupSpan roachpb.Span
// Budget allocated to process the event
allocation *SharedBudgetAllocation
testRegCatchupSpan *roachpb.Span
}

// spanErr is an error across a key span that will disconnect overlapping
Expand Down Expand Up @@ -326,7 +343,7 @@ func (p *Processor) run(
// Transform and route events.
case e := <-p.eventC:
p.consumeEvent(ctx, e)
e.allocation.Release(ctx)
e.alloc.Release(ctx)
putPooledEvent(e)

// Check whether any unresolved intents need a push.
Expand Down Expand Up @@ -530,7 +547,7 @@ func (p *Processor) ConsumeSSTable(
if p == nil {
return true
}
return p.sendEvent(ctx, event{sst: sst, sstSpan: sstSpan, sstWTS: writeTS}, p.EventChanTimeout)
return p.sendEvent(ctx, event{sst: &sstEvent{sst, sstSpan, writeTS}}, p.EventChanTimeout)
}

// ForwardClosedTS indicates that the closed timestamp that serves as the basis
Expand All @@ -546,7 +563,7 @@ func (p *Processor) ForwardClosedTS(ctx context.Context, closedTS hlc.Timestamp)
if closedTS.IsEmpty() {
return true
}
return p.sendEvent(ctx, event{ct: closedTS}, p.EventChanTimeout)
return p.sendEvent(ctx, event{ct: ctEvent{closedTS}}, p.EventChanTimeout)
}

// sendEvent informs the Processor of a new event. If a timeout is specified,
Expand All @@ -557,13 +574,13 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio
// path where we have enough budget and the outgoing channel is free. If not,
// we try to set up a timeout for acquiring budget and then reuse it for
// inserting the value into the channel.
var allocation *SharedBudgetAllocation
var alloc *SharedBudgetAllocation
if p.MemBudget != nil {
size := calculateDateEventSize(e)
if size > 0 {
var err error
// First we will try non-blocking fast path to allocate memory budget.
allocation, err = p.MemBudget.TryGet(ctx, size)
alloc, err = p.MemBudget.TryGet(ctx, size)
if err != nil {
// Since we don't have enough budget, we should wait for allocations to be
// returned to the budget before failing.
Expand All @@ -576,20 +593,20 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio
timeout = 0
}
p.Metrics.RangeFeedBudgetBlocked.Inc(1)
allocation, err = p.MemBudget.WaitAndGet(ctx, size)
alloc, err = p.MemBudget.WaitAndGet(ctx, size)
}
if err != nil {
p.Metrics.RangeFeedBudgetExhausted.Inc(1)
p.sendStop(newErrBufferCapacityExceeded())
return false
}
defer func() {
allocation.Release(ctx)
alloc.Release(ctx)
}()
}
}
ev := getPooledEvent(e)
ev.allocation = allocation
ev.alloc = alloc
if timeout == 0 {
// Timeout is zero if no timeout was requested or timeout is already set on
// the context by budget allocation. Just try to write using context as a
Expand All @@ -598,7 +615,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio
case p.eventC <- ev:
// Reset allocation after successful posting to prevent deferred cleanup
// from freeing it.
allocation = nil
alloc = nil
case <-p.stoppedC:
// Already stopped. Do nothing.
case <-ctx.Done():
Expand All @@ -612,7 +629,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio
case p.eventC <- ev:
// Reset allocation after successful posting to prevent deferred cleanup
// from freeing it.
allocation = nil
alloc = nil
case <-p.stoppedC:
// Already stopped. Do nothing.
default:
Expand All @@ -625,7 +642,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio
case p.eventC <- ev:
// Reset allocation after successful posting to prevent deferred cleanup
// from freeing it.
allocation = nil
alloc = nil
case <-p.stoppedC:
// Already stopped. Do nothing.
case <-ctx.Done():
Expand All @@ -650,7 +667,7 @@ func (p *Processor) setResolvedTSInitialized(ctx context.Context) {
// It does so by flushing the event pipeline.
func (p *Processor) syncEventC() {
syncC := make(chan struct{})
ev := getPooledEvent(event{syncC: syncC})
ev := getPooledEvent(event{sync: &syncEvent{c: syncC}})
select {
case p.eventC <- ev:
select {
Expand All @@ -666,43 +683,43 @@ func (p *Processor) syncEventC() {

func (p *Processor) consumeEvent(ctx context.Context, e *event) {
switch {
case len(e.ops) > 0:
p.consumeLogicalOps(ctx, e.ops, e.allocation)
case len(e.sst) > 0:
p.consumeSSTable(ctx, e.sst, e.sstSpan, e.sstWTS, e.allocation)
case e.ops != nil:
p.consumeLogicalOps(ctx, e.ops, e.alloc)
case !e.ct.IsEmpty():
p.forwardClosedTS(ctx, e.ct)
case e.initRTS:
p.forwardClosedTS(ctx, e.ct.Timestamp)
case bool(e.initRTS):
p.initResolvedTS(ctx)
case e.syncC != nil:
if e.testRegCatchupSpan.Valid() {
if err := p.reg.waitForCaughtUp(e.testRegCatchupSpan); err != nil {
case e.sst != nil:
p.consumeSSTable(ctx, e.sst.data, e.sst.span, e.sst.ts, e.alloc)
case e.sync != nil:
if e.sync.testRegCatchupSpan != nil {
if err := p.reg.waitForCaughtUp(*e.sync.testRegCatchupSpan); err != nil {
log.Errorf(
ctx,
"error waiting for registries to catch up during test, results might be impacted: %s",
err,
)
}
}
close(e.syncC)
close(e.sync.c)
default:
panic(fmt.Sprintf("missing event variant: %+v", e))
}
}

func (p *Processor) consumeLogicalOps(
ctx context.Context, ops []enginepb.MVCCLogicalOp, allocation *SharedBudgetAllocation,
ctx context.Context, ops []enginepb.MVCCLogicalOp, alloc *SharedBudgetAllocation,
) {
for _, op := range ops {
// Publish RangeFeedValue updates, if necessary.
switch t := op.GetValue().(type) {
case *enginepb.MVCCWriteValueOp:
// Publish the new value directly.
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, allocation)
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, alloc)

case *enginepb.MVCCDeleteRangeOp:
// Publish the range deletion directly.
p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, allocation)
p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, alloc)

case *enginepb.MVCCWriteIntentOp:
// No updates to publish.
Expand All @@ -712,7 +729,7 @@ func (p *Processor) consumeLogicalOps(

case *enginepb.MVCCCommitIntentOp:
// Publish the newly committed value.
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, allocation)
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, alloc)

case *enginepb.MVCCAbortIntentOp:
// No updates to publish.
Expand All @@ -737,9 +754,9 @@ func (p *Processor) consumeSSTable(
sst []byte,
sstSpan roachpb.Span,
sstWTS hlc.Timestamp,
allocation *SharedBudgetAllocation,
alloc *SharedBudgetAllocation,
) {
p.publishSSTable(ctx, sst, sstSpan, sstWTS, allocation)
p.publishSSTable(ctx, sst, sstSpan, sstWTS, alloc)
}

func (p *Processor) forwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) {
Expand All @@ -759,7 +776,7 @@ func (p *Processor) publishValue(
key roachpb.Key,
timestamp hlc.Timestamp,
value, prevValue []byte,
allocation *SharedBudgetAllocation,
alloc *SharedBudgetAllocation,
) {
if !p.Span.ContainsKey(roachpb.RKey(key)) {
log.Fatalf(ctx, "key %v not in Processor's key range %v", key, p.Span)
Expand All @@ -778,14 +795,14 @@ func (p *Processor) publishValue(
},
PrevValue: prevVal,
})
p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, allocation)
p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, alloc)
}

func (p *Processor) publishDeleteRange(
ctx context.Context,
startKey, endKey roachpb.Key,
timestamp hlc.Timestamp,
allocation *SharedBudgetAllocation,
alloc *SharedBudgetAllocation,
) {
span := roachpb.Span{Key: startKey, EndKey: endKey}
if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) {
Expand All @@ -797,15 +814,15 @@ func (p *Processor) publishDeleteRange(
Span: span,
Timestamp: timestamp,
})
p.reg.PublishToOverlapping(ctx, span, &event, allocation)
p.reg.PublishToOverlapping(ctx, span, &event, alloc)
}

func (p *Processor) publishSSTable(
ctx context.Context,
sst []byte,
sstSpan roachpb.Span,
sstWTS hlc.Timestamp,
allocation *SharedBudgetAllocation,
alloc *SharedBudgetAllocation,
) {
if sstSpan.Equal(roachpb.Span{}) {
panic(errors.AssertionFailedf("received SSTable without span"))
Expand All @@ -819,7 +836,7 @@ func (p *Processor) publishSSTable(
Span: sstSpan,
WriteTS: sstWTS,
},
}, allocation)
}, alloc)
}

func (p *Processor) publishCheckpoint(ctx context.Context) {
Expand Down Expand Up @@ -852,6 +869,8 @@ func calculateDateEventSize(e event) int64 {
for _, op := range e.ops {
size += int64(op.Size())
}
size += int64(len(e.sst))
if e.sst != nil {
size += int64(len(e.sst.data))
}
return size
}
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync/atomic"
"testing"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1052,7 +1053,7 @@ func (p *Processor) syncEventAndRegistrations() {
// overlapping the given span to fully process their own internal buffers.
func (p *Processor) syncEventAndRegistrationSpan(span roachpb.Span) {
syncC := make(chan struct{})
ev := getPooledEvent(event{syncC: syncC, testRegCatchupSpan: span})
ev := getPooledEvent(event{sync: &syncEvent{c: syncC, testRegCatchupSpan: &span}})
select {
case p.eventC <- ev:
select {
Expand Down Expand Up @@ -1466,3 +1467,11 @@ func BenchmarkProcessorWithBudget(b *testing.B) {
require.NoError(b, err.GoError())
}
}

// TestSizeOfEvent pins the in-memory size of the event struct so that any
// growth or shrinkage shows up as a test failure. Changing the size is fine as
// long as it is deliberate; update the expected value below when that happens.
func TestSizeOfEvent(t *testing.T) {
	// Expected size in bytes of a zero-valued event.
	const wantBytes = 72
	got := int(unsafe.Sizeof(event{}))
	require.Equal(t, wantBytes, got)
}
Loading