Skip to content

Commit

Permalink
retries, ping timeout, sql mock handler (#75)
Browse files Browse the repository at this point in the history
retries
  • Loading branch information
scottlepp committed Nov 10, 2022
1 parent 53084cb commit cdeb646
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 29 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -3,3 +3,5 @@
.vscode
.idea/
*.iml

mock-data
57 changes: 53 additions & 4 deletions datasource.go
Expand Up @@ -233,6 +233,20 @@ func (ds *SQLDatasource) handleQuery(ctx context.Context, req backend.DataQuery,
return QueryDB(ctx, db, ds.c.Converters(), fillMode, q)
}

// allow retries on timeouts
if errors.Is(err, context.DeadlineExceeded) {
for i := 0; i < ds.driverSettings.Retries; i++ {
backend.Logger.Warn(fmt.Sprintf("connection timed out. retrying %d times", i))
db, err := ds.c.Connect(dbConn.settings, q.ConnectionArgs)
if err != nil {
continue
}
ds.storeDBConnection(cacheKey, dbConnection{db, dbConn.settings})

return QueryDB(ctx, db, ds.c.Converters(), fillMode, q)
}
}

return nil, err
}

Expand All @@ -243,11 +257,39 @@ func (ds *SQLDatasource) CheckHealth(ctx context.Context, req *backend.CheckHeal
if !ok {
return nil, MissingDBConnection
}
if err := dbConn.db.Ping(); err != nil {

if ds.driverSettings.Retries == 0 {
return ds.check(dbConn)
}

return ds.checkWithRetries(dbConn)
}

func (ds *SQLDatasource) DriverSettings() DriverSettings {
return ds.driverSettings
}

func (ds *SQLDatasource) checkWithRetries(conn dbConnection) (*backend.CheckHealthResult, error) {
var result *backend.CheckHealthResult
var err error

for i := 0; i < ds.driverSettings.Retries; i++ {
result, err = ds.check(conn)
if err == nil {
return result, err
}
}

// TODO: failed health checks don't return an error
return result, nil
}

func (ds *SQLDatasource) check(conn dbConnection) (*backend.CheckHealthResult, error) {
if err := ds.ping(conn); err != nil {
return &backend.CheckHealthResult{
Status: backend.HealthStatusError,
Message: err.Error(),
}, nil
}, err
}

return &backend.CheckHealthResult{
Expand All @@ -256,6 +298,13 @@ func (ds *SQLDatasource) CheckHealth(ctx context.Context, req *backend.CheckHeal
}, nil
}

func (ds *SQLDatasource) DriverSettings() DriverSettings {
return ds.driverSettings
func (ds *SQLDatasource) ping(conn dbConnection) error {
if ds.driverSettings.Timeout == 0 {
return conn.db.Ping()
}

ctx, cancel := context.WithTimeout(context.Background(), ds.driverSettings.Timeout)
defer cancel()

return conn.db.PingContext(ctx)
}
62 changes: 61 additions & 1 deletion datasource_test.go
@@ -1,12 +1,17 @@
package sqlds

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/sqlds/v2/mock"
"github.com/stretchr/testify/assert"
)

type fakeDriver struct {
Expand All @@ -15,10 +20,12 @@ type fakeDriver struct {
Driver
}

func (d *fakeDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage) (db *sql.DB, err error) {
func (d fakeDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage) (db *sql.DB, err error) {
return d.db, nil
}

// func (d fakeDriver) Settings(backend.DataSourceInstanceSettings) DriverSettings

func Test_getDBConnectionFromQuery(t *testing.T) {
db := &sql.DB{}
db2 := &sql.DB{}
Expand Down Expand Up @@ -113,3 +120,56 @@ func Test_Dispose(t *testing.T) {
}
})
}

func Test_retries(t *testing.T) {
dsUID := "timeout"
settings := backend.DataSourceInstanceSettings{UID: dsUID}

handler := testSqlHandler{}
mockDriver := "sqlmock"
mock.RegisterDriver(mockDriver, handler)
db, err := sql.Open(mockDriver, "")
if err != nil {
t.Errorf("failed to connect to mock driver: %v", err)
}
timeoutDriver := fakeDriver{
db: db,
}
retries := 5
max := time.Duration(testTimeout) * time.Second
driverSettings := DriverSettings{Retries: retries, Timeout: max}
ds := &SQLDatasource{c: timeoutDriver, driverSettings: driverSettings}

key := defaultKey(dsUID)
// Add the mandatory default db
ds.storeDBConnection(key, dbConnection{db, settings})
ctx := context.Background()
req := &backend.CheckHealthRequest{
PluginContext: backend.PluginContext{
DataSourceInstanceSettings: &settings,
},
}
result, err := ds.CheckHealth(ctx, req)

assert.Nil(t, err)
assert.Equal(t, retries, testCounter)
expected := context.DeadlineExceeded.Error()
assert.Equal(t, expected, result.Message)
}

var testCounter = 0
var testTimeout = 1

type testSqlHandler struct {
mock.DBHandler
}

func (s testSqlHandler) Ping(ctx context.Context) error {
testCounter++ // track the retries for the test assertion
time.Sleep(time.Duration(testTimeout + 1)) // simulate a connection delay
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return ctx.Err()
}
}
1 change: 1 addition & 0 deletions driver.go
Expand Up @@ -14,6 +14,7 @@ import (
type DriverSettings struct {
Timeout time.Duration
FillMode *data.FillMissing
Retries int
}

// Driver is a simple interface that defines how to connect to a backend SQL datasource
Expand Down
2 changes: 1 addition & 1 deletion mock/csv_data.go → mock/csv/csv_data.go
@@ -1,4 +1,4 @@
package mock
package csv

import (
"errors"
Expand Down
12 changes: 6 additions & 6 deletions mock/csv_mock.go → mock/csv/csv_mock.go
@@ -1,4 +1,4 @@
package mock
package csv

import (
"context"
Expand All @@ -13,7 +13,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
ds "github.com/grafana/sqlds/v2"
"github.com/grafana/sqlds/v2"
_ "github.com/mithrandie/csvq-driver"
)

Expand All @@ -22,8 +22,8 @@ type SQLCSVMock struct {
folder string
}

func (h *SQLCSVMock) Settings(config backend.DataSourceInstanceSettings) ds.DriverSettings {
return ds.DriverSettings{
func (h *SQLCSVMock) Settings(config backend.DataSourceInstanceSettings) sqlds.DriverSettings {
return sqlds.DriverSettings{
FillMode: &data.FillMissing{
Mode: data.FillModeNull,
},
Expand Down Expand Up @@ -91,6 +91,6 @@ func (h *SQLCSVMock) Converters() []sqlutil.Converter {
}

// Macros returns list of macro functions convert the macros of raw query
func (h *SQLCSVMock) Macros() ds.Macros {
return ds.Macros{}
func (h *SQLCSVMock) Macros() sqlds.Macros {
return sqlds.Macros{}
}
29 changes: 23 additions & 6 deletions mock/mock_driver.go
@@ -1,35 +1,52 @@
package mock

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"errors"
"sync"

"github.com/grafana/grafana-plugin-sdk-go/backend"
)

var pool *mockDriver

func init() {
func RegisterDriver(name string, handler DBHandler) *mockDriver {
pool = &mockDriver{
conns: make(map[string]*sqlmock),
conns: make(map[string]*sqlmock),
handler: handler,
}
sql.Register("sqlmock", pool)
sql.Register(name, pool)
return pool
}

type DBHandler interface {
Ping(ctx context.Context) error
Query(args []driver.Value) (driver.Rows, error)
Columns() []string
Next(dest []driver.Value) error
}

type mockDriver struct {
sync.Mutex
counter int
conns map[string]*sqlmock
handler DBHandler
}

func (d *mockDriver) Open(dsn string) (driver.Conn, error) {
if len(d.conns) == 0 {
mock := &sqlmock{
drv: d,
sleep: 10,
drv: d,
}
d.conns = map[string]*sqlmock{
dsn: mock,
}
}
return d.conns[dsn], nil
}

func (d *mockDriver) Connect(backend.DataSourceInstanceSettings, json.RawMessage) (db *sql.DB, err error) {
return nil, errors.New("context deadline exceeded")
}
38 changes: 27 additions & 11 deletions mock/sqlmock.go
Expand Up @@ -3,12 +3,10 @@ package mock
import (
"context"
"database/sql/driver"
"time"
)

type sqlmock struct {
drv *mockDriver
sleep int
drv *mockDriver
}

// Begin meets http://golang.org/pkg/database/sql/driver/#Conn interface
Expand Down Expand Up @@ -37,14 +35,7 @@ func (c *sqlmock) Close() error {
}

func (c *sqlmock) Ping(ctx context.Context) error {
// so we can test timeout retries
if c.sleep > 0 {
v := c.sleep
next := float64(v) * .5
c.sleep = int(next)
time.Sleep(time.Duration(v) * time.Second)
}
return nil
return c.drv.handler.Ping(ctx)
}

// statement
Expand All @@ -58,6 +49,9 @@ func (stmt *statement) Exec(args []driver.Value) (driver.Result, error) {
}

func (stmt *statement) Query(args []driver.Value) (driver.Rows, error) {
if stmt.conn.drv.handler != nil {
stmt.conn.drv.handler.Query(args)
}
return nil, nil
}

Expand All @@ -68,3 +62,25 @@ func (stmt *statement) Close() error {
func (stmt *statement) NumInput() int {
return -1
}

type rows struct {
conn *sqlmock
}

func (r rows) Columns() []string {
if r.conn.drv.handler != nil {
return r.conn.drv.handler.Columns()
}
return []string{}
}

func (r rows) Close() error {
return nil
}

func (r rows) Next(dest []driver.Value) error {
if r.conn.drv.handler != nil {
return r.conn.drv.handler.Next(dest)
}
return nil
}

0 comments on commit cdeb646

Please sign in to comment.