Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

googlecloud subscriber: replace ctx with configurable timeout #89

Merged
merged 1 commit into from
Jun 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions message/infrastructure/googlecloud/pubsub_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package googlecloud_test

import (
"context"
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand All @@ -22,11 +20,7 @@ func BenchmarkSubscriber(b *testing.B) {
panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

subscriber, err := googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{},
logger,
)
Expand Down
14 changes: 5 additions & 9 deletions message/infrastructure/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscri
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

subscriber, err := googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{
GenerateSubscriptionName: subscriptionName,
SubscriptionConfig: pubsub.SubscriptionConfig{
Expand Down Expand Up @@ -74,9 +70,6 @@ func TestPublishSubscribe(t *testing.T) {
}

func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

rand.Seed(time.Now().Unix())
testNumber := rand.Int()
logger := watermill.NewStdLogger(true, true)
Expand All @@ -85,21 +78,24 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
return fmt.Sprintf("sub_%d", testNumber)
}

sub1, err := googlecloud.NewSubscriber(ctx, googlecloud.SubscriberConfig{
sub1, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
GenerateSubscriptionName: subNameFn,
}, logger)
require.NoError(t, err)

topic1 := fmt.Sprintf("topic1_%d", testNumber)

sub2, err := googlecloud.NewSubscriber(ctx, googlecloud.SubscriberConfig{
sub2, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
GenerateSubscriptionName: subNameFn,
}, logger)
require.NoError(t, err)
topic2 := fmt.Sprintf("topic2_%d", testNumber)

howManyMessages := 100

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messagesTopic1, err := sub1.Subscribe(ctx, topic1)
require.NoError(t, err)

Expand Down
9 changes: 8 additions & 1 deletion message/infrastructure/googlecloud/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type SubscriberConfig struct {
// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool

// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
// InitializeTimeout defines the timeout for initializing topics.
InitializeTimeout time.Duration

Expand Down Expand Up @@ -96,6 +98,9 @@ func (c *SubscriberConfig) setDefaults() {
if c.GenerateSubscriptionName == nil {
c.GenerateSubscriptionName = TopicSubscriptionName
}
if c.ConnectTimeout == 0 {
c.ConnectTimeout = time.Second * 10
}
if c.InitializeTimeout == 0 {
c.InitializeTimeout = time.Second * 10
}
Expand All @@ -105,12 +110,14 @@ func (c *SubscriberConfig) setDefaults() {
}

func NewSubscriber(
ctx context.Context,
config SubscriberConfig,
logger watermill.LoggerAdapter,
) (*Subscriber, error) {
config.setDefaults()

ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
defer cancel()

client, err := pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...)
if err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions tools/mill/cmd/googlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ For the configuration of consuming/producing of the messages, check the help of
}
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

consumer, err = googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{
GenerateSubscriptionName: func(topic string) string {
return subName
Expand Down