Skip to content

Commit

Permalink
services/horizon: Add new metrics counters for db connection close ev…
Browse files Browse the repository at this point in the history
…ents (stellar#5225)
  • Loading branch information
sreuland committed Mar 5, 2024
1 parent 1379989 commit e21bc43
Show file tree
Hide file tree
Showing 5 changed files with 369 additions and 65 deletions.
5 changes: 5 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## unreleased

### Added
- New `db_error_total` metrics key with labels `ctx_error`, `db_error`, and `db_error_extra` ([5225](https://github.com/stellar/go/pull/5225)).

## 2.28.3

### Fixed
Expand Down
8 changes: 6 additions & 2 deletions support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ type Session struct {
// DB is the database connection that queries should be executed against.
DB *sqlx.DB

tx *sqlx.Tx
txOptions *sql.TxOptions
tx *sqlx.Tx
txOptions *sql.TxOptions
errorHandlers []ErrorHandlerFunc
}

// dbErr - the Postgres error
// ctx - the caller's context
type ErrorHandlerFunc func(dbErr error, ctx context.Context)
type SessionInterface interface {
BeginTx(ctx context.Context, opts *sql.TxOptions) error
Begin(ctx context.Context) error
Expand Down
73 changes: 66 additions & 7 deletions support/db/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package db
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/Masterminds/squirrel"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -58,6 +60,7 @@ type SessionWithMetrics struct {
maxLifetimeClosedCounter prometheus.CounterFunc
roundTripProbe *roundTripProbe
roundTripTimeSummary prometheus.Summary
errorCounter *prometheus.CounterVec
}

func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *prometheus.Registry) SessionInterface {
Expand All @@ -66,6 +69,8 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *
registry: registry,
}

base.AddErrorHandler(s.handleErrorEvent)

s.queryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -226,6 +231,18 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *
)
registry.MustRegister(s.maxLifetimeClosedCounter)

s.errorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "db",
Name: "error_total",
Help: "total number of db related errors, details are captured in labels",
ConstLabels: prometheus.Labels{"subservice": string(sub)},
},
[]string{"ctx_error", "db_error", "db_error_extra"},
)
registry.MustRegister(s.errorCounter)

s.roundTripTimeSummary = prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Expand Down Expand Up @@ -262,15 +279,10 @@ func (s *SessionWithMetrics) Close() error {
s.registry.Unregister(s.maxIdleClosedCounter)
s.registry.Unregister(s.maxIdleTimeClosedCounter)
s.registry.Unregister(s.maxLifetimeClosedCounter)
s.registry.Unregister(s.errorCounter)
return s.SessionInterface.Close()
}

// TODO: Implement these
// func (s *SessionWithMetrics) BeginTx(ctx context.Context, opts *sql.TxOptions) error {
// func (s *SessionWithMetrics) Begin(ctx context.Context) error {
// func (s *SessionWithMetrics) Commit(ctx context.Context) error
// func (s *SessionWithMetrics) Rollback(ctx context.Context) error

func (s *SessionWithMetrics) TruncateTables(ctx context.Context, tables []string) (err error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
s.queryDurationSummary.With(prometheus.Labels{
Expand Down Expand Up @@ -314,6 +326,7 @@ func (s *SessionWithMetrics) Clone() SessionInterface {
maxIdleClosedCounter: s.maxIdleClosedCounter,
maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter,
maxLifetimeClosedCounter: s.maxLifetimeClosedCounter,
errorCounter: s.errorCounter,
}
}

Expand Down Expand Up @@ -356,6 +369,53 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType {
return UndefinedQueryType
}

// derive the db 'error_total' metric from the err returned by libpq
//
// dbErr - the error returned by any libpq method call
// ctx - the caller's context used on libpb method call
func (s *SessionWithMetrics) handleErrorEvent(dbErr error, ctx context.Context) {
if dbErr == nil || s.NoRows(dbErr) {
return
}

ctxError := "n/a"
dbError := "other"
errorExtra := "n/a"
var pqErr *pq.Error

switch {
case errors.As(dbErr, &pqErr):
dbError = string(pqErr.Code)
switch pqErr.Message {
case "canceling statement due to user request":
errorExtra = "user_request"
case "canceling statement due to statement timeout":
errorExtra = "statement_timeout"
}
case strings.Contains(dbErr.Error(), "driver: bad connection"):
dbError = "driver_bad_connection"
case strings.Contains(dbErr.Error(), "sql: transaction has already been committed or rolled back"):
dbError = "tx_already_rollback"
case errors.Is(dbErr, context.Canceled):
dbError = "canceled"
case errors.Is(dbErr, context.DeadlineExceeded):
dbError = "deadline_exceeded"
}

switch {
case errors.Is(ctx.Err(), context.Canceled):
ctxError = "canceled"
case errors.Is(ctx.Err(), context.DeadlineExceeded):
ctxError = "deadline_exceeded"
}

s.errorCounter.With(prometheus.Labels{
"ctx_error": ctxError,
"db_error": dbError,
"db_error_extra": errorExtra,
}).Inc()
}

func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query squirrel.Sqlizer) (err error) {
queryType := string(getQueryType(ctx, query))
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
Expand All @@ -373,7 +433,6 @@ func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query sq
"route": contextRoute(ctx),
}).Inc()
}()

