Skip to content
Permalink
Browse files

feat(pubsublite): Publish settings and errors (#3075)

PublishSettings are almost identical to [pubsub.PublishSettings](https://godoc.org/cloud.google.com/go/pubsub#PublishSettings).

Some max thresholds were changed to be consistent with the [Pub/Sub Lite Java client library](https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Constants.java).
  • Loading branch information
tmdiep committed Oct 28, 2020
1 parent 97cfd45 commit 9eb9fcb79f17ad7c08c77c455ba3e8d89e3bdbf2
Showing with 239 additions and 0 deletions.
  1. +22 −0 pubsublite/errors.go
  2. +103 −0 pubsublite/settings.go
  3. +114 −0 pubsublite/settings_test.go
@@ -0,0 +1,22 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import "errors"

var (
// ErrOverflow indicates that the publish buffers have overflowed. See
// comments for PublishSettings.BufferedByteLimit.
ErrOverflow = errors.New("pubsublite: client-side publish buffers have overflowed")
)
@@ -0,0 +1,103 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"errors"
"fmt"
"time"
)

const (
// MaxPublishRequestCount is the maximum number of messages that can be
// batched in a single publish request.
MaxPublishRequestCount = 1000

// MaxPublishMessageBytes is the maximum allowed serialized size of a single
// Pub/Sub message in bytes.
MaxPublishMessageBytes = 1000000

// MaxPublishRequestBytes is the maximum allowed serialized size of a single
// publish request (containing a batch of messages) in bytes.
MaxPublishRequestBytes = 3500000
)

// PublishSettings control the batching of published messages.
type PublishSettings struct {
// Publish a non-empty batch after this delay has passed. Must be > 0.
DelayThreshold time.Duration

// Publish a batch when it has this many messages. Must be > 0. The maximum is
// MaxPublishRequestCount.
CountThreshold int

// Publish a batch when its size in bytes reaches this value. Must be > 0. The
// maximum is MaxPublishRequestBytes.
ByteThreshold int

// The maximum time that the client will attempt to establish a publish stream
// connection to the server. Must be > 0.
//
// The timeout is exceeded, the publisher will terminate with the last error
// that occurred while trying to reconnect. Note that if the timeout duration
// is long, ErrOverflow may occur first.
Timeout time.Duration

// The maximum number of bytes that the publisher will keep in memory before
// returning ErrOverflow. Must be > 0.
//
// Note that Pub/Sub Lite topics are provisioned a publishing throughput
// capacity, per partition, shared by all publisher clients. Setting a large
// buffer size can mitigate transient publish spikes. However, consistently
// attempting to publish messages at a much higher rate than the publishing
// throughput capacity can cause the buffers to overflow. For more
// information, see https://cloud.google.com/pubsub/lite/docs/topics.
BufferedByteLimit int
}

// DefaultPublishSettings holds the default values for PublishSettings.
var DefaultPublishSettings = PublishSettings{
DelayThreshold: 10 * time.Millisecond,
CountThreshold: 100,
ByteThreshold: 1e6,
Timeout: 60 * time.Second,
// By default set to a high limit that is not likely to occur, but prevents
// OOM errors in clients.
BufferedByteLimit: 1 << 30, // 1 GiB
}

func validatePublishSettings(settings PublishSettings) error {
if settings.DelayThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. DelayThreshold duration must be > 0")
}
if settings.Timeout <= 0 {
return errors.New("pubsublite: invalid publish settings. Timeout duration must be > 0")
}
if settings.CountThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. CountThreshold must be > 0")
}
if settings.CountThreshold > MaxPublishRequestCount {
return fmt.Errorf("pubsublite: invalid publish settings. Maximum CountThreshold is MaxPublishRequestCount (%d)", MaxPublishRequestCount)
}
if settings.ByteThreshold <= 0 {
return errors.New("pubsublite: invalid publish settings. ByteThreshold must be > 0")
}
if settings.ByteThreshold > MaxPublishRequestBytes {
return fmt.Errorf("pubsublite: invalid publish settings. Maximum ByteThreshold is MaxPublishRequestBytes (%d)", MaxPublishRequestBytes)
}
if settings.BufferedByteLimit <= 0 {
return errors.New("pubsublite: invalid publish settings. BufferedByteLimit must be > 0")
}
return nil
}
@@ -0,0 +1,114 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"testing"
"time"
)

func TestValidatePublishSettings(t *testing.T) {
for _, tc := range []struct {
desc string
// settingsFunc is passed DefaultPublishSettings
settingsFunc func(settings PublishSettings) PublishSettings
wantErr bool
}{
{
desc: "valid: default",
settingsFunc: func(settings PublishSettings) PublishSettings {
return DefaultPublishSettings
},
wantErr: false,
},
{
desc: "valid: max",
settingsFunc: func(settings PublishSettings) PublishSettings {
return PublishSettings{
CountThreshold: MaxPublishRequestCount,
ByteThreshold: MaxPublishRequestBytes,
// These have no max bounds, check large values.
DelayThreshold: 24 * time.Hour,
Timeout: 24 * time.Hour,
BufferedByteLimit: 1e10,
}
},
wantErr: false,
},
{
desc: "invalid: zero CountThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.CountThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: CountThreshold over MaxPublishRequestCount",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.CountThreshold = MaxPublishRequestCount + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: ByteThreshold over MaxPublishRequestBytes",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.ByteThreshold = MaxPublishRequestBytes + 1
return settings
},
wantErr: true,
},
{
desc: "invalid: zero ByteThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.ByteThreshold = 0
return settings
},
wantErr: true,
},
{
desc: "invalid: zero DelayThreshold",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.DelayThreshold = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero Timeout",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.Timeout = time.Duration(0)
return settings
},
wantErr: true,
},
{
desc: "invalid: zero BufferedByteLimit",
settingsFunc: func(settings PublishSettings) PublishSettings {
settings.BufferedByteLimit = 0
return settings
},
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
settings := tc.settingsFunc(DefaultPublishSettings)
gotErr := validatePublishSettings(settings)
if (gotErr != nil) != tc.wantErr {
t.Errorf("validatePublishSettings(%v) = %v, want err=%v", settings, gotErr, tc.wantErr)
}
})
}
}

0 comments on commit 9eb9fcb

Please sign in to comment.