1 change: 1 addition & 0 deletions pkg/kv/kvserver/storeliveness/BUILD.bazel
@@ -34,6 +34,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/taskpacer",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/storeliveness/support_manager.go
@@ -34,7 +34,10 @@ var Enabled = settings.RegisterBoolSetting(
// MessageSender is the interface that defines how Store Liveness messages are
// sent. Transport is the production implementation of MessageSender.
type MessageSender interface {
// TODO: Change the function name to something that means "enqueue" messages
// for sending.
SendAsync(ctx context.Context, msg slpb.Message) (sent bool)
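// SendAllMessages signals the sender to flush all enqueued messages.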
SendAllMessages(ctx context.Context)
}
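
Reviewer note: a minimal sketch of the intended enqueue-then-flush calling pattern, mirroring sendHeartbeats below. The sendBatch helper and its arguments are illustrative only, not part of this change:

// Illustration only: enqueue a set of messages, then flush them in one shot.
func sendBatch(ctx context.Context, sender MessageSender, msgs []slpb.Message) {
	for _, msg := range msgs {
		// SendAsync only enqueues the message; a false return means the
		// message was definitely not enqueued.
		if sent := sender.SendAsync(ctx, msg); !sent {
			log.KvExec.Warningf(ctx, "failed to enqueue message to store %+v", msg.To)
		}
	}
	// One flush signal covers all of the messages enqueued above.
	sender.SendAllMessages(ctx)
}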

// SupportManager orchestrates requesting and providing Store Liveness support.
@@ -329,6 +332,8 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) {
log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To)
}
}

sm.sender.SendAllMessages(ctx)
sm.metrics.HeartbeatSuccesses.Inc(int64(successes))
sm.metrics.HeartbeatFailures.Inc(int64(len(heartbeats) - successes))
log.KvExec.VInfof(ctx, 2, "sent heartbeats to %d stores", successes)
@@ -426,6 +431,7 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Message) {
for _, response := range responses {
_ = sm.sender.SendAsync(ctx, response)
}
sm.sender.SendAllMessages(ctx)
log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses))
}

4 changes: 4 additions & 0 deletions pkg/kv/kvserver/storeliveness/testutils.go
@@ -101,6 +101,10 @@ func (tms *testMessageSender) SendAsync(_ context.Context, msg slpb.Message) (sent bool) {
return true
}

func (tms *testMessageSender) SendAllMessages(_ context.Context) {
// No-op.
}

func (tms *testMessageSender) drainSentMessages() []slpb.Message {
tms.mu.Lock()
defer tms.mu.Unlock()
178 changes: 158 additions & 20 deletions pkg/kv/kvserver/storeliveness/transport.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/taskpacer"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
@@ -54,7 +55,10 @@ type MessageHandler interface {

// sendQueue is a queue of outgoing Messages.
type sendQueue struct {
messages chan slpb.Message
// When the sendMessages channel is signaled, the sender will attempt to
// send all enqueued messages.
sendMessages chan struct{}
messages chan slpb.Message
}

// Transport handles the RPC messages for Store Liveness.
@@ -79,10 +83,27 @@ type Transport struct {

// TransportKnobs includes all knobs for testing.
knobs *TransportKnobs

// Once signaled, the send coordinator instructs all sendQueues to send
// their messages.
sendAllMessages chan struct{}
}

var _ MessageSender = (*Transport)(nil)

type pacerConfig struct{}

// GetRefresh implements the taskpacer.Config interface.
// TODO: Make this configurable.
func (c pacerConfig) GetRefresh() time.Duration {
return 10 * time.Millisecond
}

// GetSmear implements the taskpacer.Config interface.
// TODO: Make this configurable.
func (c pacerConfig) GetSmear() time.Duration {
return 1 * time.Millisecond
}
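
Reviewer note: a minimal sketch of how the coordinator below uses the pacer, assuming the taskpacer.New/StartTask/Pace API exactly as exercised in this diff. The paceWork helper and doWork callback are illustrative only:

// Illustration only: smear n units of work across the pacer's refresh
// interval rather than doing them all at once.
func paceWork(n int, doWork func()) {
	pacer := taskpacer.New(pacerConfig{})
	pacer.StartTask(timeutil.Now())
	for workLeft := n; workLeft > 0; {
		// Pace returns how many items to do now (todo) and the time (by)
		// at which the next chunk of work may start.
		todo, by := pacer.Pace(timeutil.Now(), workLeft)
		for i := 0; i < todo && workLeft > 0; i++ {
			doWork()
			workLeft--
		}
		// Sleep out the remainder of this pacing interval, if any.
		if workLeft > 0 && timeutil.Now().Before(by) {
			time.Sleep(by.Sub(timeutil.Now()))
		}
	}
}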

// NewTransport creates a new Store Liveness Transport.
func NewTransport(
ambient log.AmbientContext,
@@ -97,12 +118,13 @@ func NewTransport(
knobs = &TransportKnobs{}
}
t := &Transport{
AmbientContext: ambient,
stopper: stopper,
clock: clock,
dialer: dialer,
metrics: newTransportMetrics(),
knobs: knobs,
AmbientContext: ambient,
stopper: stopper,
clock: clock,
dialer: dialer,
metrics: newTransportMetrics(),
knobs: knobs,
sendAllMessages: make(chan struct{}, 1),
}
if grpcServer != nil {
slpb.RegisterStoreLivenessServer(grpcServer, t)
@@ -112,6 +134,104 @@
return nil, err
}
}

// Start a background goroutine that acts as the transport send coordinator.
// It is responsible for instructing the sendQueues to send their messages.
if err := stopper.RunAsyncTask(
context.Background(), "storeliveness transport send coordinator",
func(ctx context.Context) {
var batchTimer timeutil.Timer
defer batchTimer.Stop()

conf := pacerConfig{}
pacer := taskpacer.New(conf)

// toSignal holds the sendMessages channels of the queues that need to be
// signaled.
toSignal := make([]chan struct{}, 0)

for {
select {
case <-stopper.ShouldQuiesce():
return

case <-t.sendAllMessages:
// We received a signal to send all messages. Before we do that,
// wait for a short duration to give other stores a chance to
// enqueue messages, which increases batching opportunities.
// TODO: Make this configurable.
batchTimer.Reset(batchDuration)
for done := false; !done; {
select {
case <-t.sendAllMessages:
// Consume any additional signals to send all messages.

case <-batchTimer.C:
// We have waited long enough to accumulate a batch.
done = true
}
}

// At this point, we have waited for a short duration. We now need
// to signal all queues to send their messages.

// Get all the sendQueues that have messages to send. Note that the
// atomicity here is per sendQueue, and not across all sendQueues.
t.queues.Range(func(nodeID roachpb.NodeID, q *sendQueue) bool {
if len(q.messages) == 0 {
// Nothing to send.
return true
}

toSignal = append(toSignal, q.sendMessages)
return true
})

// There is a benign race here, which can happen in two ways:
// 1. After we collected the toSignal channels, a new message is
// enqueued to a queue we haven't included. In this case,
// t.sendAllMessages will be signaled again, and we will pick the
// queue up in the next iteration of the for loop.
// 2. After we collected the toSignal channels, a new message is
// enqueued to a queue we have already included. In this case,
// t.sendAllMessages may be signaled again in the next iteration even
// though the queues are empty. This is not a problem because we
// won't wake up any sendQueue goroutine unnecessarily.

// Pace the signaling of the channels.
pacer.StartTask(timeutil.Now())
workLeft := len(toSignal)
for workLeft > 0 {
todo, by := pacer.Pace(timeutil.Now(), workLeft)

// Pop todo items off the toSignal slice and signal them.
for i := 0; i < todo && workLeft > 0; i++ {
ch := toSignal[len(toSignal)-1]
toSignal = toSignal[:len(toSignal)-1]
select {
case ch <- struct{}{}:
default:
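// The queue already has a send signal pending; no need to block.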
}
workLeft--
}

if workLeft > 0 && timeutil.Now().Before(by) {
time.Sleep(by.Sub(timeutil.Now()))
}
}
// Assert that toSignal has been fully drained.
// TODO: Remove this assertion; it exists only to verify that the
// draining logic above holds up under the existing tests.
if len(toSignal) != 0 {
log.KvExec.Fatalf(ctx, "toSignal is not empty")
}
}
}

},
); err != nil {
return nil, err
}

return t, nil
}

@@ -219,6 +339,7 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) {
// The returned bool may be a false positive but will never be a false negative;
// if sent is true the message may or may not actually be sent but if it's false
// the message definitely was not sent.
// TODO: Change the name of this function to better describe it.
func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued bool) {
toNodeID := msg.To.NodeID
fromNodeID := msg.From.NodeID
@@ -264,12 +385,22 @@
}
}

// SendAllMessages signals all queues to send all their messages.
func (t *Transport) SendAllMessages(ctx context.Context) {
select {
case t.sendAllMessages <- struct{}{}:
default:
}
}
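
Design note: sendAllMessages has capacity 1, so the non-blocking send above coalesces concurrent flush requests into a single pending signal for the coordinator. A minimal sketch of the pattern, with names here purely illustrative:

// Illustration only: a coalescing, non-blocking notification. The channel
// must be buffered with capacity 1.
func signal(notify chan struct{}) {
	select {
	case notify <- struct{}{}:
	default:
		// A notification is already pending; the consumer will observe it
		// and handle this request as well.
	}
}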

// getQueue returns the queue for the specified node ID and a boolean
// indicating whether the queue already exists (true) or was created (false).
func (t *Transport) getQueue(nodeID roachpb.NodeID) (*sendQueue, bool) {
queue, ok := t.queues.Load(nodeID)
if !ok {
q := sendQueue{messages: make(chan slpb.Message, sendBufferSize)}
q := sendQueue{
sendMessages: make(chan struct{}, 1),
messages:     make(chan slpb.Message, sendBufferSize),
}
queue, ok = t.queues.LoadOrStore(nodeID, &q)
}
return queue, ok
@@ -361,9 +492,8 @@ func (t *Transport) processQueue(
}
var idleTimer timeutil.Timer
defer idleTimer.Stop()
var batchTimer timeutil.Timer
defer batchTimer.Stop()
batch := &slpb.MessageBatch{}

for {
idleTimer.Reset(getIdleTimeout())
select {
@@ -374,36 +504,44 @@
t.metrics.SendQueueIdle.Inc(1)
return nil

case msg := <-q.messages:
batch.Messages = append(batch.Messages, msg)
t.metrics.SendQueueSize.Dec(1)
t.metrics.SendQueueBytes.Dec(int64(msg.Size()))
case <-q.sendMessages:
// We need to send all the messages in our queue.

// Pull off as many queued requests as possible within batchDuration.
batchTimer.Reset(batchDuration)
for done := false; !done; {
queueHasItems := true
for queueHasItems {
select {
case msg = <-q.messages:
case msg := <-q.messages:
batch.Messages = append(batch.Messages, msg)
t.metrics.SendQueueSize.Dec(1)
t.metrics.SendQueueBytes.Dec(int64(msg.Size()))
case <-batchTimer.C:
done = true

default:
// We have drained all messages from the queue into the batch. Exit
// the loop.
queueHasItems = false
}
}

if len(batch.Messages) == 0 {
// Nothing to send.
continue
}

// At this point, we have drained the queue. Send the batch.
batch.Now = t.clock.NowAsClockTimestamp()
if err = stream.Send(batch); err != nil {
t.metrics.MessagesSendDropped.Inc(int64(len(batch.Messages)))
return err
}
t.metrics.MessagesSent.Inc(int64(len(batch.Messages)))

// Reuse the Messages slice, but zero out the contents to avoid delaying
// GC of memory referenced from within.
for i := range batch.Messages {
batch.Messages[i] = slpb.Message{}
}

batch.Messages = batch.Messages[:0]
batch.Now = hlc.ClockTimestamp{}
}