Skip to content

Commit

Permalink
Merge pull request #92 from gofr-dev/EN/google_pubsub_add_SubscribeWi…
Browse files Browse the repository at this point in the history
…thCommit

Add SubscribeWithCommit for google pubsub
  • Loading branch information
vikash committed Nov 28, 2023
2 parents 36312e3 + dd0056b commit 63c2e34
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
57 changes: 53 additions & 4 deletions pkg/datastore/pubsub/google/google.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,59 @@ func (g *GCPubSub) Subscribe() (*pubsub.Message, error) {
return &res, nil
}

// SubscribeWithCommit function: Google Pub/Sub handles message acknowledgements automatically,
// and you don't need to manually commit offsets like in some other messaging systems.
func (g *GCPubSub) SubscribeWithCommit(pubsub.CommitFunc) (*pubsub.Message, error) {
return g.Subscribe()
/*
SubscribeWithCommit calls the CommitFunc after subscribing message from googlePubSub and based on the return values decides
whether to commit message and consume another message
*/
func (g *GCPubSub) SubscribeWithCommit(commitFunc pubsub.CommitFunc) (*pubsub.Message, error) {
subscribeReceiveCount.WithLabelValues(g.config.TopicName, "").Inc()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var res pubsub.Message
res.Topic = g.config.TopicName

handler := func(_ context.Context, m *gpubsub.Message) {
g.processMessage(m, &res, commitFunc, cancel)
}

for {
select {
case <-ctx.Done():
return &res, nil // Context canceled or timed out
default:
if err := g.config.Subscription.Receive(ctx, handler); err != nil {
subscribeFailureCount.WithLabelValues(g.config.TopicName, "").Inc()
g.logger.Debug("Error while receiving message: ", err)

return nil, err
}

subscribeSuccessCount.WithLabelValues(g.config.TopicName, "").Inc()
g.logger.Debug("Message received successfully.")
}
}
}

func (g *GCPubSub) processMessage(m *gpubsub.Message, res *pubsub.Message, commitFunc pubsub.CommitFunc,
cancelFunc context.CancelFunc) {
g.logger.Debug("Received message: ", string(m.Data))
res.Value = string(m.Data)

// Call the commit function
isCommit, isContinue := commitFunc(&pubsub.Message{
Topic: g.config.TopicName,
Value: string(m.Data),
})

if isCommit {
m.Ack() // Acknowledge that the message has been consumed
}

if !isContinue {
cancelFunc()
}
}

func (g *GCPubSub) Bind(message []byte, target interface{}) error {
Expand Down
44 changes: 43 additions & 1 deletion pkg/datastore/pubsub/google/google_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,46 @@ func Test_createSubscription(t *testing.T) {
}
}

func Test_Subscribe(t *testing.T) {
t.Setenv("PUBSUB_BACKEND", "google")
t.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8086")

conf := config.NewGoDotEnvProvider(log.NewLogger(), "../../../../configs")

sampleData := struct {
ID string `avro:"Id"`
Name string `avro:"Name"`
Email string `avro:"Email"`
}{
ID: "1",
Name: "Rohan",
Email: "rohan@email.xyz",
}
byteData, _ := json.Marshal(sampleData)

configs := Config{
ProjectID: conf.Get("GOOGLE_PROJECT_ID"),
TopicName: conf.Get("GOOGLE_TOPIC_NAME"),
TimeoutDuration: 30,
SubscriptionDetails: &Subscription{Name: conf.Get("GOOGLE_SUBSCRIPTION_NAME")},
}

conn, err := New(&configs, log.NewMockLogger(new(bytes.Buffer)))
if err != nil {
t.Fatal(err)
}

_ = conn.PublishEvent("", sampleData, nil)

res, err := conn.Subscribe()

assert.Equal(t, res, &pubsub.Message{
Value: string(byteData),
Topic: conf.Get("GOOGLE_TOPIC_NAME")}, "Testcase Failed")

assert.Equal(t, err != nil, false, "Testcase Failed")
}

func Test_SubscribeWithCommit(t *testing.T) {
t.Setenv("PUBSUB_BACKEND", "google")
t.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8086")
Expand Down Expand Up @@ -117,7 +157,9 @@ func Test_SubscribeWithCommit(t *testing.T) {

_ = conn.PublishEvent("", sampleData, nil)

res, err := conn.SubscribeWithCommit(nil)
res, err := conn.SubscribeWithCommit(func(message *pubsub.Message) (bool, bool) {
return true, false
})

assert.Equal(t, res, &pubsub.Message{
Value: string(byteData),
Expand Down

0 comments on commit 63c2e34

Please sign in to comment.