Skip to content

Commit

Permalink
Remove EventFetcher and EventsFetcher interface (#25093)
Browse files Browse the repository at this point in the history
These interfaces have been long deprecated and are not used by any
module now. Removing them.

Co-Authored-By: Nicolas Ruflin <spam@ruflin.com>
  • Loading branch information
jsoriano and ruflin committed Apr 15, 2021
1 parent fb1eb8d commit 1c4f8b1
Show file tree
Hide file tree
Showing 13 changed files with 33 additions and 325 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Remove global ACK handler support via `SetACKHandler` from publisher pipeline. {pull}19632[19632]
- Make implementing `Close` required for `reader.Reader` interfaces. {pull}20455[20455]
- Remove `NumCPU` as clients should update the CPU count on the fly in case of config changes in a VM. {pull}23154[23154]
- Remove Metricbeat EventFetcher and EventsFetcher interface. Use the reporter interface instead. {pull}25093[25093]

==== Bugfixes

Expand Down
82 changes: 0 additions & 82 deletions metricbeat/helper/prometheus/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/metricbeat/mb"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/metricbeat/mb/testing/flags"
Expand All @@ -45,87 +44,6 @@ type TestCases []struct {
ExpectedFile string
}

// TestMetricSetEventsFetcher goes over the given TestCases and ensures that source Prometheus metrics gets converted
// into the expected events when passed by the given metricset.
// If -data flag is passed, the expected JSON file will be updated with the result
func TestMetricSetEventsFetcher(t *testing.T, module, metricset string, cases TestCases) {
for _, test := range cases {
t.Logf("Testing %s file\n", test.MetricsFile)

file, err := os.Open(test.MetricsFile)
assert.NoError(t, err, "cannot open test file "+test.MetricsFile)

body, err := ioutil.ReadAll(file)
assert.NoError(t, err, "cannot read test file "+test.MetricsFile)

server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "text/plain; charset=ISO-8859-1")
w.Write([]byte(body))
}))

server.Start()
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
assert.Nil(t, err, "Errors while fetching metrics")

if *flags.DataFlag {
sort.SliceStable(events, func(i, j int) bool {
h1, _ := hashstructure.Hash(events[i], nil)
h2, _ := hashstructure.Hash(events[j], nil)
return h1 < h2
})
eventsJSON, _ := json.MarshalIndent(events, "", "\t")
err = ioutil.WriteFile(test.ExpectedFile, eventsJSON, 0644)
assert.NoError(t, err)
}

// Read expected events from reference file
expected, err := ioutil.ReadFile(test.ExpectedFile)
if err != nil {
t.Fatal(err)
}

var expectedEvents []common.MapStr
err = json.Unmarshal(expected, &expectedEvents)
if err != nil {
t.Fatal(err)
}

for _, event := range events {
// ensure the event is in expected list
found := -1
for i, expectedEvent := range expectedEvents {
if event.String() == expectedEvent.String() {
found = i
break
}
}
if found > -1 {
expectedEvents = append(expectedEvents[:found], expectedEvents[found+1:]...)
} else {
t.Errorf("Event was not expected: %+v", event)
}
}

if len(expectedEvents) > 0 {
t.Error("Some events were missing:")
for _, e := range expectedEvents {
t.Error(e)
}
t.Fatal()
}
}
}

