Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub message ordering support #724

Merged
merged 1 commit into from
Oct 26, 2021

Conversation

skymeyer
Copy link
Contributor

@skymeyer skymeyer commented Oct 13, 2021

Fixes #723

Message ordering may require producing the events using an explicit regional endpoint. Not sure if we should add a helper to produce a pubsub client using a regional service endpoint or leave this as an exercise to the integrator.

@skymeyer skymeyer force-pushed the pubsub-ordering branch 2 times, most recently from 5f50abb to 2b40def Compare October 13, 2021 09:41
@n3wscott
Copy link
Member

oh awesome. taking a look, back from kubecon

@n3wscott
Copy link
Member

Looks like you got tripped up on a data race in the unit test, could you take a look @skymeyer ? overall this code looks good, it might be a simple fix for just the test.

WARNING: DATA RACE
Write at 0x00c00028b9f0 by goroutine 91:
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateTopicInfo()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:143 +0x43a
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateTopic()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:149 +0x74
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).Publish()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:300 +0x6d
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.TestPublishReceiveRoundtripWithOrderingKey()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection_test.go:790 +0x946
  testing.tRunner()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1050 +0x1eb

Previous write at 0x00c00028b9f0 by goroutine 162:
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateTopicInfo()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:143 +0x43a
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateTopic()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:149 +0x74
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateSubscriptionInfo.func1()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:224 +0x643
  sync.(*Once).doSlow()
      /opt/hostedtoolcache/go/1.14.15/x64/src/sync/once.go:66 +0x103
  sync.(*Once).Do()
      /opt/hostedtoolcache/go/1.14.15/x64/src/sync/once.go:57 +0x68
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateSubscriptionInfo()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:207 +0x202
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).getOrCreateSubscription()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:266 +0x74
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.(*Connection).Receive()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection.go:313 +0x79

Goroutine 91 (running) created at:
  testing.(*T).Run()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1095 +0x537
  testing.runTests.func1()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1339 +0xa6
  testing.tRunner()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1050 +0x1eb
  testing.runTests()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1337 +0x594
  testing.(*M).Run()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1252 +0x2ff
  main.main()
      _testmain.go:134 +0x347

Goroutine 162 (running) created at:
  github.com/cloudevents/sdk-go/protocol/pubsub/v2/internal.TestPublishReceiveRoundtripWithOrderingKey()
      /home/runner/work/sdk-go/sdk-go/protocol/pubsub/v2/internal/connection_test.go:772 +0x5bd
  testing.tRunner()
      /opt/hostedtoolcache/go/1.14.15/x64/src/testing/testing.go:1050 +0x1eb
==================
    testing.go:965: race detected during execution of test
--- FAIL: TestPublishReceiveRoundtripWithOrderingKey (1.13s)

@skymeyer
Copy link
Contributor Author

Thanks @n3wscott - will take a look, prob need to add some lock.

Signed-off-by: Jelle Vink <jelle.vink@gmail.com>
@skymeyer
Copy link
Contributor Author

@n3wscott I was setting ti.topic.EnableMessageOrdering at an incorrect spot, should be addressed now.

@n3wscott n3wscott merged commit adacec4 into cloudevents:main Oct 26, 2021
@skymeyer skymeyer deleted the pubsub-ordering branch October 27, 2021 10:45
@rriolobos
Copy link

rriolobos commented Feb 7, 2022

I tested pubsub ordering key with this library but I found an issue. When you send 3 or more messages, with same ordering key, receiver only waits for first one, second and third messages are consumed concurrently. I added a 5 second sleep in message handler in order to check that the total amount of time was 15 seconds as expected, but it was 10 second. It can also be checked with logs. When I try the same directly invoking pubsub receiver, it worked as expected.

func receive(ctx context.Context, event event.Event) error {
	fmt.Printf("Event Context: %+v\n", event.Context)

	fmt.Printf("Protocol Context: %+v\n", pscontext.ProtocolContextFrom(ctx))

	data := &Example{}
	if err := event.DataAs(data); err != nil {
		fmt.Printf("Got Data Error: %s\n", err.Error())
	}

	time.Sleep(8 * time.Second)

	fmt.Printf("Data processed: %+v\n", data)

	fmt.Printf("----------------------------\n")

	return nil
}

func main() {
	ctx := context.Background()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID(env.ProjectID),
		cepubsub.WithTopicID(env.TopicID),
		cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{}),
		cepubsub.WithSubscriptionID(env.SubscriptionID))
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}

	t.ReceiveSettings.NumGoroutines = 1

	c, err := cloudevents.NewClient(t)

	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	log.Println("Created client, listening...")

	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}

@skymeyer
Copy link
Contributor Author

skymeyer commented Feb 8, 2022

Question from @rriolobos is tracked under #753.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pub/Sub ordering key support
3 participants