Skip to content

Commit

Permalink
Handler reporter.Event's return value
Browse files Browse the repository at this point in the history
  • Loading branch information
shmsr committed Aug 9, 2023
1 parent 39884d5 commit 7366a5f
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions x-pack/metricbeat/module/sql/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,21 @@ func dbSelector(driver, dbName string) string {
return ""
}

func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.ReporterV2, queries []query) error {
func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.ReporterV2, queries []query) (bool, error) {
var ok bool
merged := make(mapstr.M, 0)

for _, q := range queries {
if q.ResponseFormat == tableResponseFormat {
// Table format
mss, err := db.FetchTableMode(ctx, q.Query)
if err != nil {
return fmt.Errorf("fetch table mode failed: %w", err)
return ok, fmt.Errorf("fetch table mode failed: %w", err)
}

for _, ms := range mss {
if m.Config.MergeResults {
if len(mss) > 1 {
return fmt.Errorf("cannot merge query resulting with more than one rows: %s", q)
return ok, fmt.Errorf("cannot merge query resulting with more than one rows: %s", q)
} else {
for k, v := range ms {
_, ok := merged[k]
Expand All @@ -172,14 +172,14 @@ func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.Rep
}
} else {
// Report immediately for non-merged cases.
m.reportEvent(ms, reporter, q.Query)
ok = m.reportEvent(ms, reporter, q.Query)
}
}
} else {
// Variable format
ms, err := db.FetchVariableMode(ctx, q.Query)
if err != nil {
return fmt.Errorf("fetch variable mode failed: %w", err)
return ok, fmt.Errorf("fetch variable mode failed: %w", err)
}

if m.Config.MergeResults {
Expand All @@ -192,17 +192,17 @@ func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.Rep
}
} else {
// Report immediately for non-merged cases.
m.reportEvent(ms, reporter, q.Query)
ok = m.reportEvent(ms, reporter, q.Query)
}
}
}

if m.Config.MergeResults {
// Report here for merged case.
m.reportEvent(merged, reporter, "")
ok = m.reportEvent(merged, reporter, "")
}

return nil
return ok, nil
}

// Fetch method implements the data gathering and data conversion to the right
Expand All @@ -224,10 +224,13 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
}

if !m.Config.FetchFromAllDatabases {
err = m.fetch(ctx, db, reporter, queries)
reported, err := m.fetch(ctx, db, reporter, queries)
if err != nil {
m.Logger().Warn("error while fetching:", err)
}
if !reported {
m.Logger().Debug("error trying to emit event")
}
return nil
}

Expand Down Expand Up @@ -281,33 +284,38 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
qs[i].Query = dbSelector(m.Config.Driver, dbName) + " " + qs[i].Query
}

err = m.fetch(ctx, db, reporter, qs)
reported, err := m.fetch(ctx, db, reporter, qs)
if err != nil {
m.Logger().Warn("error while fetching:", err)
}
if !reported {
m.Logger().Debug("error trying to emit event")
return nil
}
}

return nil
}

// reportEvent using 'user' mode with keys under `sql.metrics.*` or using Raw data mode (module and metricset key spaces
// provided by the user)
func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) {
func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) bool {
var ok bool
if m.Config.RawData.Enabled {
// New usage.
// Only driver & query field mapped.
// metrics to be mapped by end user.
if len(qry) > 0 {
// set query.
reporter.Event(mb.Event{
ok = reporter.Event(mb.Event{
ModuleFields: mapstr.M{
"metrics": ms, // Individual metric
"driver": m.Config.Driver,
"query": qry,
},
})
} else {
reporter.Event(mb.Event{
ok = reporter.Event(mb.Event{
// Do not set query.
ModuleFields: mapstr.M{
"metrics": ms, // Individual metric
Expand All @@ -318,14 +326,15 @@ func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string)
} else {
// Previous usage. Backward compatibility.
// Supports field mapping.
reporter.Event(mb.Event{
ok = reporter.Event(mb.Event{
ModuleFields: mapstr.M{
"driver": m.Config.Driver,
"query": qry,
"metrics": inferTypeFromMetrics(ms),
},
})
}
return ok
}

// inferTypeFromMetrics to organize the output event into 'numeric', 'strings', 'floats' and 'boolean' values
Expand Down

0 comments on commit 7366a5f

Please sign in to comment.