diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 8dbeb193b79..5a11985ab24 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -53,6 +53,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632] - Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455] - Remove `NumCPU` as clients should update the CPU count on the fly in case of config changes in a VM. {pull}23154[23154] +- Remove Metricbeat EventFetcher and EventsFetcher interface. Use the reporter interface instead. {pull}25093[25093] ==== Bugfixes diff --git a/metricbeat/helper/prometheus/ptest/ptest.go b/metricbeat/helper/prometheus/ptest/ptest.go index c59c62c4050..76a1899c457 100644 --- a/metricbeat/helper/prometheus/ptest/ptest.go +++ b/metricbeat/helper/prometheus/ptest/ptest.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" "github.com/elastic/beats/v7/metricbeat/mb/testing/flags" @@ -45,87 +44,6 @@ type TestCases []struct { ExpectedFile string } -// TestMetricSetEventsFetcher goes over the given TestCases and ensures that source Prometheus metrics gets converted -// into the expected events when passed by the given metricset. -// If -data flag is passed, the expected JSON file will be updated with the result -func TestMetricSetEventsFetcher(t *testing.T, module, metricset string, cases TestCases) { - for _, test := range cases { - t.Logf("Testing %s file\n", test.MetricsFile) - - file, err := os.Open(test.MetricsFile) - assert.NoError(t, err, "cannot open test file "+test.MetricsFile) - - body, err := ioutil.ReadAll(file) - assert.NoError(t, err, "cannot read test file "+test.MetricsFile) - - server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1") - w.Write([]byte(body)) - })) - - server.Start() - defer server.Close() - - config := map[string]interface{}{ - "module": module, - "metricsets": []string{metricset}, - "hosts": []string{server.URL}, - } - - f := mbtest.NewEventsFetcher(t, config) - events, err := f.Fetch() - assert.Nil(t, err, "Errors while fetching metrics") - - if *flags.DataFlag { - sort.SliceStable(events, func(i, j int) bool { - h1, _ := hashstructure.Hash(events[i], nil) - h2, _ := hashstructure.Hash(events[j], nil) - return h1 < h2 - }) - eventsJSON, _ := json.MarshalIndent(events, "", "\t") - err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644) - assert.NoError(t, err) - } - - // Read expected events from reference file - expected, err := ioutil.ReadFile(test.ExpectedFile) - if err != nil { - t.Fatal(err) - } - - var expectedEvents []common.MapStr - err = json.Unmarshal(expected, &expectedEvents) - if err != nil { - t.Fatal(err) - } - - for _, event := range events { - // ensure the event is in expected list - found := -1 - for i, expectedEvent := range expectedEvents { - if event.String() == expectedEvent.String() { - found = i - break - } - } - if found > -1 { - expectedEvents = append(expectedEvents[:found], expectedEvents[found+1:]...) - } else { - t.Errorf("Event was not expected: %+v", event) - } - } - - if len(expectedEvents) > 0 { - t.Error("Some events were missing:") - for _, e := range expectedEvents { - t.Error(e) - } - t.Fatal() - } - } -} - // TestMetricSet goes over the given TestCases and ensures that source Prometheus metrics gets converted into the expected // events when passed by the given metricset. // If -data flag is passed, the expected JSON file will be updated with the result diff --git a/metricbeat/mb/builders.go b/metricbeat/mb/builders.go index 48683e05e8a..042c4124862 100644 --- a/metricbeat/mb/builders.go +++ b/metricbeat/mb/builders.go @@ -217,14 +217,6 @@ func mustHaveModule(ms MetricSet, base BaseMetricSet) error { // of them. func mustImplementFetcher(ms MetricSet) error { var ifcs []string - if _, ok := ms.(EventFetcher); ok { - ifcs = append(ifcs, "EventFetcher") - } - - if _, ok := ms.(EventsFetcher); ok { - ifcs = append(ifcs, "EventsFetcher") - } - if _, ok := ms.(ReportingMetricSet); ok { ifcs = append(ifcs, "ReportingMetricSet") } @@ -256,7 +248,7 @@ func mustImplementFetcher(ms MetricSet) error { switch len(ifcs) { case 0: return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+ - "producing interface (EventFetcher, EventsFetcher, "+ + "producing interface ("+ "ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+ "PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)", ms.Module().Name(), ms.Name()) diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 2bec71d88e8..c86380fdcf1 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -133,8 +133,7 @@ func (m *BaseModule) WithConfig(config common.Config) (*BaseModule, error) { // MetricSet interfaces // MetricSet is the common interface for all MetricSet implementations. In -// addition to this interface, all MetricSets must implement either -// EventFetcher or EventsFetcher (but not both). +// addition to this interface, all MetricSets must implement a fetcher interface. type MetricSet interface { ID() string // Unique ID identifying a running MetricSet. Name() string // Name returns the name of the MetricSet. @@ -154,20 +153,6 @@ type Closer interface { Close() error } -// EventFetcher is a MetricSet that returns a single event when collecting data. -// Use ReportingMetricSet for new MetricSet implementations. -type EventFetcher interface { - MetricSet - Fetch() (common.MapStr, error) -} - -// EventsFetcher is a MetricSet that returns a multiple events when collecting -// data. Use ReportingMetricSet for new MetricSet implementations. -type EventsFetcher interface { - MetricSet - Fetch() ([]common.MapStr, error) -} - // Reporter is used by a MetricSet to report events, errors, or errors with // metadata. The methods return false if and only if publishing failed because // the MetricSet is being closed. diff --git a/metricbeat/mb/mb_test.go b/metricbeat/mb/mb_test.go index 83306d51cef..f0dedec348c 100644 --- a/metricbeat/mb/mb_test.go +++ b/metricbeat/mb/mb_test.go @@ -30,6 +30,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) +// Reporting V2 MetricSet + type testModule struct { BaseModule hostParser func(string) (HostData, error) @@ -39,25 +41,11 @@ func (m testModule) ParseHost(host string) (HostData, error) { return m.hostParser(host) } -// EventFetcher - type testMetricSet struct { BaseMetricSet } -func (m *testMetricSet) Fetch() (common.MapStr, error) { - return nil, nil -} - -// EventsFetcher - -type testMetricSetEventsFetcher struct { - BaseMetricSet -} - -func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) { - return nil, nil -} +func (m *testMetricSet) Fetch(reporter ReporterV2) {} // ReportingFetcher @@ -259,25 +247,7 @@ func TestNewModulesMetricSetTypes(t *testing.T) { return &testMetricSet{base}, nil } - name := "EventFetcher" - if err := r.AddMetricSet(moduleName, name, factory); err != nil { - t.Fatal(err) - } - - t.Run(name+" MetricSet", func(t *testing.T) { - ms := newTestMetricSet(t, r, map[string]interface{}{ - "module": moduleName, - "metricsets": []string{name}, - }) - _, ok := ms.(EventFetcher) - assert.True(t, ok, name+" not implemented") - }) - - factory = func(base BaseMetricSet) (MetricSet, error) { - return &testMetricSetEventsFetcher{base}, nil - } - - name = "EventsFetcher" + name := "ReportingMetricSetV2" if err := r.AddMetricSet(moduleName, name, factory); err != nil { t.Fatal(err) } @@ -287,7 +257,7 @@ func TestNewModulesMetricSetTypes(t *testing.T) { "module": moduleName, "metricsets": []string{name}, }) - _, ok := ms.(EventsFetcher) + _, ok := ms.(ReportingMetricSetV2) assert.True(t, ok, name+" not implemented") }) diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index ba94f2162d2..f07b5edae56 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -40,7 +40,7 @@ func ExampleWrapper() { // Build a configuration object. config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, - "metricsets": []string{eventFetcherName}, + "metricsets": []string{reportingFetcherName}, }) if err != nil { fmt.Println("Error:", err) @@ -91,17 +91,17 @@ func ExampleWrapper() { // }, // "@timestamp": "2016-05-10T23:27:58.485Z", // "event": { - // "dataset": "fake.eventfetcher", + // "dataset": "fake.reportingfetcher", // "duration": 111, // "module": "fake" // }, // "fake": { - // "eventfetcher": { + // "reportingfetcher": { // "metric": 1 // } // }, // "metricset": { - // "name": "eventfetcher", + // "name": "reportingfetcher", // "period": 10000 // }, // "service": { @@ -120,7 +120,7 @@ func ExampleRunner() { config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, - "metricsets": []string{eventFetcherName}, + "metricsets": []string{reportingFetcherName}, }) if err != nil { return diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index 6b8b2d705ef..a6396f63ff5 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -36,7 +36,7 @@ func TestRunner(t *testing.T) { config, err := common.NewConfigFrom(map[string]interface{}{ "module": moduleName, - "metricsets": []string{eventFetcherName}, + "metricsets": []string{reportingFetcherName}, }) if err != nil { t.Fatal(err) diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index f0d1552c815..8d18dfbe552 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -203,8 +203,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { ms.Run(reporter.V2()) case mb.PushMetricSetV2WithContext: ms.Run(&channelContext{done}, reporter.V2()) - case mb.EventFetcher, mb.EventsFetcher, - mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: + case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: msw.startPeriodicFetching(&channelContext{done}, reporter) default: // Earlier startup stages prevent this from happening. @@ -241,10 +240,6 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter // and log a stack track if one occurs. func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { switch fetcher := msw.MetricSet.(type) { - case mb.EventFetcher: - msw.singleEventFetch(fetcher, reporter) - case mb.EventsFetcher: - msw.multiEventFetch(fetcher, reporter) case mb.ReportingMetricSet: reporter.StartFetchTimer() fetcher.Fetch(reporter.V1()) @@ -270,24 +265,6 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { } } -func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) { - reporter.StartFetchTimer() - event, err := fetcher.Fetch() - reporter.V1().ErrorWith(err, event) -} - -func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) { - reporter.StartFetchTimer() - events, err := fetcher.Fetch() - if len(events) == 0 { - reporter.V1().ErrorWith(err, nil) - } else { - for _, event := range events { - reporter.V1().ErrorWith(err, event) - } - } -} - // close closes the underlying MetricSet if it implements the mb.Closer // interface. func (msw *metricSetWrapper) close() error { diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index 1108b5bd737..8fcc0230bc9 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -33,7 +33,6 @@ import ( const ( moduleName = "fake" - eventFetcherName = "EventFetcher" reportingFetcherName = "ReportingFetcher" pushMetricSetName = "PushMetricSet" ) @@ -41,34 +40,8 @@ const ( // fakeMetricSet func init() { - if err := mb.Registry.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher); err != nil { - panic(err) - } - if err := mb.Registry.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher); err != nil { - panic(err) - } - if err := mb.Registry.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet); err != nil { - panic(err) - } -} - -// EventFetcher - -type fakeEventFetcher struct { - mb.BaseMetricSet -} - -func (ms *fakeEventFetcher) Fetch() (common.MapStr, error) { - t, _ := time.Parse(time.RFC3339, "2016-05-10T23:27:58.485Z") - return common.MapStr{"@timestamp": common.Time(t), "metric": 1}, nil -} - -func (ms *fakeEventFetcher) Close() error { - return nil -} - -func newFakeEventFetcher(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &fakeEventFetcher{BaseMetricSet: base}, nil + mb.Registry.MustAddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher) + mb.Registry.MustAddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet) } // ReportingFetcher @@ -83,7 +56,8 @@ func (ms *fakeReportingFetcher) Fetch(r mb.Reporter) { } func newFakeReportingFetcher(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &fakeReportingFetcher{BaseMetricSet: base}, nil + var r mb.ReportingMetricSet = &fakeReportingFetcher{BaseMetricSet: base} + return r, nil } // PushMetricSet @@ -100,7 +74,8 @@ func (ms *fakePushMetricSet) Run(r mb.PushReporter) { } func newFakePushMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { - return &fakePushMetricSet{BaseMetricSet: base}, nil + var r mb.PushMetricSet = &fakePushMetricSet{BaseMetricSet: base} + return r, nil } // test utilities @@ -108,9 +83,7 @@ func newFakePushMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { func newTestRegistry(t testing.TB) *mb.Register { r := mb.NewRegister() - err := r.AddMetricSet(moduleName, eventFetcherName, newFakeEventFetcher) - require.NoError(t, err) - err = r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher) + err := r.AddMetricSet(moduleName, reportingFetcherName, newFakeReportingFetcher) require.NoError(t, err) err = r.AddMetricSet(moduleName, pushMetricSetName, newFakePushMetricSet) require.NoError(t, err) @@ -125,37 +98,6 @@ func newConfig(t testing.TB, moduleConfig interface{}) *common.Config { // test cases -func TestWrapperOfEventFetcher(t *testing.T) { - hosts := []string{"alpha", "beta"} - c := newConfig(t, map[string]interface{}{ - "module": moduleName, - "metricsets": []string{eventFetcherName}, - "hosts": hosts, - }) - - m, err := module.NewWrapper(c, newTestRegistry(t)) - require.NoError(t, err) - - done := make(chan struct{}) - output := m.Start(done) - - <-output - <-output - close(done) - - // Validate that the channel is closed after receiving the two - // initial events. - select { - case _, ok := <-output: - if !ok { - // Channel is closed. - return - } else { - assert.Fail(t, "received unexpected event") - } - } -} - func TestWrapperOfReportingFetcher(t *testing.T) { hosts := []string{"alpha", "beta"} c := newConfig(t, map[string]interface{}{ @@ -222,7 +164,7 @@ func TestPeriodIsAddedToEvent(t *testing.T) { hasPeriod bool }{ "fetch metricset events should have period": { - metricset: eventFetcherName, + metricset: reportingFetcherName, hasPeriod: true, }, "push metricset events should not have period": { @@ -262,7 +204,7 @@ func TestNewWrapperForMetricSet(t *testing.T) { hosts := []string{"alpha"} c := newConfig(t, map[string]interface{}{ "module": moduleName, - "metricsets": []string{eventFetcherName}, + "metricsets": []string{reportingFetcherName}, "hosts": hosts, }) diff --git a/metricbeat/mb/registry.go b/metricbeat/mb/registry.go index 62ee4f6c1d0..b5b24ca865c 100644 --- a/metricbeat/mb/registry.go +++ b/metricbeat/mb/registry.go @@ -48,8 +48,7 @@ type ModuleFactory func(base BaseModule) (Module, error) // MetricSetFactory accepts a BaseMetricSet and returns a MetricSet. If there // was an error creating the MetricSet then an error will be returned. The -// returned MetricSet must also implement either EventFetcher or EventsFetcher -// (but not both). +// returned MetricSet must also implement a Fetcher interface. type MetricSetFactory func(base BaseMetricSet) (MetricSet, error) // HostParser is a function that parses a host value from the configuration diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 61edf3b7d5e..abad9d5e205 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -32,56 +32,6 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb/testing/flags" ) -// WriteEvent fetches a single event writes the output to a ./_meta/data.json -// file. -func WriteEvent(f mb.EventFetcher, t testing.TB) error { - if !*flags.DataFlag { - t.Skip("skip data generation tests") - } - - event, err := f.Fetch() - if err != nil { - return err - } - - fullEvent := CreateFullEvent(f, event) - WriteEventToDataJSON(t, fullEvent, ".") - return nil -} - -// WriteEvents fetches events and writes the first event to a ./_meta/data.json -// file. -func WriteEvents(f mb.EventsFetcher, t testing.TB) error { - return WriteEventsCond(f, t, nil) - -} - -// WriteEventsCond fetches events and writes the first event that matches the condition -// to a ./_meta/data.json file. -func WriteEventsCond(f mb.EventsFetcher, t testing.TB, cond func(e common.MapStr) bool) error { - if !*flags.DataFlag { - t.Skip("skip data generation tests") - } - - events, err := f.Fetch() - if err != nil { - return err - } - - if len(events) == 0 { - return fmt.Errorf("no events were generated") - } - - event, err := SelectEvent(events, cond) - if err != nil { - return err - } - - fullEvent := CreateFullEvent(f, event) - WriteEventToDataJSON(t, fullEvent, "") - return nil -} - // WriteEventsReporterV2 fetches events and writes the first event to a ./_meta/data.json // file. func WriteEventsReporterV2(f mb.ReportingMetricSetV2, t testing.TB, path string) error { diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 673deef850d..605e44a4d32 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -28,16 +28,18 @@ that Metricbeat does it and with the same validations. package mymetricset_test import ( + "github.com/stretchr/testify/assert" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" ) func TestFetch(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - event, err := f.Fetch() - if err != nil { - t.Fatal(err) - } + f := mbtest.NewFetcher(t, getConfig()) + events, errs := f.FetchEvents() + assert.Empty(t, errs) + assert.NotEmpty(t, events) + event := events[0] t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) // Test event attributes... @@ -117,34 +119,6 @@ func NewMetricSets(t testing.TB, config interface{}) []mb.MetricSet { return metricsets } -// NewEventFetcher instantiates a new EventFetcher using the given -// configuration. The ModuleFactory and MetricSetFactory are obtained from the -// global Registry. -func NewEventFetcher(t testing.TB, config interface{}) mb.EventFetcher { - metricSet := NewMetricSet(t, config) - - fetcher, ok := metricSet.(mb.EventFetcher) - if !ok { - t.Fatal("MetricSet does not implement EventFetcher") - } - - return fetcher -} - -// NewEventsFetcher instantiates a new EventsFetcher using the given -// configuration. The ModuleFactory and MetricSetFactory are obtained from the -// global Registry. -func NewEventsFetcher(t testing.TB, config interface{}) mb.EventsFetcher { - metricSet := NewMetricSet(t, config) - - fetcher, ok := metricSet.(mb.EventsFetcher) - if !ok { - t.Fatal("MetricSet does not implement EventsFetcher") - } - - return fetcher -} - func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet { metricSet := NewMetricSet(t, config) diff --git a/metricbeat/module/kubernetes/apiserver/metricset.go b/metricbeat/module/kubernetes/apiserver/metricset.go index 75cf428157c..eac924e1135 100644 --- a/metricbeat/module/kubernetes/apiserver/metricset.go +++ b/metricbeat/module/kubernetes/apiserver/metricset.go @@ -48,7 +48,7 @@ func getMetricsetFactory(prometheusMappings *prometheus.MetricsMapping) mb.Metri } } -// Fetch as expected by `mb.EventFetcher` +// Fetch gathers information from the apiserver and reports events with this information. func (m *metricset) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) if err != nil {