Skip to content

Commit

Permalink
Pass ctx to constructor (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilya-hontarau committed Sep 4, 2023
1 parent 4d7cfc2 commit 704aee6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func main() {
q, err := pgmq.New("postgres://postgres:password@localhost:5432/postgres")
q, err := pgmq.New(context.Background(), "postgres://postgres:password@localhost:5432/postgres")
if err != nil {
panic(err)
}
Expand Down
31 changes: 13 additions & 18 deletions pgmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ import (
"context"
"errors"
"fmt"
"log"
"time"

"github.com/craigpastro/retrier"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

const vtDefault = 30

var ErrNoRows = errors.New("pgmq: no rows in result set")
var (
ErrNoRows = errors.New("pgmq: no rows in result set")
ErrPing = errors.New("pgmq: failed to ping db")
)

type Message struct {
MsgID int64
Expand All @@ -38,31 +39,25 @@ type PGMQ struct {
db DB
}

// New establishes a connection to Postgres given by the connString, then
// creates the pgmq extension if it does not already exist.
func New(connString string) (*PGMQ, error) {
// New establishes a connection to Postgres given by the connString, checks connection, if check is failed,
// returns ErrPing, that can be retried, then creates the pgmq extension if it does not already exist.
func New(ctx context.Context, connString string) (*PGMQ, error) {
cfg, err := pgxpool.ParseConfig(connString)
if err != nil {
return nil, fmt.Errorf("error parsing connection string: %w", err)
}

pool, err := pgxpool.NewWithConfig(context.Background(), cfg)
pool, err := pgxpool.NewWithConfig(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("error creating pool: %w", err)
}

err = retrier.Do(func() error {
if err = pool.Ping(context.Background()); err != nil {
log.Println("waiting for Postgres")
return err
}
return nil
}, retrier.NewExponentialBackoff())
err = pool.Ping(ctx)
if err != nil {
return nil, fmt.Errorf("error connecting to Postgres: %w", err)
return nil, errors.Join(err, ErrPing)
}

_, err = pool.Exec(context.Background(), "create extension if not exists pgmq cascade")
_, err = pool.Exec(ctx, "create extension if not exists pgmq cascade")
if err != nil {
return nil, fmt.Errorf("error creating pgmq extension: %w", err)
}
Expand All @@ -73,8 +68,8 @@ func New(connString string) (*PGMQ, error) {
}

// MustNew is similar to New, but panics if it encounters an error.
func MustNew(connString string) *PGMQ {
q, err := New(connString)
func MustNew(ctx context.Context, connString string) *PGMQ {
q, err := New(ctx, connString)
if err != nil {
panic(err)
}
Expand Down
10 changes: 8 additions & 2 deletions pgmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"testing"
"time"

"github.com/craigpastro/pgmq-go/mocks"
"github.com/craigpastro/retrier"
"github.com/jackc/pgx/v5/pgconn"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.uber.org/mock/gomock"
"github.com/craigpastro/pgmq-go/mocks"
)

var q *PGMQ
Expand Down Expand Up @@ -53,7 +54,12 @@ func TestMain(m *testing.M) {

connString := fmt.Sprintf("postgres://postgres:password@%s:%s/postgres", host, port.Port())

q = MustNew(connString)
q, err = retrier.DoWithData(func() (*PGMQ, error) {
return New(ctx, connString)
}, retrier.NewExponentialBackoff())
if err != nil {
panic(err)
}

code := m.Run()

Expand Down

0 comments on commit 704aee6

Please sign in to comment.