DocumentSizeObserver infrastructure to allow not reporting upon failures #104859
DocumentParsingObserver documentParsingObserver = request.getNormalisedBytesParsed() != -1L
    ? documentParsingSupplier.forAlreadyParsedInIngest(request.getNormalisedBytesParsed())
    : documentParsingSupplier.getNewObserver();
I could simplify this with just
- DocumentParsingObserver documentParsingObserver = request.getNormalisedBytesParsed() != -1L
-     ? documentParsingSupplier.forAlreadyParsedInIngest(request.getNormalisedBytesParsed())
-     : documentParsingSupplier.getNewObserver();
+ DocumentParsingObserver documentParsingObserver = request.getNormalisedBytesParsed() != -1L
+     ? DocumentParsingObserver.EMPTY_INSTANCE
+     : documentParsingSupplier.getNewObserver();
Then in onComplete I could simply add an if:
DocumentParsingReporter documentParsingReporter = documentParsingSupplier.getDocumentParsingReporter();
IndexRequest request = context.getRequestToExecute();
if (request.getNormalisedBytesParsed() != -1) {
    documentParsingReporter.onCompleted(docWriteRequest.index(), request.getNormalisedBytesParsed());
} else {
    DocumentParsingObserver documentParsingObserver = context.getDocumentParsingObserver();
    documentParsingReporter.onCompleted(docWriteRequest.index(), documentParsingObserver.normalisedBytesParsed());
}
With this I could remove the forAlreadyParsedInIngest method and the FixedDocumentParsingObserver instance.
Whether simplified or not, maybe it would be nice to extract this out into a static method on DocumentParsingObserver (or something like that)? It would give you a handy place to write a javadoc and would encapsulate the -1L magic number.
fair point, better to hide the complexity of dealing with a magic number in a serverless plugin
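A minimal sketch of the static-method idea suggested above, written against a simplified stand-in interface rather than the real Elasticsearch types. `SizeObserverSketch`, `NOT_PARSED`, `isAlreadyParsed` and `forRequest` are hypothetical names chosen for illustration; only the `-1L` sentinel and the `EMPTY_INSTANCE` idea come from the discussion.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for the observer interface discussed in this thread. */
interface SizeObserverSketch {
    /** Sentinel meaning "the document was not parsed in ingest"; named in exactly one place. */
    long NOT_PARSED = -1L;

    /** No-op observer used when the size is already known from the request. */
    SizeObserverSketch EMPTY_INSTANCE = () -> 0L;

    long normalisedBytesParsed();

    /** @return true if the request already carries a size measured during ingest */
    static boolean isAlreadyParsed(long normalisedBytesParsed) {
        return normalisedBytesParsed != NOT_PARSED;
    }

    /**
     * Picks the observer for a request: the no-op instance when the size is
     * already known, otherwise a fresh observer from the given supplier.
     */
    static SizeObserverSketch forRequest(long normalisedBytesParsed, Supplier<SizeObserverSketch> newObserver) {
        return isAlreadyParsed(normalisedBytesParsed) ? EMPTY_INSTANCE : newObserver.get();
    }
}

class SizeObserverSketchDemo {
    public static void main(String[] args) {
        SizeObserverSketch fresh = () -> 42L;
        // Not parsed in ingest: the supplier's observer is used.
        System.out.println(SizeObserverSketch.forRequest(-1L, () -> fresh) == fresh);
        // Already parsed in ingest: the no-op instance is used.
        System.out.println(SizeObserverSketch.forRequest(128L, () -> fresh) == SizeObserverSketch.EMPTY_INSTANCE);
    }
}
```

Callers then never compare against `-1L` directly, and the javadoc on the static method is the single place that explains the sentinel.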
Hi @pgomulka, I've created a changelog YAML for you.
Pinging @elastic/es-core-infra (Team:Core/Infra)
Given the change to need to explicitly serialize the state, and that state being a long (the size of the document), perhaps we should narrow the scope of the observer. We could rename it to DocumentSizeObserver (or something similar), and the resulting size is what is serialized. So the observer would explicitly have the size available after parsing. Then we would not need 2 different interfaces. @pgomulka wdyt?
fair point, I like the explicit size in a name. - updated the PR
The reason for split in |
I suggest that maybe we focus on INDEX only here. I have a change for updates, but it would complicate the PR even more (wiring and changes to UpdateHelper https://github.com/elastic/elasticsearch/pull/105063/files#diff-6a17455034b7c4884ae809d49357d6dfedaedd2d0780a10af30b314ce757546e)
Looks pretty good, I have a few more nits on naming
@@ -167,6 +167,7 @@ static TransportVersion def(int id) {
    public static final TransportVersion DESIRED_NODE_VERSION_OPTIONAL_STRING = def(8_580_00_0);
    public static final TransportVersion ML_INFERENCE_REQUEST_INPUT_TYPE_UNSPECIFIED_ADDED = def(8_581_00_0);
    public static final TransportVersion ASYNC_SEARCH_STATUS_SUPPORTS_KEEP_ALIVE = def(8_582_00_0);
    public static final TransportVersion NORMALISED_BYTES_PARSED = def(8_583_00_0);
Could this be more descriptive? eg INDEX_REQUEST_NORMALIZED_BYTES_PARSED? It's a mouthful, but it gives at least some context as to what the normalized bytes are for.
/**
 * An interface to provide instances of document parsing observer and reporter
 */
public interface DocumentParsingSupplier {
nit: supplier is normally for a singular thing being supplied. We've typically used "provider" (SPI terminology) for something like this which provides many things.
/**
 * @return a new 'empty' observer to use when observing parsing
 */
DocumentSizeObserver getDocumentSizeObserver();
Since new objects are being created (it's not the same instance each call), can we use "new" or "create" prefix instead of "get"?
/**
 * @return an observer to use when continue observing parsing based on previous result
 */
DocumentSizeObserver getDocumentSizeObserver(long normalisedBytesParsed);
nit: Can we call this newFixedDocumentSizeObserver to make clear the size will not change?
  */
- Supplier<DocumentParsingObserver> getDocumentParsingObserverSupplier();
+ DocumentParsingSupplier getDocumentParsingSupplier();
Can we eliminate a level of indirection here by having the plugin interface contain the 3 methods for constructing these classes?
the DocumentParsingSupplier is being used in transport classes/ingestService. It would be odd to pass down a plugin into those classes
@@ -173,6 +180,7 @@ protected int primaryOperationCount(BulkShardRequest request) {
        return request.items().length;
    }

    // TODO PG this is just for testing?
TODO?
I was worried that there is another public method performOnPrimary, but it turns out that it is only used in testing. It is called by an xpack test, hence its scope had to be changed.
I left a few more nits about naming, and it looks like the TransportVersion needs to be updated. Otherwise LGTM.
@@ -204,6 +206,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
        } else {
            requireDataStream = false;
        }
        if (in.getTransportVersion().onOrAfter(INDEX_REQUEST_NORMALIZED_BYTES_PARSED)) {
            normalisedBytesParsed = in.readLong();
any reason not to use vlong?
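For context on this question, here is a generic sketch of a variable-length ("vlong"-style) encoding that emits 7 payload bits per byte. It is not Elasticsearch's `StreamOutput` implementation, and `VarLongSketch` and `encodeVLong` are hypothetical names. It illustrates the trade-off that may be relevant here: small non-negative sizes take fewer bytes than a fixed 8-byte long, while the `-1` sentinel used in this PR, treated as an unsigned bit pattern, takes more.

```java
import java.io.ByteArrayOutputStream;

class VarLongSketch {
    /**
     * Encodes the value 7 bits at a time, low bits first. The high bit of each
     * byte is set when more bytes follow. Negative values are treated as
     * unsigned 64-bit patterns (an assumption of this sketch).
     */
    static byte[] encodeVLong(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println("100   -> " + encodeVLong(100L).length + " byte(s)");
        System.out.println("5000  -> " + encodeVLong(5_000L).length + " byte(s)");
        System.out.println("-1    -> " + encodeVLong(-1L).length + " byte(s)");
        System.out.println("fixed -> " + Long.BYTES + " byte(s)");
    }
}
```

Under this scheme a sentinel of `-1` costs 10 bytes, so a variable-length field would pair more naturally with a non-negative "not set" representation.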
/**
 * An interface to allow performing an action when parsing has been completed and successful
 */
public interface DocumentParsingReporter {
Since this only handles document size, can we make the name parallel to the observer as DocumentSizeReporter?
  */
- public interface DocumentParsingObserverPlugin {
+ public interface DocumentParsingSupplierPlugin {
Since we now call the supplier a provider, can we use the same terminology in the plugin here, ie DocumentParsingProviderPlugin?
/**
 * @return an observer to use when continue observing parsing based on previous result
 */
DocumentSizeObserver newDocumentSizeObserver(long normalisedBytesParsed);
Can we use a more distinctive name from creating a "real" observer above? eg newFixedDocumentSizeObserver, and make it clear in the javadoc this observer does not actually observe anything, it reports the exact size passed in. The javadoc currently implies it would keep increasing the size when passed to a parser.
It is not always a fixedDocumentSizeObserver. If the parsing has not happened in the IngestService, we still want to create a new metering DocumentSizeObserver.
I will keep the method name, but will remove the javadoc wording indicating that it will continue parsing (this was based on the previous idea that we wanted to keep the interfaces in ES independent of the implementation in serverless)
I renamed the 2 methods as per your suggestion there and updated the javadoc
Update requests that send a document (or part of one) should allow metering the size of that document. Update requests that use a script should not be metered; their reported size is 0. This commit follows up on #104859. The update's document is parsed in UpdateHelper, the same pattern we use to meter parsing in IngestService. If a script is used, the observed size will be 0. The observed value is then reported in TransportShardBulkAction, and because the value is 0 or positive, the modified document will not be metered again. This commit also renames getDocumentParsingSupplier to getDocumentParsingProvider (this was accidentally omitted in #104859).
We want to report that observation of document parsing has finished only upon successful indexing.
To achieve this, we need to perform reporting in only one place (not, as previously, in both IngestService and the 'bulk action').
This commit splits the DocumentParsingObserver in two: a DocumentSizeObserver, which wraps an XContentParser and returns the observed state, and a DocumentSizeReporter, which performs an action when parsing has been completed and indexing was successful.
To perform reporting in one place we need to pass the state from IngestService to the 'bulk action'. The state is currently represented as a long, normalisedBytesParsed.
In TransportShardBulkAction we get the normalisedBytesParsed information, and in the serverless plugin we check whether the value indicates that parsing already happened in IngestService (value != -1). If so, we create a DocumentSizeObserver with the fixed normalisedBytesParsed and won't increment it.
When indexing is completed and successful, we report the observed state for an index with DocumentSizeReporter.
Small nit: by passing the documentParsingObserver via SourceToParse we no longer have to inject it through the complex hierarchy for DocumentParser, hence some constructor changes.
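The flow described above could be sketched roughly as follows. This is a self-contained illustration, not the code in this PR: the type names `DocumentSizeObserver` and `DocumentSizeReporter` and the `-1` sentinel come from the description, while `DocumentSizeSketch`, `MeteringObserver`, `FixedObserver`, `observe`, `newObserver` and `indexDocument` are hypothetical, and the real interfaces wrap an XContentParser rather than accepting byte counts directly.

```java
import java.util.HashMap;
import java.util.Map;

class DocumentSizeSketch {
    /** Value meaning "parsing did not happen in ingest" (from the PR description). */
    static final long NOT_PARSED_IN_INGEST = -1L;

    /** Simplified observer: the real one wraps a parser; here it just counts bytes. */
    interface DocumentSizeObserver {
        void observe(long bytes);
        long normalisedBytesParsed();
    }

    /** Called once, and only after indexing succeeded. */
    interface DocumentSizeReporter {
        void onCompleted(String indexName, long normalisedBytesParsed);
    }

    /** Accumulates bytes seen while parsing. */
    static final class MeteringObserver implements DocumentSizeObserver {
        private long bytes;
        public void observe(long b) { bytes += b; }
        public long normalisedBytesParsed() { return bytes; }
    }

    /** Carries the size measured in ingest and never increments it. */
    static final class FixedObserver implements DocumentSizeObserver {
        private final long fixed;
        FixedObserver(long fixed) { this.fixed = fixed; }
        public void observe(long b) { /* intentionally ignored */ }
        public long normalisedBytesParsed() { return fixed; }
    }

    static DocumentSizeObserver newObserver(long normalisedBytesParsed) {
        return normalisedBytesParsed != NOT_PARSED_IN_INGEST
            ? new FixedObserver(normalisedBytesParsed)
            : new MeteringObserver();
    }

    /** Stand-in for the bulk action: parse, index, and report only on success. */
    static void indexDocument(String index, long fromIngest, long docBytes, boolean success, DocumentSizeReporter reporter) {
        DocumentSizeObserver observer = newObserver(fromIngest);
        observer.observe(docBytes); // parsing that happens during indexing
        if (success) {
            reporter.onCompleted(index, observer.normalisedBytesParsed());
        }
    }

    public static void main(String[] args) {
        Map<String, Long> reported = new HashMap<>();
        DocumentSizeReporter reporter = (idx, bytes) -> reported.merge(idx, bytes, Long::sum);

        indexDocument("logs", NOT_PARSED_IN_INGEST, 120L, true, reporter);  // metered here
        indexDocument("logs", 300L, 120L, true, reporter);                  // size fixed by ingest
        indexDocument("logs", NOT_PARSED_IN_INGEST, 50L, false, reporter);  // failure: not reported
        System.out.println(reported);
    }
}
```

Keeping the observer free of side effects and giving the reporter a single call site is what makes "no report on failure" straightforward in this sketch.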