44
55//go:build integration && !agentbeat
66
7- package gcppubsub
7+ package gcppubsub_test
88
99import (
1010 "bytes"
@@ -20,6 +20,8 @@ import (
2020
2121 "github.com/elastic/beats/v7/libbeat/otelbeat/oteltest"
2222 "github.com/elastic/beats/v7/libbeat/tests/integration"
23+ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub/testutil"
24+ "github.com/elastic/beats/v7/x-pack/libbeat/common/otelbeat/oteltestcol"
2325
2426 "github.com/elastic/elastic-agent-libs/testing/estools"
2527)
@@ -28,17 +30,17 @@ func TestGCPInputOTelE2E(t *testing.T) {
2830 integration .EnsureESIsRunning (t )
2931
3032 // Create pubsub client for setting up and communicating to emulator.
31- client , clientCancel := testSetup (t )
33+ client , clientCancel := testutil . TestSetup (t )
3234 defer func () {
3335 clientCancel ()
3436 client .Close ()
3537 }()
3638
37- createTopic (t , client )
38- createSubscription (t , "test-subscription-otel" , client )
39- createSubscription (t , "test-subscription-fb" , client )
39+ testutil . CreateTopic (t , client )
40+ testutil . CreateSubscription (t , "test-subscription-otel" , client )
41+ testutil . CreateSubscription (t , "test-subscription-fb" , client )
4042 const numMsgs = 10
41- publishMessages (t , client , numMsgs )
43+ testutil . PublishMessages (t , client , numMsgs )
4244
4345 host := integration .GetESURL (t , "http" )
4446 user := host .User .Username ()
@@ -56,7 +58,7 @@ func TestGCPInputOTelE2E(t *testing.T) {
5658 Subscription string
5759 }
5860
59- gcpConfig := `filebeat.inputs:
61+ gcpFilebeatConfig := `filebeat.inputs:
6062- type: gcp-pubsub
6163 project_id: test-project-id
6264 topic: test-topic-foo
@@ -80,13 +82,76 @@ processors:
8082 - add_kubernetes_metadata: ~
8183`
8284
83- // start filebeat in otel mode
84- filebeatOTel := integration .NewBeat (
85- t ,
86- "filebeat-otel" ,
87- "../../filebeat.test" ,
88- "otel" ,
89- )
85+ gcpOTelConfig := `exporters:
86+ elasticsearch:
87+ auth:
88+ authenticator: beatsauth
89+ compression: gzip
90+ compression_params:
91+ level: 1
92+ endpoints:
93+ - {{ .ESURL }}
94+ logs_dynamic_pipeline:
95+ enabled: true
96+ logs_index: logs-integration-{{ .Namespace }}
97+ mapping:
98+ mode: bodymap
99+ max_conns_per_host: 1
100+ password: {{ .Password }}
101+ retry:
102+ enabled: true
103+ initial_interval: 1s
104+ max_interval: 1m0s
105+ max_retries: 3
106+ sending_queue:
107+ batch:
108+ flush_timeout: 10s
109+ max_size: 1600
110+ min_size: 0
111+ sizer: items
112+ block_on_overflow: true
113+ enabled: true
114+ num_consumers: 1
115+ queue_size: 3200
116+ wait_for_result: true
117+ user: {{ .Username }}
118+ extensions:
119+ beatsauth:
120+ idle_connection_timeout: 3s
121+ proxy_disable: false
122+ timeout: 1m30s
123+ receivers:
124+ filebeatreceiver:
125+ filebeat:
126+ inputs:
127+ - credentials_file: "testdata/fake.json"
128+ project_id: test-project-id
129+ subscription:
130+ name: {{ .Subscription }}
131+ topic: test-topic-foo
132+ type: gcp-pubsub
133+ output:
134+ otelconsumer:
135+ processors:
136+ - add_host_metadata: ~
137+ - add_cloud_metadata: ~
138+ - add_docker_metadata: ~
139+ - add_kubernetes_metadata: ~
140+ queue.mem.flush.timeout: 0s
141+ setup.template.enabled: false
142+ service:
143+ extensions:
144+ - beatsauth
145+ pipelines:
146+ logs:
147+ exporters:
148+ - elasticsearch
149+ receivers:
150+ - filebeatreceiver
151+ telemetry:
152+ metrics:
153+ level: none
154+ `
90155
91156 optionsValue := options {
92157 ESURL : fmt .Sprintf ("%s://%s" , host .Scheme , host .Host ),
@@ -97,19 +162,16 @@ processors:
97162 var configBuffer bytes.Buffer
98163 optionsValue .Namespace = otelNamespace
99164 optionsValue .Subscription = "test-subscription-otel"
100- require .NoError (t , template .Must (template .New ("config" ).Parse (gcpConfig )).Execute (& configBuffer , optionsValue ))
101-
102- filebeatOTel .WriteConfigFile (configBuffer .String ())
165+ require .NoError (t , template .Must (template .New ("config" ).Parse (gcpOTelConfig )).Execute (& configBuffer , optionsValue ))
103166
104- filebeatOTel .Start ()
105- defer filebeatOTel .Stop ()
167+ oteltestcol .New (t , configBuffer .String ())
106168
107169 // reset buffer
108170 configBuffer .Reset ()
109171
110172 optionsValue .Namespace = fbNameSpace
111173 optionsValue .Subscription = "test-subscription-fb"
112- require .NoError (t , template .Must (template .New ("config" ).Parse (gcpConfig )).Execute (& configBuffer , optionsValue ))
174+ require .NoError (t , template .Must (template .New ("config" ).Parse (gcpFilebeatConfig )).Execute (& configBuffer , optionsValue ))
113175
114176 // start filebeat
115177 filebeat := integration .NewBeat (
0 commit comments