Skip to content

Commit

Permalink
[ML][Transforms] adjusting stats.progress for cont. transforms (#45361)
Browse files Browse the repository at this point in the history
* [ML][Transforms] adjusting stats.progress for cont. transforms

* addressing PR comments

* rename fix
  • Loading branch information
benwtrent committed Aug 14, 2019
1 parent c980043 commit 5fb9081
Show file tree
Hide file tree
Showing 27 changed files with 706 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ public abstract class IndexerJobStats {
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");

private final long numPages;
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;
private final long indexTime;
private final long indexTotal;
private final long searchTime;
private final long searchTotal;
private final long indexFailures;
private final long searchFailures;
protected final long numPages;
protected final long numInputDocuments;
protected final long numOuputDocuments;
protected final long numInvocations;
protected final long indexTime;
protected final long indexTotal;
protected final long searchTime;
protected final long searchTotal;
protected final long indexFailures;
protected final long searchFailures;

public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,28 @@
package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.client.core.IndexerJobStats;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameIndexerTransformStats extends IndexerJobStats {

static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms");
static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = new ParseField("exponential_avg_documents_indexed");
static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = new ParseField("exponential_avg_documents_processed");

public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
NAME, true, args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
NAME,
true,
args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(Double) args[10], (Double) args[11], (Double) args[12]));

static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
Expand All @@ -44,16 +54,74 @@ public class DataFrameIndexerTransformStats extends IndexerJobStats {
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_INDEXED);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_DOCUMENTS_PROCESSED);
}

public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) throws IOException {
return LENIENT_PARSER.parse(parser, null);
}

private final double expAvgCheckpointDurationMs;
private final double expAvgDocumentsIndexed;
private final double expAvgDocumentsProcessed;

public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOuputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
long indexTotal, long searchTotal, long indexFailures, long searchFailures,
Double expAvgCheckpointDurationMs, Double expAvgDocumentsIndexed,
Double expAvgDocumentsProcessed) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
indexTotal, searchTotal, indexFailures, searchFailures);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
}

public double getExpAvgCheckpointDurationMs() {
return expAvgCheckpointDurationMs;
}

public double getExpAvgDocumentsIndexed() {
return expAvgDocumentsIndexed;
}

public double getExpAvgDocumentsProcessed() {
return expAvgDocumentsProcessed;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

DataFrameIndexerTransformStats that = (DataFrameIndexerTransformStats) other;

return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
&& Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
&& Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,40 @@

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.client.dataframe.transforms.util.TimeUtil;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.time.Instant;
import java.util.Objects;

public class DataFrameTransformCheckpointingInfo {

public static final ParseField LAST_CHECKPOINT = new ParseField("last", "current");
public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress");
public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind");
public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at");

private final DataFrameTransformCheckpointStats last;
private final DataFrameTransformCheckpointStats next;
private final long operationsBehind;
private final Instant changesLastDetectedAt;

private static final ConstructingObjectParser<DataFrameTransformCheckpointingInfo, Void> LENIENT_PARSER =
new ConstructingObjectParser<>(
"data_frame_transform_checkpointing_info", true, a -> {
"data_frame_transform_checkpointing_info",
true,
a -> {
long behind = a[2] == null ? 0L : (Long) a[2];

Instant changesLastDetectedAt = (Instant)a[3];
return new DataFrameTransformCheckpointingInfo(
a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0],
a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1], behind);
a[0] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[0],
a[1] == null ? DataFrameTransformCheckpointStats.EMPTY : (DataFrameTransformCheckpointStats) a[1],
behind,
changesLastDetectedAt);
});

static {
Expand All @@ -51,13 +61,20 @@ public class DataFrameTransformCheckpointingInfo {
LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataFrameTransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT);
LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND);
LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()),
CHANGES_LAST_DETECTED_AT,
ObjectParser.ValueType.VALUE);
}

