Skip to content
Permalink
Browse files
feat(pubsub): add flush method to topic (#2863)
* feat(pubsub): add flush method to topic
  • Loading branch information
hongalex committed May 24, 2021
1 parent e4fcc83 commit 825ddd692363eb2dd8cd253cc5976867e432f547
Showing with 63 additions and 0 deletions.
  1. +13 −0 pubsub/internal/scheduler/publish_scheduler.go
  2. +8 −0 pubsub/topic.go
  3. +42 −0 pubsub/topic_test.go
@@ -157,6 +157,19 @@ func (s *PublishScheduler) FlushAndStop() {
}
}

// Flush waits until all bundlers are sent.
func (s *PublishScheduler) Flush() {
var wg sync.WaitGroup
for _, b := range s.bundlers {
wg.Add(1)
go func(b *bundler.Bundler) {
defer wg.Done()
b.Flush()
}(b)
}
wg.Wait()
}

// IsPaused checks if the bundler associated with an ordering keys is
// paused.
func (s *PublishScheduler) IsPaused(orderingKey string) bool {
@@ -474,6 +474,14 @@ func (t *Topic) Stop() {
t.scheduler.FlushAndStop()
}

// Flush blocks until all remaining messages are sent.
func (t *Topic) Flush() {
if t.stopped || t.scheduler == nil {
return
}
t.scheduler.Flush()
}

type bundledMessage struct {
msg *Message
res *PublishResult
@@ -307,3 +307,45 @@ func TestDetachSubscription(t *testing.T) {
t.Errorf("DetachSubscription failed: %v", err)
}
}

func TestFlushStopTopic(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "flush-topic")
if err != nil {
t.Fatal(err)
}

// Subsequent publishes after a flush should succeed.
topic.Flush()
r := topic.Publish(ctx, &Message{
Data: []byte("hello"),
})
_, err = r.Get(ctx)
if err != nil {
t.Errorf("got err: %v", err)
}

// Publishing after a flush should succeed.
topic.Flush()
r = topic.Publish(ctx, &Message{
Data: []byte("world"),
})
_, err = r.Get(ctx)
if err != nil {
t.Errorf("got err: %v", err)
}

// Publishing after Stop should fail.
topic.Stop()
r = topic.Publish(ctx, &Message{
Data: []byte("this should fail"),
})
_, err = r.Get(ctx)
if err != errTopicStopped {
t.Errorf("got %v, want errTopicStopped", err)
}
}

0 comments on commit 825ddd6

Please sign in to comment.