From 7366a5f2ef57aa78d6dfad7c5f49eaf2eec30013 Mon Sep 17 00:00:00 2001 From: subham sarkar Date: Thu, 10 Aug 2023 00:08:50 +0530 Subject: [PATCH] Handler reporter.Event's return value --- x-pack/metricbeat/module/sql/query/query.go | 39 +++++++++++++-------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/x-pack/metricbeat/module/sql/query/query.go b/x-pack/metricbeat/module/sql/query/query.go index 6c53178c0e45..f521acbe133a 100644 --- a/x-pack/metricbeat/module/sql/query/query.go +++ b/x-pack/metricbeat/module/sql/query/query.go @@ -146,21 +146,21 @@ func dbSelector(driver, dbName string) string { return "" } -func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.ReporterV2, queries []query) error { +func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.ReporterV2, queries []query) (bool, error) { + var ok bool merged := make(mapstr.M, 0) - for _, q := range queries { if q.ResponseFormat == tableResponseFormat { // Table format mss, err := db.FetchTableMode(ctx, q.Query) if err != nil { - return fmt.Errorf("fetch table mode failed: %w", err) + return ok, fmt.Errorf("fetch table mode failed: %w", err) } for _, ms := range mss { if m.Config.MergeResults { if len(mss) > 1 { - return fmt.Errorf("cannot merge query resulting with more than one rows: %s", q) + return ok, fmt.Errorf("cannot merge query resulting with more than one rows: %s", q) } else { for k, v := range ms { _, ok := merged[k] @@ -172,14 +172,14 @@ func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.Rep } } else { // Report immediately for non-merged cases. - m.reportEvent(ms, reporter, q.Query) + ok = m.reportEvent(ms, reporter, q.Query) } } } else { // Variable format ms, err := db.FetchVariableMode(ctx, q.Query) if err != nil { - return fmt.Errorf("fetch variable mode failed: %w", err) + return ok, fmt.Errorf("fetch variable mode failed: %w", err) } if m.Config.MergeResults { @@ -192,17 +192,17 @@ func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.Rep } } else { // Report immediately for non-merged cases. - m.reportEvent(ms, reporter, q.Query) + ok = m.reportEvent(ms, reporter, q.Query) } } } if m.Config.MergeResults { // Report here for merged case. - m.reportEvent(merged, reporter, "") + ok = m.reportEvent(merged, reporter, "") } - return nil + return ok, nil } // Fetch method implements the data gathering and data conversion to the right @@ -224,10 +224,13 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { } if !m.Config.FetchFromAllDatabases { - err = m.fetch(ctx, db, reporter, queries) + reported, err := m.fetch(ctx, db, reporter, queries) if err != nil { m.Logger().Warn("error while fetching:", err) } + if !reported { + m.Logger().Debug("error trying to emit event") + } return nil } @@ -281,10 +284,14 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { qs[i].Query = dbSelector(m.Config.Driver, dbName) + " " + qs[i].Query } - err = m.fetch(ctx, db, reporter, qs) + reported, err := m.fetch(ctx, db, reporter, qs) if err != nil { m.Logger().Warn("error while fetching:", err) } + if !reported { + m.Logger().Debug("error trying to emit event") + return nil + } } return nil @@ -292,14 +299,15 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { // reportEvent using 'user' mode with keys under `sql.metrics.*` or using Raw data mode (module and metricset key spaces // provided by the user) -func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) { +func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) bool { + var ok bool if m.Config.RawData.Enabled { // New usage. // Only driver & query field mapped. // metrics to be mapped by end user. if len(qry) > 0 { // set query. - reporter.Event(mb.Event{ + ok = reporter.Event(mb.Event{ ModuleFields: mapstr.M{ "metrics": ms, // Individual metric "driver": m.Config.Driver, @@ -307,7 +315,7 @@ func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) }, }) } else { - reporter.Event(mb.Event{ + ok = reporter.Event(mb.Event{ // Do not set query. ModuleFields: mapstr.M{ "metrics": ms, // Individual metric @@ -318,7 +326,7 @@ func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) } else { // Previous usage. Backward compatibility. // Supports field mapping. - reporter.Event(mb.Event{ + ok = reporter.Event(mb.Event{ ModuleFields: mapstr.M{ "driver": m.Config.Driver, "query": qry, @@ -326,6 +334,7 @@ func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) }, }) } + return ok } // inferTypeFromMetrics to organize the output event into 'numeric', 'strings', 'floats' and 'boolean' values