Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[!] add suport for jackc/pgx/v5 #488

Merged
merged 12 commits into from
Sep 21, 2022
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
- name: Set up Golang
uses: actions/setup-go@v3
with:
go-version: '1.18'
go-version: '1.19'

- name: Test
run: go test -v -p 1 -parallel 1 -failfast ./...
Expand Down Expand Up @@ -80,7 +80,7 @@ jobs:
- name: Set up Golang
uses: actions/setup-go@v3
with:
go-version: '1.18'
go-version: '1.19'

- name: Test
run: go test -v -p 1 -parallel 1 -failfast ./...
Expand Down Expand Up @@ -113,7 +113,7 @@ jobs:
- name: Set up Golang
uses: actions/setup-go@v3
with:
go-version: '1.18'
go-version: '1.19'

- name: Get dependencies
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Set up Golang
uses: actions/setup-go@v3
with:
go-version: '1.18'
go-version: '1.19'

# despite the fact docker will build binary internally
# we want to stop workflow in case of any error before pushing to registry
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up Golang
uses: actions/setup-go@v3
with:
go-version: '1.18'
go-version: '1.19'

- name: Check out code into the Go module directory
uses: actions/checkout@v3
Expand Down
4 changes: 2 additions & 2 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
{
"label": "Unit Test",
"type": "shell",
"command": "gotest -failfast -timeout=300s -parallel=1 ./${relativeFileDirname}/... -coverprofile='coverage.out' && go tool cover -func='coverage.out'",
"command": "go test -failfast -timeout=300s -parallel=1 ./${relativeFileDirname}/... -coverprofile='coverage.out' -json | tparse -all",
"problemMatcher": [
"$go"
],
Expand All @@ -23,7 +23,7 @@
{
"label": "Coverage Report",
"type": "shell",
"command": "go tool cover -html='coverage.out'",
"command": "go tool cover -func='coverage.out' && go tool cover -html='coverage.out'",
"problemMatcher": [
"$go"
],
Expand Down
20 changes: 7 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
module github.com/cybertec-postgresql/pg_timetable

go 1.18
go 1.19

require (
github.com/cavaliercoder/grab v2.0.0+incompatible
github.com/georgysavva/scany v1.2.0
github.com/jackc/pgconn v1.13.0
github.com/jackc/pgtype v1.12.0
github.com/jackc/pgx/v4 v4.17.2
github.com/jackc/pgx/v5 v5.0.0
github.com/jessevdk/go-flags v1.5.0
github.com/ory/mail/v3 v3.0.1-0.20210418065910-7f033ddea8dc
github.com/pashagolub/pgxmock v1.8.0
github.com/pashagolub/pgxmock/v2 v2.0.0
github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5
github.com/sethvargo/go-retry v0.2.3
github.com/sirupsen/logrus v1.9.0
Expand All @@ -22,25 +19,22 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jackc/puddle/v2 v2.0.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/crypto v0.0.0-20220919173607-35f4265a4bc0 // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
181 changes: 13 additions & 168 deletions go.sum

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions internal/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"

"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5/tracelog"
"github.com/rifflock/lfshook"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -60,7 +60,7 @@ func NewPgxLogger(l LoggerIface) *PgxLogger {
}

// Log transforms logging calls from pgx to logrus
func (pgxlogger *PgxLogger) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
func (pgxlogger *PgxLogger) Log(ctx context.Context, level tracelog.LogLevel, msg string, data map[string]any) {
logger := GetLogger(ctx)
if logger == FallbackLogger { //switch from standard to specified
logger = pgxlogger.l
Expand All @@ -69,13 +69,13 @@ func (pgxlogger *PgxLogger) Log(ctx context.Context, level pgx.LogLevel, msg str
logger = logger.WithFields(data)
}
switch level {
case pgx.LogLevelTrace:
case tracelog.LogLevelTrace:
logger.WithField("PGX_LOG_LEVEL", level).Debug(msg)
case pgx.LogLevelDebug, pgx.LogLevelInfo: //pgx is way too chatty on INFO level
case tracelog.LogLevelDebug, tracelog.LogLevelInfo: //pgx is way too chatty on INFO level
logger.Debug(msg)
case pgx.LogLevelWarn:
case tracelog.LogLevelWarn:
logger.Warn(msg)
case pgx.LogLevelError:
case tracelog.LogLevelError:
logger.Error(msg)
default:
logger.WithField("INVALID_PGX_LOG_LEVEL", level).Error(msg)
Expand Down
6 changes: 3 additions & 3 deletions internal/log/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v5/tracelog"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
Expand All @@ -33,8 +33,8 @@ func TestFileLogger(t *testing.T) {

func TestPgxLog(t *testing.T) {
pgxl := log.NewPgxLogger(log.Init(config.LoggingOpts{LogLevel: "trace"}))
var level pgx.LogLevel
for level = pgx.LogLevelNone; level <= pgx.LogLevelTrace; level++ {
var level tracelog.LogLevel
for level = tracelog.LogLevelNone; level <= tracelog.LogLevelTrace; level++ {
pgxl.Log(context.Background(), level, "foo", map[string]interface{}{"func": "TestPgxLog"})
}
}
4 changes: 2 additions & 2 deletions internal/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"errors"
"fmt"

"github.com/jackc/pgconn"
pgx "github.com/jackc/pgx/v4"
pgx "github.com/jackc/pgx/v5"
pgconn "github.com/jackc/pgx/v5/pgconn"
)

// PgxIface is interface for database connection or transaction
Expand Down
4 changes: 2 additions & 2 deletions internal/migrator/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/cybertec-postgresql/pg_timetable/internal/migrator"
"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
pgx "github.com/jackc/pgx/v4"
"github.com/pashagolub/pgxmock"
pgx "github.com/jackc/pgx/v5"
"github.com/pashagolub/pgxmock/v2"
"github.com/stretchr/testify/assert"
)

Expand Down
41 changes: 28 additions & 13 deletions internal/pgengine/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"strings"

"github.com/georgysavva/scany/pgxscan"
"github.com/jackc/pgx/v5"
)

// InvalidOid specifies value for non-existent objects
Expand Down Expand Up @@ -34,7 +34,7 @@ chain_id, task_id, command, kind, last_run, finished, returncode, pid, output, c
VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $6, $7, NULLIF($8, ''), $9, $10)`,
task.ChainID, task.TaskID, task.Script, task.Kind,
fmt.Sprintf("%f seconds", float64(task.Duration)/1000000),
retCode, pge.Getpid(), strings.TrimSpace(output), pge.ClientName, task.Txid)
retCode, pge.Getsid(), strings.TrimSpace(output), pge.ClientName, task.Txid)
if err != nil {
pge.l.WithError(err).Error("Failed to log chain element execution status")
}
Expand Down Expand Up @@ -65,36 +65,51 @@ func (pge *PgEngine) RemoveChainRunStatus(ctx context.Context, chainID int) {
}

// Select live chains with proper client_name value
const sqlSelectLiveChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances
const sqlSelectLiveChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution,
COALESCE(max_instances, 16) as max_instances, COALESCE(timeout, 0) as timeout
FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL)`

// SelectRebootChains returns a list of chains should be executed after reboot
func (pge *PgEngine) SelectRebootChains(ctx context.Context, dest interface{}) error {
func (pge *PgEngine) SelectRebootChains(ctx context.Context, dest *[]Chain) error {
const sqlSelectRebootChains = sqlSelectLiveChains + ` AND run_at = '@reboot'`
return pgxscan.Select(ctx, pge.ConfigDb, dest, sqlSelectRebootChains, pge.ClientName)
rows, err := pge.ConfigDb.Query(ctx, sqlSelectRebootChains, pge.ClientName)
if err != nil {
return err
}
*dest, err = pgx.CollectRows(rows, pgx.RowToStructByPos[Chain])
return err
}

// SelectChains returns a list of chains should be executed at the current moment
func (pge *PgEngine) SelectChains(ctx context.Context, dest interface{}) error {
func (pge *PgEngine) SelectChains(ctx context.Context, dest *[]Chain) error {
const sqlSelectChains = sqlSelectLiveChains + ` AND NOT COALESCE(starts_with(run_at, '@'), FALSE) AND timetable.is_cron_in_time(run_at, now())`
return pgxscan.Select(ctx, pge.ConfigDb, dest, sqlSelectChains, pge.ClientName)
rows, err := pge.ConfigDb.Query(ctx, sqlSelectChains, pge.ClientName)
if err != nil {
return err
}
*dest, err = pgx.CollectRows(rows, pgx.RowToStructByPos[Chain])
return err
}

// SelectIntervalChains returns list of interval chains to be executed
func (pge *PgEngine) SelectIntervalChains(ctx context.Context, dest interface{}) error {
const sqlSelectIntervalChains = `SELECT
chain_id, chain_name, self_destruct, exclusive_execution,
COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances,
func (pge *PgEngine) SelectIntervalChains(ctx context.Context, dest *[]IntervalChain) error {
const sqlSelectIntervalChains = `SELECT chain_id, chain_name, self_destruct, exclusive_execution,
COALESCE(max_instances, 16), COALESCE(timeout, 0),
EXTRACT(EPOCH FROM (substr(run_at, 7) :: interval)) :: int4 as interval_seconds,
starts_with(run_at, '@after') as repeat_after
FROM timetable.chain WHERE live AND (client_name = $1 or client_name IS NULL) AND substr(run_at, 1, 6) IN ('@every', '@after')`
return pgxscan.Select(ctx, pge.ConfigDb, dest, sqlSelectIntervalChains, pge.ClientName)
rows, err := pge.ConfigDb.Query(ctx, sqlSelectIntervalChains, pge.ClientName)
if err != nil {
return err
}
*dest, err = pgx.CollectRows(rows, pgx.RowToStructByPos[IntervalChain])
return err
}

// SelectChain returns the chain with the specified ID
func (pge *PgEngine) SelectChain(ctx context.Context, dest interface{}, chainID int) error {
// we accept not only live chains here because we want to run them in debug mode
const sqlSelectSingleChain = `SELECT chain_id, chain_name, self_destruct, exclusive_execution, COALESCE(timeout, 0) as timeout, COALESCE(max_instances, 16) as max_instances
FROM timetable.chain WHERE (client_name = $1 OR client_name IS NULL) AND chain_id = $2`
return pgxscan.Get(ctx, pge.ConfigDb, dest, sqlSelectSingleChain, pge.ClientName, chainID)
return pge.ConfigDb.QueryRow(ctx, sqlSelectSingleChain, pge.ClientName, chainID).Scan(dest)
}
10 changes: 6 additions & 4 deletions internal/pgengine/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"testing"

"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
"github.com/pashagolub/pgxmock"
"github.com/pashagolub/pgxmock/v2"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -61,18 +61,20 @@ func TestRemoveChainRunStatus(t *testing.T) {
}

func TestSelectChains(t *testing.T) {
var c *[]pgengine.Chain
var ic *[]pgengine.IntervalChain
initmockdb(t)
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
defer mockPool.Close()

mockPool.ExpectExec("SELECT.+chain_id").WillReturnError(errors.New("error"))
assert.Error(t, pge.SelectChains(context.Background(), struct{}{}))
assert.Error(t, pge.SelectChains(context.Background(), c))

mockPool.ExpectExec("SELECT.+chain_id").WillReturnError(errors.New("error"))
assert.Error(t, pge.SelectRebootChains(context.Background(), struct{}{}))
assert.Error(t, pge.SelectRebootChains(context.Background(), c))

mockPool.ExpectExec("SELECT.+chain_id").WillReturnError(errors.New("error"))
assert.Error(t, pge.SelectIntervalChains(context.Background(), struct{}{}))
assert.Error(t, pge.SelectIntervalChains(context.Background(), ic))
}

func TestSelectChain(t *testing.T) {
Expand Down