Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9228000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
aggregate_metric_double_typed_block,9227000
esql_timestamps_info,9228000
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public void testSortByManyLongsGiantTopN() throws IOException {
.entry("values", List.of(List.of(9)))
.entry("documents_found", greaterThan(0))
.entry("values_loaded", greaterThan(0))
.entry("completion_time_in_millis", greaterThan(0L))
.entry("expiration_time_in_millis", greaterThan(0L))
.entry("start_time_in_millis", greaterThan(0L))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2767,13 +2767,20 @@ protected static MapMatcher getProfileMatcher() {
.entry("plans", instanceOf(List.class));
}

protected static MapMatcher getResultMatcher(boolean includePartial, boolean includeDocumentsFound) {
protected static MapMatcher getResultMatcher(boolean includePartial, boolean includeDocumentsFound, boolean includeTimestamps) {
MapMatcher mapMatcher = matchesMap();
if (includeDocumentsFound) {
// Older versions may not return documents_found and values_loaded.
mapMatcher = mapMatcher.entry("documents_found", greaterThanOrEqualTo(0));
mapMatcher = mapMatcher.entry("values_loaded", greaterThanOrEqualTo(0));
}
if (includeTimestamps) {
// Older versions may not return start_time_in_millis, completion_time_in_millis and expiration_time_in_millis
mapMatcher = mapMatcher.entry("start_time_in_millis", greaterThanOrEqualTo(0L));
mapMatcher = mapMatcher.entry("completion_time_in_millis", greaterThanOrEqualTo(0L));
mapMatcher = mapMatcher.entry("expiration_time_in_millis", greaterThanOrEqualTo(0L));
}

mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
// Older version may not have is_partial
if (includePartial) {
Expand All @@ -2786,7 +2793,11 @@ protected static MapMatcher getResultMatcher(boolean includePartial, boolean inc
* Create empty result matcher from result, taking into account all metadata items.
*/
protected static MapMatcher getResultMatcher(Map<String, Object> result) {
return getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found"));
return getResultMatcher(
result.containsKey("is_partial"),
result.containsKey("documents_found"),
result.containsKey("start_time_in_millis")
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,11 @@ private boolean capabilitiesSupportedNewAndOld(List<String> requiredCapabilities
}

private void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, Map<String, Object> expectedResult) {
MapMatcher mapMatcher = getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found")).extraOk();
MapMatcher mapMatcher = getResultMatcher(
result.containsKey("is_partial"),
result.containsKey("documents_found"),
result.containsKey("start_time_in_millis")
).extraOk();
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ private boolean capabilitiesSupportedNewAndOld(List<String> requiredCapabilities
}

private <C, V> void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, C columns, V values, boolean remoteOnly) {
MapMatcher mapMatcher = getResultMatcher(result.containsKey("is_partial"), result.containsKey("documents_found")).extraOk();
MapMatcher mapMatcher = getResultMatcher(
result.containsKey("is_partial"),
result.containsKey("documents_found"),
result.containsKey("start_time_in_millis")
).extraOk();
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
Expand Down Expand Up @@ -523,7 +527,11 @@ public void testLookupJoinAliasesSkipOld() throws IOException {
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(localDocs.size()));

MapMatcher mapMatcher = getResultMatcher(false, result.containsKey("documents_found")).extraOk();
MapMatcher mapMatcher = getResultMatcher(
false,
result.containsKey("documents_found"),
result.containsKey("start_time_in_millis")
).extraOk();
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
mapMatcher = mapMatcher.entry("is_partial", true);
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -288,7 +289,7 @@ public static RequestObjectBuilder jsonBuilder() throws IOException {

public void testGetAnswer() throws IOException {
Map<String, Object> answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2"));
assertEquals(6, answer.size());
assertEquals(9, answer.size());
assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0));
Map<String, String> colA = Map.of("name", "a", "type", "integer");
Map<String, String> colB = Map.of("name", "b", "type", "integer");
Expand All @@ -300,6 +301,9 @@ public void testGetAnswer() throws IOException {
.entry("values_loaded", 0)
.entry("columns", List.of(colA, colB))
.entry("values", List.of(List.of(1, 2)))
.entry("completion_time_in_millis", greaterThan(0L))
.entry("expiration_time_in_millis", greaterThan(0L))
.entry("start_time_in_millis", greaterThan(0L))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
"esql_documents_found_and_values_loaded"
);
private static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = TransportVersion.fromName("esql_profile_include_plan");
private static final TransportVersion ESQL_TIMESTAMPS_INFO = TransportVersion.fromName("esql_timestamps_info");

public static final String DROP_NULL_COLUMNS_OPTION = "drop_null_columns";

Expand All @@ -63,6 +64,9 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
private final boolean isAsync;
private final EsqlExecutionInfo executionInfo;

private final long startTimeMillis;
private final long expirationTimeMillis;

public EsqlQueryResponse(
List<ColumnInfoImpl> columns,
List<Page> pages,
Expand All @@ -73,6 +77,8 @@ public EsqlQueryResponse(
@Nullable String asyncExecutionId,
boolean isRunning,
boolean isAsync,
long startTimeMillis,
long expirationTimeMillis,
EsqlExecutionInfo executionInfo
) {
this.columns = columns;
Expand All @@ -84,6 +90,8 @@ public EsqlQueryResponse(
this.asyncExecutionId = asyncExecutionId;
this.isRunning = isRunning;
this.isAsync = isAsync;
this.startTimeMillis = startTimeMillis;
this.expirationTimeMillis = expirationTimeMillis;
this.executionInfo = executionInfo;
}

Expand All @@ -95,9 +103,24 @@ public EsqlQueryResponse(
@Nullable Profile profile,
boolean columnar,
boolean isAsync,
long startTimeMillis,
long expirationTimeMillis,
EsqlExecutionInfo executionInfo
) {
this(columns, pages, documentsFound, valuesLoaded, profile, columnar, null, false, isAsync, executionInfo);
this(
columns,
pages,
documentsFound,
valuesLoaded,
profile,
columnar,
null,
false,
isAsync,
startTimeMillis,
expirationTimeMillis,
executionInfo
);
}

/**
Expand All @@ -121,6 +144,14 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
Profile profile = in.readOptionalWriteable(Profile::readFrom);
boolean columnar = in.readBoolean();

long startTimeMillis = 0L;
long expirationTimeMillis = 0L;
if (in.getTransportVersion().supports(ESQL_TIMESTAMPS_INFO)) {
startTimeMillis = in.readLong();
expirationTimeMillis = in.readLong();
}

EsqlExecutionInfo executionInfo = in.readOptionalWriteable(EsqlExecutionInfo::new);
return new EsqlQueryResponse(
columns,
Expand All @@ -132,6 +163,8 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
asyncExecutionId,
isRunning,
isAsync,
startTimeMillis,
expirationTimeMillis,
executionInfo
);
}
Expand All @@ -149,6 +182,12 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeOptionalWriteable(profile);
out.writeBoolean(columnar);

if (out.getTransportVersion().supports(ESQL_TIMESTAMPS_INFO)) {
out.writeLong(startTimeMillis);
out.writeLong(expirationTimeMillis);
}

out.writeOptionalWriteable(executionInfo);
}

Expand Down Expand Up @@ -237,21 +276,36 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
}));
}
if (executionInfo != null && executionInfo.overallTook() != null) {
content.add(
ChunkedToXContentHelper.chunk(
(builder, p) -> builder //
.field("took", executionInfo.overallTook().millis())
.field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial())
)
);
content.add(ChunkedToXContentHelper.chunk((builder, p) -> {
builder //
.field("took", executionInfo.overallTook().millis())
.field(EsqlExecutionInfo.IS_PARTIAL_FIELD.getPreferredName(), executionInfo.isPartial());

if (startTimeMillis != 0L) {
builder.timestampFieldsFromUnixEpochMillis(
"completion_time_in_millis",
"completion_time",
startTimeMillis + executionInfo.overallTook().millis()
);
}

return builder;
}));
}
content.add(
ChunkedToXContentHelper.chunk(
(builder, p) -> builder //
.field("documents_found", documentsFound)
.field("values_loaded", valuesLoaded)
)
);
content.add(ChunkedToXContentHelper.chunk((builder, p) -> {
builder //
.field("documents_found", documentsFound)
.field("values_loaded", valuesLoaded);

if (startTimeMillis != 0L) {
builder.timestampFieldsFromUnixEpochMillis("start_time_in_millis", "start_time", startTimeMillis);
}
if (expirationTimeMillis != 0L) {
builder.timestampFieldsFromUnixEpochMillis("expiration_time_in_millis", "expiration_time", expirationTimeMillis);
}

return builder;
}));
if (dropNullColumns) {
content.add(ResponseXContentUtils.allColumns(columns, "all_columns"));
content.add(ResponseXContentUtils.nonNullColumns(columns, nullColumns, "columns"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ public EsqlExecutionInfo executionInfo() {
@Override
public EsqlQueryResponse getCurrentResult() {
// TODO it'd be nice to have the number of documents we've read from completed drivers here
return new EsqlQueryResponse(List.of(), List.of(), 0, 0, null, false, getExecutionId().getEncoded(), true, true, executionInfo);
return new EsqlQueryResponse(
List.of(),
List.of(),
0,
0,
null,
false,
getExecutionId().getEncoded(),
true,
true,
getStartTime(),
getExpirationTimeMillis(),
executionInfo
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolea
asyncExecutionId,
false,
request.async(),
task.getStartTime(),
((EsqlQueryTask) task).getExpirationTimeMillis(),
result.executionInfo()
);
}
Expand All @@ -414,6 +416,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, boolea
profile,
request.columnar(),
request.async(),
task.getStartTime(),
threadPool.absoluteTimeInMillis() + request.keepAlive().millis(),
result.executionInfo()
);
}
Expand Down Expand Up @@ -479,6 +483,8 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
asyncExecutionId,
true, // is_running
true, // isAsync
task.getStartTime(),
task.getExpirationTimeMillis(),
task.executionInfo()
);
}
Expand Down
Loading