Skip to content

Commit

Permalink
[Rollup] Add more diagnostic stats to job (#35471)
Browse files Browse the repository at this point in the history
This adds some new statistics to the job to help with debugging
performance issues:

- Total search and index time (in milliseconds) encountered by the indexer
during runtime.  This time is the total service time including
transfer between nodes, not just the `took` time.

- Total count of search and index requests.  Together with the total
times, this can be used to determine average request time.

- Count of search/bulk failures encountered during
runtime.  This information is also in the log, but a runtime counter
will help expose problems faster
  • Loading branch information
polyfractal committed Nov 27, 2018
1 parent f2deb9a commit 6f2b2d9
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 39 deletions.
Expand Up @@ -26,15 +26,15 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Response from rollup's get jobs api.
Expand All @@ -51,6 +51,12 @@ public class GetRollupJobResponse {
static final ParseField STATE = new ParseField("job_state");
static final ParseField CURRENT_POSITION = new ParseField("current_position");
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
static final ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
static final ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
static final ParseField INDEX_TOTAL = new ParseField("index_total");
static final ParseField SEARCH_TOTAL = new ParseField("search_total");
static final ParseField SEARCH_FAILURES = new ParseField("search_failures");
static final ParseField INDEX_FAILURES = new ParseField("index_failures");

private List<JobWrapper> jobs;

Expand Down Expand Up @@ -181,12 +187,25 @@ public static class RollupIndexerJobStats {
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
private long indexTime;
private long indexTotal;
private long searchTime;
private long searchTotal;
private long indexFailures;
private long searchFailures;

/**
 * Package-private constructor used by the parser and tests.
 *
 * @param numPages          number of pages (search/bulk round trips) the indexer has processed
 * @param numInputDocuments number of documents read from the source index
 * @param numOuputDocuments number of rollup documents written (field name keeps the existing typo
 *                          "Ouput" for wire/parse compatibility — do not rename casually)
 * @param numInvocations    number of times the job trigger has fired
 * @param indexTime         cumulative bulk-indexing service time in milliseconds
 * @param indexTotal        total count of bulk requests sent (not documents indexed)
 * @param searchTime        cumulative search service time in milliseconds
 * @param searchTotal       total count of search requests sent
 * @param indexFailures     count of bulk-indexing failures encountered at runtime
 * @param searchFailures    count of search failures encountered at runtime
 */
RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
this.indexTime = indexTime;
this.indexTotal = indexTotal;
this.searchTime = searchTime;
this.searchTotal = searchTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}

/**
Expand Down Expand Up @@ -217,15 +236,65 @@ public long getOutputDocuments() {
return numOuputDocuments;
}

/**
 * Returns the number of failures encountered during the bulk indexing phase
 * of the rollup job (cumulative over the job's runtime).
 */
public long getIndexFailures() {
return indexFailures;
}

/**
 * Returns the number of failures encountered during the search phase
 * of the rollup job (cumulative over the job's runtime).
 */
public long getSearchFailures() {
return searchFailures;
}

/**
 * Returns the cumulative time spent indexing, in milliseconds. This is total
 * service time (including transfer between nodes), not just the {@code took} time.
 */
public long getIndexTime() {
return indexTime;
}

/**
 * Returns the cumulative time spent searching, in milliseconds. This is total
 * service time (including transfer between nodes), not just the {@code took} time.
 */
public long getSearchTime() {
return searchTime;
}

/**
 * Returns the total number of indexing requests that have been sent by the rollup job.
 * Note: this counts bulk requests, not the number of _documents_ that have been indexed;
 * combined with {@link #getIndexTime()} it yields an average request time.
 */
public long getIndexTotal() {
return indexTotal;
}

/**
 * Returns the total number of search requests that have been sent by the rollup job;
 * combined with {@link #getSearchTime()} it yields an average request time.
 */
public long getSearchTotal() {
return searchTotal;
}

private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
STATS.getPreferredName(),
true,
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

@Override
Expand All @@ -234,22 +303,35 @@ public boolean equals(Object other) {
if (other == null || getClass() != other.getClass()) return false;
RollupIndexerJobStats that = (RollupIndexerJobStats) other;
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.indexTotal, that.indexTotal);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
}

@Override
public final String toString() {
return "{pages=" + numPages
+ ", input_docs=" + numInputDocuments
+ ", output_docs=" + numOuputDocuments
+ ", invocations=" + numInvocations + "}";
+ ", invocations=" + numInvocations
+ ", index_failures=" + indexFailures
+ ", search_failures=" + searchFailures
+ ", index_time_in_ms=" + indexTime
+ ", index_total=" + indexTotal
+ ", search_time_in_ms=" + searchTime
+ ", search_total=" + searchTotal+ "}";
}
}

