Skip to content

Commit

Permalink
docs(pubsub): update allowed message retention duration by server (#8559
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hongalex committed Jan 19, 2024
1 parent 1cf8cf1 commit 78c178b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 46 deletions.
89 changes: 44 additions & 45 deletions pubsub/integration_test.go
Expand Up @@ -1968,55 +1968,54 @@ func TestIntegration_TopicRetention(t *testing.T) {
c := integrationTestClient(ctx, t)
defer c.Close()

testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) {
tc := TopicConfig{
RetentionDuration: 50 * time.Minute,
}
topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), &tc)
if err != nil {
r.Errorf("failed to create topic: %v", err)
}
defer topic.Delete(ctx)
defer topic.Stop()
tc := TopicConfig{
RetentionDuration: 31 * 24 * time.Hour, // max retention duration
}

newDur := 11 * time.Minute
cfg, err := topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: newDur,
})
if err != nil {
r.Errorf("failed to update topic: %v", err)
}
if got := cfg.RetentionDuration; got != newDur {
r.Errorf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
}
topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), &tc)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
defer topic.Delete(ctx)
defer topic.Stop()

// Create a subscription on the topic and read TopicMessageRetentionDuration.
s, err := createSubWithRetry(ctx, t, c, subIDs.New(), SubscriptionConfig{
Topic: topic,
})
if err != nil {
r.Errorf("failed to create subscription: %v", err)
}
defer s.Delete(ctx)
sCfg, err := s.Config(ctx)
if err != nil {
r.Errorf("failed to get sub config: %v", err)
}
if got := sCfg.TopicMessageRetentionDuration; got != newDur {
r.Errorf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
}
newDur := 11 * time.Minute
cfg, err := topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: newDur,
})
if err != nil {
t.Fatalf("failed to update topic: %v", err)
}
if got := cfg.RetentionDuration; got != newDur {
t.Fatalf("cfg.RetentionDuration, got: %v, want: %v", got, newDur)
}

// Clear retention duration by setting to a negative value.
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: -1 * time.Minute,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != nil {
t.Fatalf("expected cleared retention duration, got: %v", got)
}
// Create a subscription on the topic and read TopicMessageRetentionDuration.
s, err := createSubWithRetry(ctx, t, c, subIDs.New(), SubscriptionConfig{
Topic: topic,
})
if err != nil {
t.Fatalf("failed to create subscription: %v", err)
}
defer s.Delete(ctx)
sCfg, err := s.Config(ctx)
if err != nil {
t.Fatalf("failed to get sub config: %v", err)
}
if got := sCfg.TopicMessageRetentionDuration; got != newDur {
t.Fatalf("sCfg.TopicMessageRetentionDuration, got: %v, want: %v", got, newDur)
}

// Clear retention duration by setting to a negative value.
cfg, err = topic.Update(ctx, TopicConfigToUpdate{
RetentionDuration: -1 * time.Minute,
})
if err != nil {
t.Fatal(err)
}
if got := cfg.RetentionDuration; got != nil {
t.Fatalf("expected cleared retention duration, got: %v", got)
}
}

func TestIntegration_ExactlyOnceDelivery_PublishReceive(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/topic.go
Expand Up @@ -228,7 +228,7 @@ type TopicConfig struct {
// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `RetentionDuration` in the past. If this field is
// not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 7 days or less than 10 minutes.
// subscriptions. Cannot be more than 31 days or less than 10 minutes.
//
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
RetentionDuration optional.Duration
Expand Down

0 comments on commit 78c178b

Please sign in to comment.