// TestMetricSet goes over the given TestCases and ensures that source Prometheus metrics gets converted into the expected
// events when passed by the given metricset.
// If -data flag is passed, the expected JSON file will be updated with the result
Expand Down
10 changes: 1 addition & 9 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,6 @@ func mustHaveModule(ms MetricSet, base BaseMetricSet) error {
// of them.
func mustImplementFetcher(ms MetricSet) error {
var ifcs []string
if _, ok := ms.(EventFetcher); ok {
ifcs = append(ifcs, "EventFetcher")
}

if _, ok := ms.(EventsFetcher); ok {
ifcs = append(ifcs, "EventsFetcher")
}

if _, ok := ms.(ReportingMetricSet); ok {
ifcs = append(ifcs, "ReportingMetricSet")
}
Expand Down Expand Up @@ -256,7 +248,7 @@ func mustImplementFetcher(ms MetricSet) error {
switch len(ifcs) {
case 0:
return fmt.Errorf("MetricSet '%s/%s' does not implement an event "+
"producing interface (EventFetcher, EventsFetcher, "+
"producing interface ("+
"ReportingMetricSet, ReportingMetricSetV2, ReportingMetricSetV2Error, ReportingMetricSetV2WithContext"+
"PushMetricSet, PushMetricSetV2, or PushMetricSetV2WithContext)",
ms.Module().Name(), ms.Name())
Expand Down
17 changes: 1 addition & 16 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ func (m *BaseModule) WithConfig(config common.Config) (*BaseModule, error) {
// MetricSet interfaces

// MetricSet is the common interface for all MetricSet implementations. In
// addition to this interface, all MetricSets must implement either
// EventFetcher or EventsFetcher (but not both).
// addition to this interface, all MetricSets must implement a fetcher interface.
type MetricSet interface {
ID() string // Unique ID identifying a running MetricSet.
Name() string // Name returns the name of the MetricSet.
Expand All @@ -154,20 +153,6 @@ type Closer interface {
Close() error
}

// EventFetcher is a MetricSet that returns a single event when collecting data.
// Use ReportingMetricSet for new MetricSet implementations.
type EventFetcher interface {
MetricSet
Fetch() (common.MapStr, error)
}

// EventsFetcher is a MetricSet that returns a multiple events when collecting
// data. Use ReportingMetricSet for new MetricSet implementations.
type EventsFetcher interface {
MetricSet
Fetch() ([]common.MapStr, error)
}

// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
Expand Down
40 changes: 5 additions & 35 deletions metricbeat/mb/mb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

// Reporting V2 MetricSet

type testModule struct {
BaseModule
hostParser func(string) (HostData, error)
Expand All @@ -39,25 +41,11 @@ func (m testModule) ParseHost(host string) (HostData, error) {
return m.hostParser(host)
}

// EventFetcher

type testMetricSet struct {
BaseMetricSet
}

func (m *testMetricSet) Fetch() (common.MapStr, error) {
return nil, nil
}

// EventsFetcher

type testMetricSetEventsFetcher struct {
BaseMetricSet
}

func (m *testMetricSetEventsFetcher) Fetch() ([]common.MapStr, error) {
return nil, nil
}
func (m *testMetricSet) Fetch(reporter ReporterV2) {}

// ReportingFetcher

Expand Down Expand Up @@ -259,25 +247,7 @@ func TestNewModulesMetricSetTypes(t *testing.T) {
return &testMetricSet{base}, nil
}

name := "EventFetcher"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}

t.Run(name+" MetricSet", func(t *testing.T) {
ms := newTestMetricSet(t, r, map[string]interface{}{
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventFetcher)
assert.True(t, ok, name+" not implemented")
})

factory = func(base BaseMetricSet) (MetricSet, error) {
return &testMetricSetEventsFetcher{base}, nil
}

name = "EventsFetcher"
name := "ReportingMetricSetV2"
if err := r.AddMetricSet(moduleName, name, factory); err != nil {
t.Fatal(err)
}
Expand All @@ -287,7 +257,7 @@ func TestNewModulesMetricSetTypes(t *testing.T) {
"module": moduleName,
"metricsets": []string{name},
})
_, ok := ms.(EventsFetcher)
_, ok := ms.(ReportingMetricSetV2)
assert.True(t, ok, name+" not implemented")
})

Expand Down
10 changes: 5 additions & 5 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ExampleWrapper() {
// Build a configuration object.
config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"metricsets": []string{reportingFetcherName},
})
if err != nil {
fmt.Println("Error:", err)
Expand Down Expand Up @@ -91,17 +91,17 @@ func ExampleWrapper() {
// },
// "@timestamp": "2016-05-10T23:27:58.485Z",
// "event": {
// "dataset": "fake.eventfetcher",
// "dataset": "fake.reportingfetcher",
// "duration": 111,
// "module": "fake"
// },
// "fake": {
// "eventfetcher": {
// "reportingfetcher": {
// "metric": 1
// }
// },
// "metricset": {
// "name": "eventfetcher",
// "name": "reportingfetcher",
// "period": 10000
// },
// "service": {
Expand All @@ -120,7 +120,7 @@ func ExampleRunner() {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"metricsets": []string{reportingFetcherName},
})
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestRunner(t *testing.T) {

config, err := common.NewConfigFrom(map[string]interface{}{
"module": moduleName,
"metricsets": []string{eventFetcherName},
"metricsets": []string{reportingFetcherName},
})
if err != nil {
t.Fatal(err)
Expand Down
25 changes: 1 addition & 24 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) {
ms.Run(reporter.V2())
case mb.PushMetricSetV2WithContext:
ms.Run(&channelContext{done}, reporter.V2())
case mb.EventFetcher, mb.EventsFetcher,
mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext:
msw.startPeriodicFetching(&channelContext{done}, reporter)
default:
// Earlier startup stages prevent this from happening.
Expand Down Expand Up @@ -241,10 +240,6 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter
// and log a stack track if one occurs.
func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
switch fetcher := msw.MetricSet.(type) {
case mb.EventFetcher:
msw.singleEventFetch(fetcher, reporter)
case mb.EventsFetcher:
msw.multiEventFetch(fetcher, reporter)
case mb.ReportingMetricSet:
reporter.StartFetchTimer()
fetcher.Fetch(reporter.V1())
Expand All @@ -270,24 +265,6 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
}
}

func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) {
reporter.StartFetchTimer()
event, err := fetcher.Fetch()
reporter.V1().ErrorWith(err, event)
}

func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) {
reporter.StartFetchTimer()
events, err := fetcher.Fetch()
if len(events) == 0 {
reporter.V1().ErrorWith(err, nil)
} else {
for _, event := range events {
reporter.V1().ErrorWith(err, event)
}
}
}

// close closes the underlying MetricSet if it implements the mb.Closer
// interface.
func (msw *metricSetWrapper) close() error {
Expand Down
Loading

0 comments on commit 1c4f8b1

Please sign in to comment.