Expand Down
Expand Up @@ -62,7 +62,9 @@ private GetRollupJobResponse createTestInstance() {
}

private RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomLong(), randomLong(), randomLong(), randomLong());
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}

private RollupJobStatus randomStatus() {
Expand Down Expand Up @@ -115,6 +117,13 @@ public void toXContent(RollupIndexerJobStats stats, XContentBuilder builder, ToX
builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
builder.field(GetRollupJobResponse.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
builder.field(GetRollupJobResponse.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
builder.field(GetRollupJobResponse.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
builder.field(GetRollupJobResponse.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(GetRollupJobResponse.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(GetRollupJobResponse.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.endObject();
}

}
24 changes: 21 additions & 3 deletions docs/reference/rollup/apis/get-job.asciidoc
Expand Up @@ -101,7 +101,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
}
]
Expand Down Expand Up @@ -221,7 +227,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
},
{
Expand Down Expand Up @@ -270,7 +282,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
}
]
Expand Down
Expand Up @@ -153,9 +153,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
try {
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)));
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} catch (Exception e) {
finishWithFailure(e);
finishWithSearchFailure(e);
}
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
Expand Down Expand Up @@ -256,7 +257,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
*/
protected abstract void onAbort();

private void finishWithFailure(Exception exc) {
/**
 * Terminal handler for a failure in the search phase: bumps the search-failure
 * counter in the stats, then saves state and reports the failure via
 * {@code onFailure(exc)} once the save completes.
 */
private void finishWithSearchFailure(Exception exc) {
stats.incrementSearchFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
}

/**
 * Terminal handler for a failure in the bulk-indexing phase: bumps the
 * indexing-failure counter in the stats, then saves state and reports the
 * failure via {@code onFailure(exc)} once the save completes.
 */
private void finishWithIndexingFailure(Exception exc) {
stats.incrementIndexingFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
}

Expand Down Expand Up @@ -291,6 +298,7 @@ private IndexerState finishAndSetState() {
}

private void onSearchResponse(SearchResponse searchResponse) {
stats.markEndSearch();
try {
if (checkState(getState()) == false) {
return;
Expand Down Expand Up @@ -320,6 +328,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
// TODO this might be a valid case, e.g. if implementation filters
assert bulkRequest.requests().size() > 0;

stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
Expand All @@ -335,24 +344,24 @@ private void onSearchResponse(SearchResponse searchResponse) {
position.set(newPosition);

onBulkResponse(bulkResponse, newPosition);
}, exc -> finishWithFailure(exc)));
}, this::finishWithIndexingFailure));
} catch (Exception e) {
finishWithFailure(e);
finishWithSearchFailure(e);
}
}

private void onBulkResponse(BulkResponse response, JobPosition position) {
stats.markEndIndexing();
try {

ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
// TODO probably something more intelligent than every-50 is needed
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
} else {
doNextSearch(buildSearchRequest(), listener);
}
} catch (Exception e) {
finishWithFailure(e);
finishWithIndexingFailure(e);
}
}

Expand Down

0 comments on commit 6f2b2d9

Please sign in to comment.