Skip to content

Commit

Permalink
Add SendBatch, DeleteBatch, ArchiveBatch (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
Craig Pastro committed Aug 29, 2023
1 parent 9bb10e1 commit e26f210
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 31 deletions.
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ linters:
- errname
- gofmt
- goimports
- sqlclosecheck
- stylecheck
- unconvert
- unparam
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
[![codecov](https://codecov.io/github/craigpastro/pgmq-go/branch/main/graph/badge.svg?token=00AJODX77Z)](https://codecov.io/github/craigpastro/pgmq-go)

A Go (Golang) client for
[Postgres Message Queue](https://github.com/tembo-io/pgmq) (PGMQ). Based on the
[Rust client](https://github.com/tembo-io/pgmq/tree/main/core).
[Postgres Message Queue](https://github.com/tembo-io/pgmq) (PGMQ). Based loosely
on the [Rust client](https://github.com/tembo-io/pgmq/tree/main/core).

## Usage

Expand Down
71 changes: 55 additions & 16 deletions pgmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,28 @@ func (p *PGMQ) Send(ctx context.Context, queue string, msg map[string]any) (int6
return msgID, nil
}

// SendBatch sends a batch of messages to a queue. The message ids, unique to the
// queue, are returned.
func (p *PGMQ) SendBatch(ctx context.Context, queue string, msgs []map[string]any) ([]int64, error) {
rows, err := p.pool.Query(ctx, "select * from pgmq_send_batch($1, $2::jsonb[])", queue, msgs)
if err != nil {
return nil, wrapPostgresError(err)
}
defer rows.Close()

var msgIDs []int64
for rows.Next() {
var msgID int64
err = rows.Scan(&msgID)
if err != nil {
return nil, wrapPostgresError(err)
}
msgIDs = append(msgIDs, msgID)
}

return msgIDs, nil
}

// Read a single message from the queue. If the queue is empty or all
// messages are invisible, an ErrNoRows errors is returned. If a message is
// returned, it is made invisible for the duration of the visibility timeout
Expand Down Expand Up @@ -190,41 +212,58 @@ func (p *PGMQ) Pop(ctx context.Context, queue string) (*Message, error) {
return &msg, nil
}

// Archive moves a message from the queue table to archive table by its id.
// View messages on the archive table with sql:
// Archive moves a message from the queue table to the archive table by its
// id. View messages on the archive table with sql:
//
// select * from pgmq_<queue_name>_archive;
func (p *PGMQ) Archive(ctx context.Context, queue string, msgID int64) (int64, error) {
func (p *PGMQ) Archive(ctx context.Context, queue string, msgID int64) (bool, error) {
var archived bool
err := p.pool.QueryRow(ctx, "select pgmq_archive($1, $2)", queue, msgID).Scan(&archived)
err := p.pool.QueryRow(ctx, "select pgmq_archive($1, $2::bigint)", queue, msgID).Scan(&archived)
if err != nil {
return 0, wrapPostgresError(err)
return false, wrapPostgresError(err)
}

var rowsAffected int64 = 0
if archived {
rowsAffected = 1
return archived, nil
}

// ArchiveBatch moves a batch of messages from the queue table to the archive
// table by their ids. View messages on the archive table with sql:
//
// select * from pgmq_<queue_name>_archive;
func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) (bool, error) {
var archived bool
err := p.pool.QueryRow(ctx, "select pgmq_archive($1, $2::bigint[])", queue, msgIDs).Scan(&archived)
if err != nil {
return false, wrapPostgresError(err)
}

return rowsAffected, nil
return archived, nil
}

// Delete deletes a message from the queue by its id. This is a permanent
// delete and cannot be undone. If you want to retain a log of the message,
// use the Archive method.
func (p *PGMQ) Delete(ctx context.Context, queue string, msgID int64) (int64, error) {
func (p *PGMQ) Delete(ctx context.Context, queue string, msgID int64) (bool, error) {
var deleted bool
err := p.pool.QueryRow(ctx, "select pgmq_delete($1, $2)", queue, msgID).Scan(&deleted)
err := p.pool.QueryRow(ctx, "select pgmq_delete($1, $2::bigint)", queue, msgID).Scan(&deleted)
if err != nil {
return 0, wrapPostgresError(err)
return false, wrapPostgresError(err)
}

var rowsAffected int64 = 0
if deleted {
rowsAffected = 1
return deleted, nil
}

// DeleteBatch deletes a batch of messages from the queue by their ids. This
// is a permanent delete and cannot be undone. If you want to retain a log of
// the messages, use the ArchiveBatch method.
func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) (bool, error) {
var deleted bool
err := p.pool.QueryRow(ctx, "select pgmq_delete($1, $2::bigint[])", queue, msgIDs).Scan(&deleted)
if err != nil {
return false, wrapPostgresError(err)
}

return rowsAffected, nil
return deleted, nil
}

func wrapPostgresError(err error) error {
Expand Down
101 changes: 88 additions & 13 deletions pgmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ func TestSend(t *testing.T) {
require.EqualValues(t, 2, id)
}

func TestSendBatch(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)
require.Equal(t, []int64{1, 2}, ids)
}

func TestRead(t *testing.T) {
ctx := context.Background()
queue := t.Name()
Expand Down Expand Up @@ -135,10 +147,7 @@ func TestReadBatch(t *testing.T) {
err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

_, err = q.Send(ctx, queue, testMsg1)
require.NoError(t, err)

_, err = q.Send(ctx, queue, testMsg2)
_, err = q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

time.Sleep(time.Second)
Expand Down Expand Up @@ -205,11 +214,11 @@ func TestArchive(t *testing.T) {
id, err := q.Send(ctx, queue, testMsg1)
require.NoError(t, err)

rowsAffected, err := q.Archive(ctx, queue, id)
archived, err := q.Archive(ctx, queue, id)
require.NoError(t, err)
require.EqualValues(t, 1, rowsAffected)
require.True(t, archived)

// Let's just check that something landing in the archive table.
// Let's just check that something landed in the archive table.
stmt := fmt.Sprintf("select * from pgmq_%s_archive", queue)
tag, err := q.pool.Exec(ctx, stmt)
require.NoError(t, err)
Expand All @@ -226,9 +235,45 @@ func TestArchiveNotExist(t *testing.T) {
err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

rowsAffected, err := q.Archive(ctx, queue, 100)
archived, err := q.Archive(ctx, queue, 100)
require.NoError(t, err)
require.False(t, archived)
}

func TestArchiveBatch(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

archived, err := q.ArchiveBatch(ctx, queue, ids)
require.NoError(t, err)
require.True(t, archived)

// Let's just check that something landed in the archive table.
stmt := fmt.Sprintf("select * from pgmq_%s_archive", queue)
tag, err := q.pool.Exec(ctx, stmt)
require.NoError(t, err)
require.EqualValues(t, 2, tag.RowsAffected())

_, err = q.Read(ctx, queue, 0)
require.ErrorIs(t, err, ErrNoRows)
}

func TestArchiveBatchNotExists(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

archived, err := q.ArchiveBatch(ctx, queue, []int64{100})
require.NoError(t, err)
require.EqualValues(t, 0, rowsAffected)
require.True(t, archived)
}

func TestDelete(t *testing.T) {
Expand All @@ -241,9 +286,9 @@ func TestDelete(t *testing.T) {
id, err := q.Send(ctx, queue, testMsg1)
require.NoError(t, err)

rowsAffected, err := q.Delete(ctx, queue, id)
deleted, err := q.Delete(ctx, queue, id)
require.NoError(t, err)
require.EqualValues(t, 1, rowsAffected)
require.True(t, deleted)

_, err = q.Read(ctx, queue, 0)
require.ErrorIs(t, err, ErrNoRows)
Expand All @@ -256,7 +301,37 @@ func TestDeleteNotExist(t *testing.T) {
err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

rowsAffected, err := q.Delete(ctx, queue, 100)
deleted, err := q.Delete(ctx, queue, 100)
require.NoError(t, err)
require.False(t, deleted)
}

func TestDeleteBatch(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

ids, err := q.SendBatch(ctx, queue, []map[string]any{testMsg1, testMsg2})
require.NoError(t, err)

deleted, err := q.DeleteBatch(ctx, queue, ids)
require.NoError(t, err)
require.True(t, deleted)

_, err = q.Read(ctx, queue, 0)
require.ErrorIs(t, err, ErrNoRows)
}

func TestDeleteBatchNotExists(t *testing.T) {
ctx := context.Background()
queue := t.Name()

err := q.CreateQueue(ctx, queue)
require.NoError(t, err)

archived, err := q.DeleteBatch(ctx, queue, []int64{100})
require.NoError(t, err)
require.EqualValues(t, 0, rowsAffected)
require.True(t, archived)
}

0 comments on commit e26f210

Please sign in to comment.