Skip to content

Commit

Permalink
Improve iterator.Interface implementation for pgdb.queryIterator (#…
Browse files Browse the repository at this point in the history
…1592)

Closes #898 .
  • Loading branch information
rumyantseva committed Nov 30, 2022
1 parent 0a63e58 commit 5efc745
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 36 deletions.
3 changes: 2 additions & 1 deletion integration/update_field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func TestUpdateFieldCurrentDate(t *testing.T) {

t.Run("currentDate", func(t *testing.T) {
// maxDifference is a maximum amount of seconds can differ the value in placeholder from actual value
maxDifference := time.Duration(3 * time.Minute)
// TODO Make duration lower https://github.com/FerretDB/FerretDB/issues/1347
maxDifference := time.Duration(4 * time.Minute)

now := primitive.NewDateTimeFromTime(time.Now().UTC())
nowTimestamp := primitive.Timestamp{T: uint32(time.Now().UTC().Unix()), I: uint32(0)}
Expand Down
5 changes: 0 additions & 5 deletions internal/handlers/pg/msg_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,6 @@ func (h *Handler) execDelete(ctx context.Context, sp *pgdb.SQLParam, filter *typ
return err
}

if it == nil {
// no documents found
return nil
}

defer it.Close()

resDocs := make([]*types.Document, 0, 16)
Expand Down
4 changes: 0 additions & 4 deletions internal/handlers/pg/msg_find.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ func (h *Handler) fetchAndFilterDocs(ctx context.Context, tx pgx.Tx, sqlParam *p
return nil, err
}

if it == nil {
return resDocs, nil
}

defer it.Close()

for {
Expand Down
4 changes: 2 additions & 2 deletions internal/handlers/pg/pgdb/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ type SQLParam struct {
}

// GetDocuments returns an queryIterator to fetch documents for given SQLParams.
// If the collection doesn't exist, it returns nil and no error.
// If the collection doesn't exist, it returns an empty iterator and no error.
// If an error occurs, it returns nil and that error, possibly wrapped.
func (pgPool *Pool) GetDocuments(ctx context.Context, tx pgx.Tx, sp *SQLParam) (
iterator.Interface[uint32, *types.Document], error,
) {
q, args, err := buildQuery(ctx, tx, sp)
if err != nil {
if errors.Is(err, ErrTableNotExist) {
return nil, nil
return newIterator(ctx, nil), nil
}

return nil, lazyerrors.Error(err)
Expand Down
37 changes: 14 additions & 23 deletions internal/handlers/pg/pgdb/query_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,26 @@ import (
type queryIterator struct {
ctx context.Context
rows pgx.Rows
mxClosed *sync.Mutex // protects closed
closed bool // indicates whether Close() was called
closeOnce sync.Once
currentIter atomic.Uint32
}

// newIterator returns a new queryIterator for the given pgx.Rows.
// It sets finalizer to close the rows.
func newIterator(ctx context.Context, rows pgx.Rows) iterator.Interface[uint32, *types.Document] {
// queryIterator is defined as pointer to address it in the finalizer.
qi := &queryIterator{
ctx: ctx,
rows: rows,
closed: false,
mxClosed: new(sync.Mutex),
it := &queryIterator{
ctx: ctx,
rows: rows,
}

runtime.SetFinalizer(qi, func(qi *queryIterator) {
qi.mxClosed.Lock()
defer qi.mxClosed.Unlock()
if !qi.closed {
runtime.SetFinalizer(it, func(it *queryIterator) {
it.closeOnce.Do(func() {
panic("queryIterator.Close() has not been called")
}
})
})

return qi
return it
}

// Next implements iterator.Interface.
Expand All @@ -70,7 +65,7 @@ func (it *queryIterator) Next() (uint32, *types.Document, error) {
return 0, nil, err
}

if !it.rows.Next() {
if it.rows == nil || !it.rows.Next() {
return 0, nil, iterator.ErrIteratorDone
}

Expand All @@ -91,13 +86,9 @@ func (it *queryIterator) Next() (uint32, *types.Document, error) {

// Close implements iterator.Interface.
func (it *queryIterator) Close() {
it.mxClosed.Lock()
defer it.mxClosed.Unlock()

if it.closed {
return
}

it.rows.Close()
it.closed = true
it.closeOnce.Do(func() {
if it.rows != nil {
it.rows.Close()
}
})
}
8 changes: 7 additions & 1 deletion internal/handlers/pg/pgdb/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,14 @@ func TestGetDocuments(t *testing.T) {
sp := &SQLParam{DB: databaseName, Collection: collectionName + "-non-existent"}
it, err := pool.GetDocuments(ctx, tx, sp)
require.NoError(t, err)
require.Nil(t, it)
require.NotNil(t, it)

iter, doc, err := it.Next()
assert.Equal(t, iterator.ErrIteratorDone, err)
assert.Equal(t, uint32(0), iter)
assert.Nil(t, doc)

it.Close()
require.NoError(t, tx.Commit(ctx))
})
}

1 comment on commit 5efc745

@vercel
Copy link

@vercel vercel bot commented on 5efc745 Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

ferret-db – ./

ferret-db.vercel.app
ferret-db-ferretdb.vercel.app
ferret-db-git-main-ferretdb.vercel.app

Please sign in to comment.