Skip to content

Commit

Permalink
googlecloud subscriber: replace ctx with configurable timeout (#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed Jun 1, 2019
1 parent 8bac02f commit 54fc7f5
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 20 deletions.
6 changes: 0 additions & 6 deletions message/infrastructure/googlecloud/pubsub_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package googlecloud_test

import (
"context"
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
Expand All @@ -22,11 +20,7 @@ func BenchmarkSubscriber(b *testing.B) {
panic(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

subscriber, err := googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{},
logger,
)
Expand Down
14 changes: 5 additions & 9 deletions message/infrastructure/googlecloud/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ func newPubSub(t *testing.T, marshaler googlecloud.MarshalerUnmarshaler, subscri
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

subscriber, err := googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{
GenerateSubscriptionName: subscriptionName,
SubscriptionConfig: pubsub.SubscriptionConfig{
Expand Down Expand Up @@ -74,9 +70,6 @@ func TestPublishSubscribe(t *testing.T) {
}

func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

rand.Seed(time.Now().Unix())
testNumber := rand.Int()
logger := watermill.NewStdLogger(true, true)
Expand All @@ -85,21 +78,24 @@ func TestSubscriberUnexpectedTopicForSubscription(t *testing.T) {
return fmt.Sprintf("sub_%d", testNumber)
}

sub1, err := googlecloud.NewSubscriber(ctx, googlecloud.SubscriberConfig{
sub1, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
GenerateSubscriptionName: subNameFn,
}, logger)
require.NoError(t, err)

topic1 := fmt.Sprintf("topic1_%d", testNumber)

sub2, err := googlecloud.NewSubscriber(ctx, googlecloud.SubscriberConfig{
sub2, err := googlecloud.NewSubscriber(googlecloud.SubscriberConfig{
GenerateSubscriptionName: subNameFn,
}, logger)
require.NoError(t, err)
topic2 := fmt.Sprintf("topic2_%d", testNumber)

howManyMessages := 100

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

messagesTopic1, err := sub1.Subscribe(ctx, topic1)
require.NoError(t, err)

Expand Down
9 changes: 8 additions & 1 deletion message/infrastructure/googlecloud/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type SubscriberConfig struct {
// Otherwise, trying to create a subscription on non-existent topic results in `ErrTopicDoesNotExist`.
DoNotCreateTopicIfMissing bool

// ConnectTimeout defines the timeout for connecting to Pub/Sub
ConnectTimeout time.Duration
// InitializeTimeout defines the timeout for initializing topics.
InitializeTimeout time.Duration

Expand Down Expand Up @@ -96,6 +98,9 @@ func (c *SubscriberConfig) setDefaults() {
if c.GenerateSubscriptionName == nil {
c.GenerateSubscriptionName = TopicSubscriptionName
}
if c.ConnectTimeout == 0 {
c.ConnectTimeout = time.Second * 10
}
if c.InitializeTimeout == 0 {
c.InitializeTimeout = time.Second * 10
}
Expand All @@ -105,12 +110,14 @@ func (c *SubscriberConfig) setDefaults() {
}

func NewSubscriber(
ctx context.Context,
config SubscriberConfig,
logger watermill.LoggerAdapter,
) (*Subscriber, error) {
config.setDefaults()

ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
defer cancel()

client, err := pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...)
if err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions tools/mill/cmd/googlecloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,7 @@ For the configuration of consuming/producing of the messages, check the help of
}
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

consumer, err = googlecloud.NewSubscriber(
ctx,
googlecloud.SubscriberConfig{
GenerateSubscriptionName: func(topic string) string {
return subName
Expand Down

0 comments on commit 54fc7f5

Please sign in to comment.