Skip to content

Commit

Permalink
add shutdown timeout, use context while running queries;
Browse files Browse the repository at this point in the history
better naming for db interface;
  • Loading branch information
mkabilov committed Aug 12, 2019
1 parent 32f0475 commit ac3e8bd
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
13 changes: 10 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -18,7 +19,8 @@ import (
)

const (
indexHTML = `
shutdownTimeout = 10 * time.Second
indexHTML = `
<html>
<head>
<title>Postgresql Exporter</title>
Expand Down Expand Up @@ -56,7 +58,9 @@ func main() {
if err := cfg.Load(); err != nil {
log.Fatalf("could not load config: %v", err)
}
collector := pgcollector.New()
ctx, cancel := context.WithCancel(context.Background())

collector := pgcollector.New(ctx)
collector.LoadConfig(cfg)

if err := prometheus.Register(collector); err != nil {
Expand Down Expand Up @@ -98,7 +102,10 @@ loop:
log.Printf("received signal: %v", sig)
}
}
if err := srv.Shutdown(context.Background()); err != nil {
cancel()

shutdownCtx, _ := context.WithTimeout(context.Background(), shutdownTimeout)
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("could not shutdown http server: %v", err)
}

Expand Down
14 changes: 6 additions & 8 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/adjust/postgresql_exporter/pkg/config"
)

//DbInterface describes Db methods
type DbInterface interface {
//Interface describes Db methods
type Interface interface {
SetStatementTimeout(time.Duration) error
Exec(string) ([]map[string]interface{}, error)
PgVersion() config.PgVersion
Expand All @@ -30,12 +30,13 @@ var ErrQueryTimeout = errors.New("canceled due to statement timeout")

// Db describes database
type Db struct {
ctx context.Context
version config.PgVersion
db *pgx.Conn
}

// New creates new instance of database connection
func New(dbConfig config.DbConfig) (*Db, error) {
func New(ctx context.Context, dbConfig config.DbConfig) (*Db, error) {
var version config.PgVersion

cfg := pgx.ConnConfig{
Expand Down Expand Up @@ -76,17 +77,14 @@ func New(dbConfig config.DbConfig) (*Db, error) {
version = config.NoVersion
}

if err != nil {
return nil, fmt.Errorf("could not open connection: %v", err)
}

if !dbConfig.IsNotPg {
if err := dbConn.Ping(context.Background()); err != nil {
return nil, fmt.Errorf("could not ping db: %v", err)
}
}

return &Db{
ctx: ctx,
db: dbConn,
version: version,
}, nil
Expand All @@ -96,7 +94,7 @@ func New(dbConfig config.DbConfig) (*Db, error) {
func (d *Db) Exec(query string) ([]map[string]interface{}, error) {
values := make([]map[string]interface{}, 0)

rows, err := d.db.Query(query)
rows, err := d.db.QueryEx(d.ctx, query, nil)
if err != nil {
return nil, fmt.Errorf("query error: %v", err)
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/pgcollector/pgcollector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pgcollector

import (
"context"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -32,6 +33,7 @@ type PgCollector struct {
config config.Interface
timeOuts uint32
errors uint32
ctx context.Context
}

type workerJob struct {
Expand All @@ -40,8 +42,10 @@ type workerJob struct {
}

// New create new instance of the PostgreSQL metrics collector
func New() *PgCollector {
return &PgCollector{}
func New(ctx context.Context) *PgCollector {
return &PgCollector{
ctx: ctx,
}
}

// LoadConfig loads config
Expand Down Expand Up @@ -84,7 +88,7 @@ func createMetric(job *workerJob, name string, constLabels prometheus.Labels, ra
return nil, nil
}

func (p *PgCollector) worker(conn db.DbInterface, jobs chan *workerJob, res chan<- prometheus.Metric, wg *sync.WaitGroup) {
func (p *PgCollector) worker(conn db.Interface, jobs chan *workerJob, res chan<- prometheus.Metric, wg *sync.WaitGroup) {
defer wg.Done()

jobs:
Expand Down Expand Up @@ -204,16 +208,16 @@ func (p *PgCollector) Collect(metricsCh chan<- prometheus.Metric) {

wg := &sync.WaitGroup{}

dbPool := make(map[string][]db.DbInterface)
dbPool := make(map[string][]db.Interface)
dbJobs := make(map[string]chan *workerJob)

for _, dbName := range p.config.DbList() {
dbConf := p.config.Db(dbName)
workersCnt := dbConf.Workers()

dbPool[dbName] = make([]db.DbInterface, 0)
dbPool[dbName] = make([]db.Interface, 0)
for i := 0; i < workersCnt; i++ {
conn, err := db.New(dbConf)
conn, err := db.New(p.ctx, dbConf)
if err != nil {
log.Printf("could not create db instance %q: %v", dbName, err)
atomic.AddUint32(&p.errors, 1)
Expand Down

0 comments on commit ac3e8bd

Please sign in to comment.