Skip to content

Commit

Permalink
add flow controlled messages views
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Jul 10, 2024
1 parent 689e346 commit 69a16da
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ type ReceiveSettings struct {
// If MaxCallbacks is 0, the value will be a multiplier of MaxOutstandingMessages.
// If the value is negative, then there will be no limit on the number of
// callbacks invoked concurrently.
//
// It is EXPERIMENTAL and subject to change or removal without notice.
MaxCallbacks int
}
Expand Down Expand Up @@ -1395,8 +1396,12 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
return nil
default:
}
fmt.Printf("iter got %d messages\n", len(msgs))
for i, msg := range msgs {
msg := msg
// TODO(jba): call acquire closer to when the message is allocated.
recordStat(ctx, FlowControlledMessages, 1)
recordStat(ctx, FlowControlledBytes, int64(len(msg.Data)))
if err := fc.acquire(ctx, len(msg.Data)); err != nil {
// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
for _, m := range msgs[i:] {
Expand All @@ -1405,6 +1410,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
// Return nil if the context is done, not err.
return nil
}

iter.eoMu.RLock()
msgAckHandler(msg, iter.enableExactlyOnceDelivery)
iter.eoMu.RUnlock()
Expand Down
20 changes: 20 additions & 0 deletions pubsub/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)

// FlowControlledMessages is a measure of the number of messages stuck in flow control.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlowControlledMessages = stats.Int64(statsPrefix+"flow_controlled_messages", "Measure of flow controlled messages", stats.UnitDimensionless)

// FlowControlWaitTime is a measure of the bytes of messages stuck in flow control.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlowControlledBytes = stats.Int64(statsPrefix+"flow_controlled_bytes", "Measure of flow controlled messages in bytes", stats.UnitBytes)

// PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed.
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)
Expand Down Expand Up @@ -162,6 +170,14 @@ var (
// PublisherOutstandingBytesView is the last value of OutstandingBytes
// It is EXPERIMENTAL and subject to change or removal without notice.
PublisherOutstandingBytesView *view.View

// FlowControlledMessagesView is the sum of flow controlled messages.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlowControlledMessagesView *view.View

// FlowControlledBytesView is the sum of bytes of flow controlled messages.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlowControlledBytesView *view.View
)

func init() {
Expand All @@ -180,6 +196,8 @@ func init() {
StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)
FlowControlledMessagesView = createCountView(FlowControlledMessages, keySubscription)
FlowControlledBytesView = createCountView(FlowControlledBytes, keySubscription)

DefaultPublishViews = []*view.View{
PublishedMessagesView,
Expand All @@ -200,6 +218,8 @@ func init() {
StreamResponseCountView,
OutstandingMessagesView,
OutstandingBytesView,
FlowControlledMessagesView,
FlowControlledBytesView,
}
}

Expand Down

0 comments on commit 69a16da

Please sign in to comment.