Skip to content

Commit

Permalink
feat: ensure all cursors get closed
Browse files Browse the repository at this point in the history
  • Loading branch information
stfsy committed Mar 3, 2024
1 parent 2d45742 commit 55b0fd7
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 19 deletions.
4 changes: 3 additions & 1 deletion lib/one-to-few-ref-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const { createTracer } = require('@discue/open-telemetry-tracing')
const { EQUALS_ANY_OF, EQUALS } = require('./aggregations.js')
const Base = require('./simple-resource-storage.js')
const { name } = require('../package.json')
const { toArrayAndClose } = require('./safe-cursor.js')

/**
* @private
Expand Down Expand Up @@ -163,7 +164,8 @@ module.exports = class extends Base {
EQUALS_ANY_OF(this._resourceName, references)
]

return collection.aggregate(stages).toArray()
const cursor = collection.aggregate(stages)
return toArrayAndClose(cursor)
})
}

Expand Down
12 changes: 5 additions & 7 deletions lib/one-to-few-resource-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const Base = require('./simple-resource-storage.js')
const { Timestamp } = require('mongodb')
const { createTracer } = require('@discue/open-telemetry-tracing')
const { name } = require('../package.json')
const { getFirstAndClose, toArrayAndClose } = require('./safe-cursor.js')

/**
* @private
Expand Down Expand Up @@ -114,12 +115,8 @@ module.exports = class extends Base {
aggregationStages.push(PROJECT(projection))
}

const cursor = await collection.aggregate(aggregationStages)
try {
return await cursor.next()
} finally {
await cursor.close()
}
const cursor = collection.aggregate(aggregationStages)
return getFirstAndClose(cursor)
})
}

Expand Down Expand Up @@ -148,7 +145,8 @@ module.exports = class extends Base {
aggregationStages.push(PROJECT(projection))
}

return collection.aggregate(aggregationStages).toArray()
const cursor = collection.aggregate(aggregationStages)
return toArrayAndClose(cursor)
})
}

Expand Down
8 changes: 5 additions & 3 deletions lib/one-to-many-resource-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const usageEventTrigger = require('./usage-event-trigger.js')
const { createTracer } = require('@discue/open-telemetry-tracing')
const { name } = require('../package.json')
const SpanStatusCode = require('@discue/open-telemetry-tracing/status-codes')
const { toArrayAndClose } = require('./safe-cursor.js')

/**
* @private
Expand Down Expand Up @@ -168,8 +169,8 @@ module.exports = class extends Base {
async _getResourceWithLookup(resourceIds, options = {}) {
const collection = await this._getParentCollection()
const aggregationStages = this._getNestedLookupStages(resourceIds, options)
const result = await collection.aggregate(aggregationStages).toArray()

const cursor = collection.aggregate(aggregationStages)
const result = await toArrayAndClose(cursor)
return result?.at(0) ?? null
}

Expand Down Expand Up @@ -274,7 +275,8 @@ module.exports = class extends Base {

const collection = await this._getParentCollection()
const aggregationStages = this._getNestedLookupStages(resourceIds, { withMetadata, projection })
const result = await collection.aggregate(aggregationStages).toArray()
const cursor = collection.aggregate(aggregationStages)
const result = await toArrayAndClose(cursor)

if (result.length === 0) {
return result
Expand Down
29 changes: 29 additions & 0 deletions lib/safe-cursor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@


/**
*
* @param {import('mongodb').AbstractCursor} cursor
* @returns {Promise}
*/
module.exports.toArrayAndClose = async function (cursor) {
try {
const array = await cursor.toArray()
return array
} finally {
await cursor.close()
}
}

/**
*
* @param {import('mongodb').AbstractCursor} cursor
* @returns {Promise}
*/
module.exports.getFirstAndClose = async function (cursor) {
try {
const first = await cursor.next()
return first
} finally {
await cursor.close()
}
}
18 changes: 10 additions & 8 deletions lib/simple-resource-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const eventTrigger = require('./usage-event-trigger.js')
const { createTracer } = require('@discue/open-telemetry-tracing')
const { name } = require('../package.json')
const { getSingleLookupPipeline, joinAndQueryChildResourcesPipeline } = require('./lookup-pipeline.js')
const { toArrayAndClose, getFirstAndClose } = require('./safe-cursor.js')

/**
* @private
Expand Down Expand Up @@ -259,11 +260,7 @@ module.exports = class {
}

const cursor = collection.aggregate(aggregationStages)
try {
return await cursor.next()
} finally {
await cursor.close()
}
return getFirstAndClose(cursor)
})
}

Expand All @@ -289,7 +286,8 @@ module.exports = class {
aggregationStages.push(PROJECT(projection))
}

return collection.aggregate(aggregationStages).toArray()
const cursor = collection.aggregate(aggregationStages)
return toArrayAndClose(cursor)
})
}

Expand Down Expand Up @@ -325,7 +323,9 @@ module.exports = class {
lookupPipeline.push(...childResourcesPipeline)

const collection = await this._getCollection()
const result = await collection.aggregate(lookupPipeline).toArray()

const cursor = collection.aggregate(lookupPipeline)
const result = await toArrayAndClose(cursor)

if (result.length === 0) {
return result
Expand All @@ -352,7 +352,9 @@ module.exports = class {
async find(aggregations = []) {
return withActiveSpan(`${name}#find-simple-resources`, { resourceName: this._collectionName, databaseName: this._databaseName }, async () => {
const collection = await this._getCollection()
return collection.aggregate(aggregations).toArray()

const cursor = collection.aggregate(aggregations)
return toArrayAndClose(cursor)
})
}

Expand Down

0 comments on commit 55b0fd7

Please sign in to comment.