Skip to content

Commit

Permalink
[SQL] Add option to query same command(s) for all dbs
Browse files Browse the repository at this point in the history
  • Loading branch information
shmsr committed Jul 12, 2023
1 parent 2dc3f67 commit 56df53a
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 33 deletions.
13 changes: 7 additions & 6 deletions metricbeat/helper/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,22 @@ type sqlRow interface {

// NewDBClient gets a client ready to query the database
func NewDBClient(driver, uri string, l *logp.Logger) (*DbClient, error) {
dbx, err := sql.Open(switchDriverName(driver), uri)
dbx, err := sql.Open(SwitchDriverName(driver), uri)
if err != nil {
return nil, fmt.Errorf("opening connection: %w", err)
}
err = dbx.Ping()
if err != nil {
if closeErr := dbx.Close(); closeErr != nil {
return nil, fmt.Errorf("failed to close with %s, after connection test failed: %w", closeErr, err)
return nil, fmt.Errorf("failed to close with %s, after connection test failed: %w", closeErr.Error(), err)

Check failure on line 54 in metricbeat/helper/sql/sql.go

View workflow job for this annotation

GitHub Actions / lint (linux)

non-wrapping format verb for fmt.Errorf. Use `%w` to format errors (errorlint)
}
return nil, fmt.Errorf("testing connection: %w", err)
}

return &DbClient{DB: dbx, logger: l}, nil
}

// fetchTableMode scan the rows and publishes the event for querys that return the response in a table format.
// FetchTableMode scan the rows and publishes the event for querys that return the response in a table format.
func (d *DbClient) FetchTableMode(ctx context.Context, q string) ([]mapstr.M, error) {
rows, err := d.QueryContext(ctx, q)
if err != nil {
Expand Down Expand Up @@ -144,7 +144,8 @@ func (d *DbClient) fetchVariableMode(rows sqlRow) (mapstr.M, error) {
r := mapstr.M{}

for key, value := range data {
value := getValue(&value)
value := value
value = getValue(&value)
r.Put(key, value)
}

Expand Down Expand Up @@ -187,9 +188,9 @@ func getValue(pval *interface{}) interface{} {
}
}

// switchDriverName switches between driver name and a pretty name for a driver. For example, 'oracle' driver is called
// SwitchDriverName switches between driver name and a pretty name for a driver. For example, 'oracle' driver is called
// 'godror' so this detail implementation must be hidden to the user, that should only choose and see 'oracle' as driver
func switchDriverName(d string) string {
func SwitchDriverName(d string) string {
switch d {
case "oracle":
return "godror"
Expand Down
154 changes: 127 additions & 27 deletions x-pack/metricbeat/module/sql/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package query

import (
"context"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -44,12 +45,13 @@ type config struct {

Driver string `config:"driver" validate:"nonzero,required"`

// Support either the previous query / or the new list of queries.
ResponseFormat string `config:"sql_response_format"`
Query string `config:"sql_query" `
// Support either the query or list of queries.
ResponseFormat string `config:"sql_response_format"`
Query string `config:"sql_query"`
Queries []query `config:"sql_queries"`
MergeResults bool `config:"merge_results"`

Queries []query `config:"sql_queries" `
MergeResults bool `config:"merge_results"`
FetchFromAllDB bool `config:"fetch_from_all_dbs"`
}

// MetricSet holds any configuration or state information. It must implement
Expand Down Expand Up @@ -104,25 +106,39 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return b, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
// It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response
// format of the query.
func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
db, err := sql.NewDBClient(m.Config.Driver, m.HostData().URI, m.Logger())
if err != nil {
return fmt.Errorf("could not open connection: %w", err)
// queryDBNames returns the query to list databases present in a server
// as per the driver name. If the given driver is not supported, queryDBNames
// returns an empty query.
func queryDBNames(driver string) string {
switch sql.SwitchDriverName(driver) {
case "mssql":
return "SELECT [name] FROM sys.databases WITH (NOLOCK) WHERE state = 0 AND HAS_DBACCESS([name]) = 1"
// case "mysql":
// return "SHOW DATABASES"
// case "godror":
// // NOTE: Requires necessary priviledges to access DBA_USERS
// // Ref: https://stackoverflow.com/a/3005623/5821408
// return "SELECT * FROM DBA_USERS"
// case "postgres":
// return "SELECT datname FROM pg_database"
}
defer db.Close()

queries := m.Config.Queries
if len(queries) == 0 {
one_query := query{Query: m.Config.Query, ResponseFormat: m.Config.ResponseFormat}
queries = append(queries, one_query)
return ""
}

// dbSelector returns the statement to select a named database to run the
// subsequent statements. If the given driver is not supported, dbSelector
// returns an empty statement.
func dbSelector(driver, dbName string) string {
switch sql.SwitchDriverName(driver) {
case "mssql":
return fmt.Sprintf("USE [%s];", dbName)
}
return ""
}

merged := mapstr.M{}
func (m *MetricSet) fetch(ctx context.Context, db *sql.DbClient, reporter mb.ReporterV2, queries []query) error {
merged := make(mapstr.M, 0)

for _, q := range queries {
if q.ResponseFormat == tableResponseFormat {
Expand All @@ -135,12 +151,12 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
for _, ms := range mss {
if m.Config.MergeResults {
if len(mss) > 1 {
return fmt.Errorf("can not merge query resulting with more than one rows: %s", q)
return fmt.Errorf("cannot merge query resulting with more than one rows: %s", q)
} else {
for k, v := range ms {
_, ok := merged[k]
if ok {
m.Logger().Warn("overwriting duplicate metrics: ", k)
m.Logger().Warn("overwriting duplicate metrics:", k)
}
merged[k] = v
}
Expand All @@ -161,7 +177,7 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
for k, v := range ms {
_, ok := merged[k]
if ok {
m.Logger().Warn("overwriting duplicate metrics: ", k)
m.Logger().Warn("overwriting duplicate metrics:", k)
}
merged[k] = v
}
Expand All @@ -171,6 +187,7 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
}
}
}

if m.Config.MergeResults {
// Report here for merged case.
m.reportEvent(merged, reporter, "")
Expand All @@ -179,11 +196,95 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
return nil
}

// Fetch method implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
// It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response
// format of the query.
func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
db, err := sql.NewDBClient(m.Config.Driver, m.HostData().URI, m.Logger())
if err != nil {
return fmt.Errorf("cannot open connection: %w", err)
}
defer db.Close()

queries := m.Config.Queries
if len(queries) == 0 {
one_query := query{Query: m.Config.Query, ResponseFormat: m.Config.ResponseFormat}
queries = append(queries, one_query)
}

if !m.Config.FetchFromAllDB {
err = m.fetch(ctx, db, reporter, queries)
if err != nil {
m.Logger().Warn("error while fetching:", err)
}
return nil
}

// NOTE: Only mssql driver is supported for now because:
//
// * Difference in queries to fetch the name of the databases
// * The statement to select a named database (for subsequent statements
// to be executed) may not be generic i.e, USE statement (e.g., USE <db_name>)
// works for MSSQL but not Oracle.
//
// TODO: Add the feature for other drivers when need arises.
validQuery := queryDBNames(m.Config.Driver)
if validQuery == "" {
return fmt.Errorf("fetch from all databases feature is not supported for driver: %s", m.Config.Driver)
}

// Discover all databases in the server and execute given queries on each
// of the databases.
dbNames, err := db.FetchTableMode(ctx, queryDBNames(m.Config.Driver))
if err != nil {
return fmt.Errorf("cannot fetch database names: %w", err)
}

if len(dbNames) == 0 {
return errors.New("no database names found")
}

qs := make([]query, 0, len(queries))

for i := range dbNames {
// Create a copy of the queries as query would be modified on every
// iteration.
qs = qs[:0] // empty slice
qs = append(qs, queries...) // copy queries

val, err := dbNames[i].GetValue("name")
if err != nil {
m.Logger().Warn("error with database name:", err)
continue
}
dbName, ok := val.(string)
if !ok {
m.Logger().Warn("error with database name's type")
continue
}

// Prefix dbSelector to the query based on the driver
// provided.
// Example: USE <dbName>; @command (or @query)
for i := range qs {
qs[i].Query = dbSelector(m.Config.Driver, dbName) + " " + qs[i].Query
}

err = m.fetch(ctx, db, reporter, qs)
if err != nil {
m.Logger().Warn("error while fetching:", err)
}
}

return nil
}

// reportEvent using 'user' mode with keys under `sql.metrics.*` or using Raw data mode (module and metricset key spaces
// provided by the user)
func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) {
if m.Config.RawData.Enabled {

// New usage.
// Only driver & query field mapped.
// metrics to be mapped by end user.
Expand All @@ -204,10 +305,9 @@ func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string)
"driver": m.Config.Driver,
},
})

}
} else {
// Previous usage. Backword compartibility.
// Previous usage. Backward compatibility.
// Supports field mapping.
reporter.Event(mb.Event{
ModuleFields: mapstr.M{
Expand Down Expand Up @@ -237,7 +337,7 @@ func inferTypeFromMetrics(ms mapstr.M) mapstr.M {
case bool:
boolMetrics[k] = v
case nil:
//Ignore because a nil has no data type and thus cannot be indexed
// Ignore because a nil has no data type and thus cannot be indexed
default:
stringMetrics[k] = v
}
Expand Down

0 comments on commit 56df53a

Please sign in to comment.