public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last, DataFrameTransformCheckpointStats next,
long operationsBehind) {
public DataFrameTransformCheckpointingInfo(DataFrameTransformCheckpointStats last,
DataFrameTransformCheckpointStats next,
long operationsBehind,
Instant changesLastDetectedAt) {
this.last = Objects.requireNonNull(last);
this.next = Objects.requireNonNull(next);
this.operationsBehind = operationsBehind;
this.changesLastDetectedAt = changesLastDetectedAt;
}

public DataFrameTransformCheckpointStats getLast() {
Expand All @@ -72,13 +89,18 @@ public long getOperationsBehind() {
return operationsBehind;
}

@Nullable
public Instant getChangesLastDetectedAt() {
return changesLastDetectedAt;
}

public static DataFrameTransformCheckpointingInfo fromXContent(XContentParser p) {
return LENIENT_PARSER.apply(p, null);
}

@Override
public int hashCode() {
return Objects.hash(last, next, operationsBehind);
return Objects.hash(last, next, operationsBehind, changesLastDetectedAt);
}

@Override
Expand All @@ -94,8 +116,9 @@ public boolean equals(Object other) {
DataFrameTransformCheckpointingInfo that = (DataFrameTransformCheckpointingInfo) other;

return Objects.equals(this.last, that.last) &&
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind;
Objects.equals(this.next, that.next) &&
this.operationsBehind == that.operationsBehind &&
Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,81 @@

package org.elasticsearch.client.dataframe.transforms;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DataFrameTransformProgress {

public static final ParseField TOTAL_DOCS = new ParseField("total_docs");
public static final ParseField DOCS_REMAINING = new ParseField("docs_remaining");
public static final ParseField PERCENT_COMPLETE = new ParseField("percent_complete");
public static final ParseField DOCS_PROCESSED = new ParseField("docs_processed");
public static final ParseField DOCS_INDEXED = new ParseField("docs_indexed");

public static final ConstructingObjectParser<DataFrameTransformProgress, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_transform_progress",
true,
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2]));
a -> new DataFrameTransformProgress((Long) a[0], (Long)a[1], (Double)a[2], (Long)a[3], (Long)a[4]));

static {
PARSER.declareLong(constructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), TOTAL_DOCS);
PARSER.declareLong(optionalConstructorArg(), DOCS_REMAINING);
PARSER.declareDouble(optionalConstructorArg(), PERCENT_COMPLETE);
PARSER.declareLong(optionalConstructorArg(), DOCS_PROCESSED);
PARSER.declareLong(optionalConstructorArg(), DOCS_INDEXED);
}

public static DataFrameTransformProgress fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final long totalDocs;
private final long remainingDocs;
private final double percentComplete;

public DataFrameTransformProgress(long totalDocs, Long remainingDocs, double percentComplete) {
private final Long totalDocs;
private final Long remainingDocs;
private final Double percentComplete;
private final long documentsProcessed;
private final long documentsIndexed;

public DataFrameTransformProgress(Long totalDocs,
Long remainingDocs,
Double percentComplete,
Long documentsProcessed,
Long documentsIndexed) {
this.totalDocs = totalDocs;
this.remainingDocs = remainingDocs == null ? totalDocs : remainingDocs;
this.percentComplete = percentComplete;
this.documentsProcessed = documentsProcessed == null ? 0 : documentsProcessed;
this.documentsIndexed = documentsIndexed == null ? 0 : documentsIndexed;
}

public double getPercentComplete() {
@Nullable
public Double getPercentComplete() {
return percentComplete;
}

public long getTotalDocs() {
@Nullable
public Long getTotalDocs() {
return totalDocs;
}

public long getRemainingDocs() {
@Nullable
public Long getRemainingDocs() {
return remainingDocs;
}

public long getDocumentsProcessed() {
return documentsProcessed;
}

public long getDocumentsIndexed() {
return documentsIndexed;
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand All @@ -84,11 +107,13 @@ public boolean equals(Object other) {
DataFrameTransformProgress that = (DataFrameTransformProgress) other;
return Objects.equals(this.remainingDocs, that.remainingDocs)
&& Objects.equals(this.totalDocs, that.totalDocs)
&& Objects.equals(this.percentComplete, that.percentComplete);
&& Objects.equals(this.percentComplete, that.percentComplete)
&& Objects.equals(this.documentsIndexed, that.documentsIndexed)
&& Objects.equals(this.documentsProcessed, that.documentsProcessed);
}

@Override
public int hashCode(){
return Objects.hash(remainingDocs, totalDocs, percentComplete);
return Objects.hash(remainingDocs, totalDocs, percentComplete, documentsIndexed, documentsProcessed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,20 @@ public void testGetStats() throws Exception {
DataFrameTransformStats stats = statsResponse.getTransformsStats().get(0);
assertEquals(DataFrameTransformStats.State.STOPPED, stats.getState());

DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0L,
0.0,
0.0,
0.0);
assertEquals(zeroIndexerStats, stats.getIndexerStats());

// start the transform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public void testFromXContent() throws IOException {
public static DataFrameIndexerTransformStats randomStats() {
return new DataFrameIndexerTransformStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
}

public static void toXContent(DataFrameIndexerTransformStats stats, XContentBuilder builder) throws IOException {
Expand All @@ -57,6 +60,12 @@ public static void toXContent(DataFrameIndexerTransformStats stats, XContentBuil
builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs());
builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
stats.getExpAvgDocumentsIndexed());
builder.field(DataFrameIndexerTransformStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(),
stats.getExpAvgDocumentsProcessed());
builder.endObject();
}
}

0 comments on commit 5fb9081

Please sign in to comment.