Skip to content

Commit

Permalink
Google-pubsub: ACK msgs when acknowledged by publisher (elastic#14715)
Browse files Browse the repository at this point in the history
Update the google-pubsub input to only ACK received events when an ACK
from the internal publisher is received.

Also removes the Beta label from the input docs.

Closes elastic#13346
  • Loading branch information
adriansr committed Dec 2, 2019
1 parent d4a6086 commit 2ec17cc
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Enable `add_observer_metadata` processor in default config. {pull}11394[11394]
- Record HTTP body metadata and optionally contents in `http.response.body.*` fields. {pull}13022[13022]
- Allow `hosts` to be used to configure http monitors {pull}13703[13703]
- google-pubsub input: ACK pub/sub message when acknowledged by publisher. {issue}13346[13346] {pull}14715[14715]
- Remove Beta label from google-pubsub input. {issue}13346[13346] {pull}14715[14715]

*Journalbeat*

Expand Down
2 changes: 0 additions & 2 deletions x-pack/filebeat/docs/inputs/input-google-pubsub.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
<titleabbrev>Google Pub/Sub</titleabbrev>
++++

beta[]

Use the `google-pubsub` input to read messages from a Google Cloud Pub/Sub topic
subscription.

Expand Down
49 changes: 27 additions & 22 deletions x-pack/filebeat/input/googlepubsub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,10 @@ func NewInput(
cfg *common.Config,
connector channel.Connector,
inputContext input.Context,
) (input.Input, error) {
) (inp input.Input, err error) {
// Extract and validate the input's configuration.
conf := defaultConfig()
if err := cfg.Unpack(&conf); err != nil {
return nil, err
}

// Build outlet for events.
out, err := connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
})
if err != nil {
if err = cfg.Unpack(&conf); err != nil {
return nil, err
}

Expand All @@ -94,13 +84,31 @@ func NewInput(
"pubsub_project", conf.ProjectID,
"pubsub_topic", conf.Topic,
"pubsub_subscription", conf.Subscription),
outlet: out,
inputCtx: inputCtx,
workerCtx: workerCtx,
workerCancel: workerCancel,
ackedCount: atomic.NewUint32(0),
}

// Build outlet for events.
in.outlet, err = connector.ConnectWith(cfg, beat.ClientConfig{
Processing: beat.ProcessingConfig{
DynamicFields: inputContext.DynamicFields,
},
ACKEvents: func(privates []interface{}) {
for _, priv := range privates {
if msg, ok := priv.(*pubsub.Message); ok {
msg.Ack()
in.ackedCount.Inc()
} else {
in.log.Error("Failed ACKing pub/sub event")
}
}
},
})
if err != nil {
return nil, err
}
in.log.Info("Initialized Google Pub/Sub input.")
return in, nil
}
Expand Down Expand Up @@ -152,15 +160,11 @@ func (in *pubsubInput) run() error {
// Start receiving messages.
topicID := makeTopicID(in.ProjectID, in.Topic)
return sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
if ok := in.outlet.OnEvent(makeEvent(topicID, msg)); ok {
msg.Ack()
in.ackedCount.Inc()
return
if ok := in.outlet.OnEvent(makeEvent(topicID, msg)); !ok {
msg.Nack()
in.log.Debug("OnEvent returned false. Stopping input worker.")
cancel()
}

msg.Nack()
in.log.Debug("OnEvent returned false. Stopping input worker.")
cancel()
})
}

Expand Down Expand Up @@ -205,7 +209,8 @@ func makeEvent(topicID string, msg *pubsub.Message) beat.Event {
Meta: common.MapStr{
"id": id,
},
Fields: fields,
Fields: fields,
Private: msg,
}
}

Expand Down
104 changes: 95 additions & 9 deletions x-pack/filebeat/input/googlepubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/tests/compose"
"github.com/elastic/beats/libbeat/tests/resources"
Expand Down Expand Up @@ -211,6 +212,10 @@ func isInDockerIntegTestEnv() bool {
}

func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) {
runTestWithACKer(t, cfg, ackEvent, run)
}

