
OAK-10453 - Pipelined strategy: enforce size limit on memory taken by objects in the queue between download and transform thread #1130

Merged
tihom88 merged 27 commits into apache:trunk from nfsantos:OAK-10453
Oct 5, 2023

Conversation

@nfsantos
Contributor

@nfsantos nfsantos commented Sep 22, 2023

In the Pipelined strategy, the queue between the mongo-dump and the transform stage keeps a list of documents downloaded from Mongo. In the current implementation, this queue is bounded by a limit on the maximum number of documents that it can hold. However, this proved to be fragile because Mongo documents can vary widely in size: if the queue size is tuned for small documents, it will consume too much memory when the documents are large.

This PR changes the way this queue is bounded to take into account the memory used by the Mongo documents in the queue. It introduces the following configuration properties:

  • oak.indexer.pipelined.mongoDocQueueReservedMemoryMB (Default: 128) - How much memory is reserved for the queue between the download and transform stages.
  • oak.indexer.pipelined.mongoDocBatchSizeMB (Default: 4) - The approximate maximum size of a batch of documents.
  • oak.indexer.pipelined.mongoDocBatchMaxNumberOfDocuments (Default: 10000) - The maximum number of documents in a batch.

A batch of Mongo documents is added to the queue as soon as it reaches either the maximum memory usage or the maximum number of documents.
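The batching rule above can be sketched as follows. This is an illustrative stand-in, not the actual Oak class; the names, types, and the hard-coded limits (taken from the defaults listed above) are assumptions for the sketch:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: accumulate documents into a batch and emit the batch to the queue
// when either the memory limit or the document-count limit is reached.
public class DocBatcher {
    static final long MAX_BATCH_BYTES = 4L * 1024 * 1024; // mongoDocBatchSizeMB = 4
    static final int MAX_BATCH_DOCS = 10_000;             // mongoDocBatchMaxNumberOfDocuments

    private final List<Object> docs = new ArrayList<>();
    private long batchBytes = 0;

    /** Returns the completed batch if adding this document filled it, otherwise null. */
    public List<Object> add(Object doc, long estimatedSizeBytes) {
        docs.add(doc);
        batchBytes += estimatedSizeBytes;
        if (batchBytes >= MAX_BATCH_BYTES || docs.size() >= MAX_BATCH_DOCS) {
            List<Object> full = new ArrayList<>(docs);
            docs.clear();
            batchBytes = 0;
            return full;
        }
        return null;
    }
}
```

Bounding by estimated bytes rather than by document count is what makes the queue's memory footprint predictable regardless of document size.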

Estimation of the size of a Mongo document

Estimating the size of the in-memory representation of a Mongo document can be done by traversing the document structure, but this would be too slow to do on the download thread, as it requires a series of if branches with type tests for every field of every document.

The PR takes a different approach, by taking advantage of the support for custom codecs in the Mongo driver.

Before this PR, we relied on the default codec of the Mongo driver, which parsed the BSON responses received from the server into a series of BasicDBObjects. These objects were then passed to the transform thread, which created a NodeDocument object (Oak API) and, from it, a series of NodeStateEntry instances.

This PR adds a new custom codec that parses the BSON stream directly to a NodeDocument, bypassing the creation of BasicDBObjects. This reduces the amount of work that the transform thread has to do, since it avoids the conversion from BasicDBObjects to NodeDocument.

Additionally, the custom codec has logic to estimate the size of the NodeDocument that it creates from the stream. This additional calculation is very cheap, because it is done at the moment when the size of each field is readily available.
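A rough model of this incremental accounting, decoupled from the driver: as each field is decoded, a running estimate is updated from sizes already at hand, so no second traversal is needed. The per-type costs below are illustrative heuristics, not Oak's actual numbers:

```java
// Sketch: accumulate an in-memory size estimate while fields are decoded,
// instead of traversing the finished document afterwards.
public class SizeEstimator {
    private long estimate = 0;

    public void onString(String name, String value) {
        // rough cost: object headers plus ~2 bytes per char for name and value
        estimate += 32 + 2L * name.length() + 2L * value.length();
    }

    public void onLong(String name) {
        estimate += 16 + 2L * name.length(); // boxed long plus field name
    }

    public long estimatedBytes() {
        return estimate;
    }
}
```

In the PR, this bookkeeping lives inside the custom codec's decode path, so the estimate is produced as a side effect of parsing the BSON stream.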

This change seems to have a neutral to slightly positive effect on the speed of the download thread.

Additional changes:

  • Adds a hook to receive events from the Mongo Java driver for commands sent and received, with a listener that logs at TRACE level the commands issued by the mongo-dump thread. This is useful to get metrics on the time that this thread spends waiting for Mongo and processing requests.
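The driver's hook for this is com.mongodb.event.CommandListener, whose callbacks receive CommandStartedEvent/CommandSucceededEvent/CommandFailedEvent objects. The sketch below shows the timing logic only; the small Started class is a self-contained stand-in for the driver's event types, and the names are assumptions, not the PR's actual listener:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: correlate command-started and command-succeeded events by request id
// to measure how long each Mongo command took.
public class CommandTimer {
    /** Stand-in for the driver's CommandStartedEvent. */
    public static class Started {
        final int requestId; final String commandName; final long nanos;
        public Started(int requestId, String commandName, long nanos) {
            this.requestId = requestId; this.commandName = commandName; this.nanos = nanos;
        }
    }

    private final Map<Integer, Started> inFlight = new ConcurrentHashMap<>();

    public void commandStarted(Started e) {
        inFlight.put(e.requestId, e);
    }

    /** Returns elapsed nanoseconds for the matching started event, or -1 if unknown. */
    public long commandSucceeded(int requestId, long nowNanos) {
        Started s = inFlight.remove(requestId);
        return s == null ? -1 : nowNanos - s.nanos;
    }
}
```

In the real listener the elapsed time would be written to the TRACE log rather than returned.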

@nfsantos nfsantos marked this pull request as ready for review September 25, 2023 06:35
Contributor

@fabriziofortino fabriziofortino left a comment


LGTM

@tihom88
Contributor

tihom88 commented Oct 5, 2023

In this approach we are adding some processing to the mongo download thread, which is the main bottleneck. Would it make sense to do these transformations on the download task thread?

@tihom88 tihom88 merged commit 0b6e538 into apache:trunk Oct 5, 2023
@nfsantos
Contributor Author

nfsantos commented Oct 5, 2023

In this approach we are adding some processing to the mongo download thread, which is the main bottleneck. Would it make sense to do these transformations on the download task thread?

The Mongo driver uses the thread that calls the Mongo iterator to perform the request to the server and to deserialize the response, so it is the mongo-dump thread that does all this work; there is no separate mongo download thread here.

Before this PR, the download and transform threads would do this work, where the BSON stream is provided by Mongo to the codec:

  • mongo-dump - BSON stream -> BasicDBObject (OOTB codec)
  • transform thread - BasicDBObject -> NodeDocument -> NodeStateEntry -> serialize to buffers

Now:

  • mongo-dump - BSON stream -> NodeDocument (custom codec)
  • transform thread - NodeDocument -> NodeStateEntry -> serialize to buffers

And the transformation BSON stream -> NodeDocument takes the same or less time than the conversion to BasicDBObject.

Overall, there is less work on the transform thread, while the mongo-dump thread does the same work as before. I did not find a way to pull the BSON stream -> X conversion out of the mongo-dump thread; this is controlled by the Mongo Java driver.

@nfsantos nfsantos deleted the OAK-10453 branch October 5, 2023 06:40