err = s.SessionInterface.Get(ctx, dest, query)
return err
}
Expand Down
81 changes: 57 additions & 24 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package db
import (
"context"
"database/sql"
go_errors "errors"
"fmt"
"reflect"
"strings"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/stellar/go/support/db/sqlutils"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
Expand All @@ -23,7 +25,7 @@ func (s *Session) Begin(ctx context.Context) error {

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand All @@ -44,7 +46,7 @@ func (s *Session) BeginTx(ctx context.Context, opts *sql.TxOptions) error {

tx, err := s.DB.BeginTxx(ctx, opts)
if err != nil {
if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down Expand Up @@ -92,7 +94,7 @@ func (s *Session) Commit() error {
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
Expand Down Expand Up @@ -146,7 +148,7 @@ func (s *Session) GetRaw(ctx context.Context, dest interface{}, query string, ar
return nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down Expand Up @@ -215,7 +217,7 @@ func (s *Session) ExecRaw(ctx context.Context, query string, args ...interface{}
return result, nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return nil, knownErr
}

Expand All @@ -232,29 +234,60 @@ func (s *Session) NoRows(err error) bool {
return err == sql.ErrNoRows
}

// replaceWithKnownError tries to replace Postgres error with package error.
// Returns a new error if the err is known.
func (s *Session) replaceWithKnownError(err error, ctx context.Context) error {
if err == nil {
func (s *Session) AddErrorHandler(handler ErrorHandlerFunc) {
s.errorHandlers = append(s.errorHandlers, handler)
}

// handleError does housekeeping on errors from db.
// dbErr - the libpq client error
// ctx - the calling context
//
// tries to replace dbErr with horizon package error, returns a new error if the err is known.
// invokes any additional error handlers that may have been
// added to the session, passing the caller's context
func (s *Session) handleError(dbErr error, ctx context.Context) error {
if dbErr == nil {
return nil
}

for _, handler := range s.errorHandlers {
handler(dbErr, ctx)
}

var dbErrorCode pq.ErrorCode
var pqErr *pq.Error

// if libpql sends to server, and then any server side error is reported,
// libpq passes back only an pq.ErrorCode from method call
// even if the caller context generates a cancel/deadline error during the server trip,
// libpq will only return an instance of pq.ErrorCode as a non-wrapped error
if go_errors.As(dbErr, &pqErr) {
dbErrorCode = pqErr.Code
}

switch {
case ctx.Err() == context.Canceled:
return ErrCancelled
case ctx.Err() == context.DeadlineExceeded:
// if libpq waits too long to obtain conn from pool, can get ctx timeout before server trip
return ErrTimeout
case strings.Contains(err.Error(), "pq: canceling statement due to user request"):
return ErrTimeout
case strings.Contains(err.Error(), "pq: canceling statement due to conflict with recovery"):
case strings.Contains(dbErr.Error(), "pq: canceling statement due to conflict with recovery"):
return ErrConflictWithRecovery
case strings.Contains(err.Error(), "driver: bad connection"):
case strings.Contains(dbErr.Error(), "driver: bad connection"):
return ErrBadConnection
case strings.Contains(err.Error(), "pq: canceling statement due to statement timeout"):
return ErrStatementTimeout
case strings.Contains(err.Error(), "transaction has already been committed or rolled back"):
case strings.Contains(dbErr.Error(), "transaction has already been committed or rolled back"):
return ErrAlreadyRolledback
case go_errors.Is(ctx.Err(), context.Canceled):
// when horizon's context is cancelled by it's upstream api client,
// it will propagate to here and libpq will emit a wrapped err that has the cancel err
return ErrCancelled
case go_errors.Is(ctx.Err(), context.DeadlineExceeded):
// when horizon's context times out(it's set to app connection-timeout),
// it will trigger libpq to emit a wrapped err that has the deadline err
return ErrTimeout
case dbErrorCode == "57014":
// https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled
// this code can be generated for multiple cases,
// by libpq sending a signal to server when it experiences a context cancel/deadline
// or it could happen based on just server statement_timeout setting
// since we check the context cancel/deadline err state first, getting here means
// this can only be from a statement timeout
return ErrStatementTimeout
default:
return nil
}
Expand Down Expand Up @@ -284,7 +317,7 @@ func (s *Session) QueryRaw(ctx context.Context, query string, args ...interface{
return result, nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return nil, knownErr
}

Expand Down Expand Up @@ -318,7 +351,7 @@ func (s *Session) Rollback() error {
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
Expand Down Expand Up @@ -362,7 +395,7 @@ func (s *Session) SelectRaw(
return nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down

0 comments on commit e21bc43

Please sign in to comment.