func runTestWithACKer(t *testing.T, cfg *common.Config, acker acker, run func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T)) {
if !isInDockerIntegTestEnv() {
// Don't test goroutines when using our compose.EnsureUp.
defer resources.NewGoroutinesChecker().Check(t)
Expand All @@ -226,10 +231,11 @@ func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, i
defer close(inputCtx.Done)

// Stub outlet for receiving events generated by the input.
eventOutlet := newStubOutlet()
eventOutlet := newStubOutlet(acker)
defer eventOutlet.Close()

connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) {
connector := channel.ConnectorFunc(func(_ *common.Config, cliCfg beat.ClientConfig) (channel.Outleter, error) {
eventOutlet.setClientConfig(cliCfg)
return eventOutlet, nil
})

Expand All @@ -249,19 +255,46 @@ func newInputContext() input.Context {
}
}

type acker func(beat.Event, beat.ClientConfig) bool

type stubOutleter struct {
sync.Mutex
cond *sync.Cond
done bool
Events []beat.Event
cond *sync.Cond
done bool
Events []beat.Event
clientCfg beat.ClientConfig
acker acker
}

func newStubOutlet() *stubOutleter {
o := &stubOutleter{}
func newStubOutlet(acker acker) *stubOutleter {
o := &stubOutleter{
acker: acker,
}
o.cond = sync.NewCond(o)
return o
}

func ackEvent(ev beat.Event, cfg beat.ClientConfig) bool {
switch {
case cfg.ACKCount != nil:
cfg.ACKCount(1)
case cfg.ACKEvents != nil:
evs := [1]interface{}{ev.Private}
cfg.ACKEvents(evs[:])
case cfg.ACKLastEvent != nil:
cfg.ACKLastEvent(ev.Private)
default:
return false
}
return true
}

func (o *stubOutleter) setClientConfig(cfg beat.ClientConfig) {
o.Lock()
defer o.Unlock()
o.clientCfg = cfg
}

func (o *stubOutleter) waitForEvents(numEvents int) ([]beat.Event, bool) {
o.Lock()
defer o.Unlock()
Expand Down Expand Up @@ -292,8 +325,11 @@ func (o *stubOutleter) Done() <-chan struct{} { return nil }
func (o *stubOutleter) OnEvent(event beat.Event) bool {
o.Lock()
defer o.Unlock()
o.Events = append(o.Events, event)
o.cond.Broadcast()
acked := o.acker(event, o.clientCfg)
if acked {
o.Events = append(o.Events, event)
o.cond.Broadcast()
}
return !o.done
}

Expand Down Expand Up @@ -382,3 +418,53 @@ func TestRunStop(t *testing.T) {
input.Stop()
})
}

func TestEndToEndACK(t *testing.T) {
cfg := defaultTestConfig()

var count atomic.Int
seen := make(map[string]struct{})
// ACK every other message
halfAcker := func(ev beat.Event, clientConfig beat.ClientConfig) bool {
msg := ev.Private.(*pubsub.Message)
seen[msg.ID] = struct{}{}
if count.Inc()&1 != 0 {
// Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
msg.Nack()
return false
}
return ackEvent(ev, clientConfig)
}

runTestWithACKer(t, cfg, halfAcker, func(client *pubsub.Client, input *pubsubInput, out *stubOutleter, t *testing.T) {
createTopic(t, client)
createSubscription(t, client)

group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

const numMsgs = 10
publishMessages(t, client, numMsgs)
events, ok := out.waitForEvents(numMsgs)
if !ok {
t.Fatalf("Expected %d events, but got %d.", 1, len(events))
}

// Assert that all messages were eventually received
assert.Len(t, events, len(seen))
got := make(map[string]struct{})
for _, ev := range events {
msg := ev.Private.(*pubsub.Message)
got[msg.ID] = struct{}{}
}
for id := range seen {
_, exists := got[id]
assert.True(t, exists)
}
input.Stop()
out.Close()
if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

0 comments on commit 2ec17cc

Please sign in to comment.