From c72a56da8a4aab181a8b255a773b775d980e60a8 Mon Sep 17 00:00:00 2001 From: Pavlo Golub Date: Fri, 19 May 2023 17:40:31 +0200 Subject: [PATCH] [-] fix connection usage for replicas and locked sessions, fixes #565 [+] allow endless connection attempts with `--timeout=0` option --- .vscode/launch.json | 5 +++-- internal/pgengine/bootstrap.go | 29 ++++++++++++++++------------- internal/pgengine/bootstrap_test.go | 13 ------------- 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 33e0fb2a..b92f595c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,8 +12,9 @@ "program": "${workspaceFolder}", "env": {}, "args": ["postgresql://scheduler@localhost:5432/timetable", - "--clientname=loader", - "--log-level=debug"] + "--clientname=worker001", + "--log-level=debug", + "--timeout=-1"] } ] } \ No newline at end of file diff --git a/internal/pgengine/bootstrap.go b/internal/pgengine/bootstrap.go index 202031ae..b4497bb8 100644 --- a/internal/pgengine/bootstrap.go +++ b/internal/pgengine/bootstrap.go @@ -78,7 +78,11 @@ var sqlNames = []string{"Schema Init", "Cron Functions", "Tables and Views", "JS // New opens connection and creates schema func New(ctx context.Context, cmdOpts config.CmdOptions, logger log.LoggerHookerIface) (*PgEngine, error) { - var err error + var ( + err error + connctx = ctx + conncancel context.CancelFunc + ) pge := &PgEngine{ l: logger, ConfigDb: nil, @@ -86,8 +90,10 @@ func New(ctx context.Context, cmdOpts config.CmdOptions, logger log.LoggerHooker chainSignalChan: make(chan ChainSignal, 64), } pge.l.WithField("sid", pge.Getsid()).Info("Starting new session... ") - connctx, conncancel := context.WithTimeout(ctx, time.Duration(cmdOpts.Connection.Timeout)*time.Second) - defer conncancel() + if cmdOpts.Connection.Timeout > 0 { // Timeout less than 0 allows endless connection attempts + connctx, conncancel = context.WithTimeout(ctx, time.Duration(cmdOpts.Connection.Timeout)*time.Second) + defer conncancel() + } config := pge.getPgxConnConfig() if err = retry.Do(connctx, backoff, func(ctx context.Context) error { @@ -200,16 +206,13 @@ func (pge *PgEngine) TryLockClientName(ctx context.Context, conn QueryRowIface) return nil } sql = "SELECT timetable.try_lock_client_name($1, $2)" - return retry.Do(ctx, backoff, func(ctx context.Context) error { - var locked bool - if e := conn.QueryRow(ctx, sql, pge.Getsid(), pge.ClientName).Scan(&locked); e != nil { - return e - } else if !locked { - pge.l.Info("Cannot obtain lock for a session") - return retry.RetryableError(errors.New("Cannot obtain lock for a session")) - } - return nil - }) + var locked bool + if e := conn.QueryRow(ctx, sql, pge.Getsid(), pge.ClientName).Scan(&locked); e != nil { + return e + } else if !locked { + return errors.New("Cannot obtain lock for a session") + } + return nil } // ExecuteCustomScripts executes SQL scripts in files diff --git a/internal/pgengine/bootstrap_test.go b/internal/pgengine/bootstrap_test.go index 1e97d3c0..d87f7bab 100644 --- a/internal/pgengine/bootstrap_test.go +++ b/internal/pgengine/bootstrap_test.go @@ -141,17 +141,4 @@ func TestTryLockClientName(t *testing.T) { m := mockpgconn{r} assert.NoError(t, pge.TryLockClientName(context.Background(), m)) }) - - t.Run("retry locking", func(t *testing.T) { - r := &mockpgrow{results: []interface{}{ - 1, //procoid - false, //locked - false, //locked - false, //locked - }} - m := mockpgconn{r} - ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*2) - defer cancel() - assert.ErrorIs(t, pge.TryLockClientName(ctx, m), ctx.Err()) - }) }