SQL: Replace scroll cursors with point-in-time and search_after (#83381)
Resolves #61873

The goal of this PR is to remove the use of the deprecated scroll
cursors in SQL. Functionality and APIs remain the same, with one
notable difference: the last page of a search hit query used to always
include a scroll cursor if it was non-empty. This is no longer the
case: once a result set is exhausted, the PIT is closed and the last
page does not include a cursor.
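
To illustrate the client-visible effect, here is a minimal paging sketch against the SQL REST API using the low-level Java REST client, in the same style as the integration tests touched below. The host, index name, fetch size, and class name are illustrative assumptions, and the import paths follow the surrounding test code, so they may differ between Elasticsearch versions:

import java.io.InputStream;
import java.util.Map;

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.json.JsonXContent;

public class SqlPagingSketch {

    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // First page: run the query with a small fetch size so several pages are returned.
            Request first = new Request("POST", "/_sql");
            first.setJsonEntity("{\"query\": \"SELECT * FROM test\", \"fetch_size\": 5}");
            Map<String, Object> page = asMap(client.performRequest(first));

            // Follow the cursor; the exhausted last page simply omits it, so no explicit
            // /_sql/close call is needed and the PIT is already closed server-side.
            String cursor = (String) page.get("cursor");
            while (cursor != null) {
                Request next = new Request("POST", "/_sql");
                next.setJsonEntity("{\"cursor\": \"" + cursor + "\"}");
                page = asMap(client.performRequest(next));
                cursor = (String) page.get("cursor");
            }
        }
    }

    private static Map<String, Object> asMap(Response response) throws Exception {
        try (InputStream content = response.getEntity().getContent()) {
            return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
        }
    }
}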

Note that PIT can also be used for aggregation and PIVOT queries, but
this is not in the scope of this PR and will be implemented in a
follow-up.

Additionally, this PR resolves #80523 because the total doc count is no
longer required.
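
For background on the mechanism adopted here, the following sketch shows the plain point-in-time / search_after flow at the search API level, again with the low-level Java REST client. It is not code from this PR; the index name, page size, keep_alive, and helper names are assumptions for illustration:

import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.json.JsonXContent;

public class PitSearchAfterSketch {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Open a point-in-time on the index; keep_alive only needs to cover one page round trip.
            Request openPit = new Request("POST", "/test/_pit");
            openPit.addParameter("keep_alive", "1m");
            String pitId = (String) asMap(client.performRequest(openPit)).get("id");

            String searchAfter = "";
            while (true) {
                // Search against the PIT (no index in the path) with a deterministic sort.
                Request search = new Request("POST", "/_search");
                search.setJsonEntity(
                    "{\"size\": 100, \"pit\": {\"id\": \"" + pitId + "\", \"keep_alive\": \"1m\"},"
                        + " \"sort\": [{\"_shard_doc\": \"asc\"}]" + searchAfter + "}"
                );
                Map<String, Object> body = asMap(client.performRequest(search));
                List<Map<String, Object>> hits = (List<Map<String, Object>>) ((Map<String, Object>) body.get("hits")).get("hits");
                if (hits.isEmpty()) {
                    break;
                }
                // Feed the sort values of the last hit back in as search_after for the next page.
                List<Object> lastSort = (List<Object>) hits.get(hits.size() - 1).get("sort");
                searchAfter = ", \"search_after\": "
                    + lastSort.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]"));
            }

            // Close the PIT once the result set is exhausted, which is what the SQL cursor now does internally.
            Request closePit = new Request("DELETE", "/_pit");
            closePit.setJsonEntity("{\"id\": \"" + pitId + "\"}");
            client.performRequest(closePit);
        }
    }

    private static Map<String, Object> asMap(Response response) throws Exception {
        try (InputStream content = response.getEntity().getContent()) {
            return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
        }
    }
}
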
Lukas Wegmann committed Feb 15, 2022
1 parent 5d84217 commit 68a04a3
Showing 25 changed files with 585 additions and 426 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/83381.yaml
@@ -0,0 +1,7 @@
pr: 83381
summary: Replace scroll cursors with point-in-time and `search_after`
area: SQL
type: enhancement
issues:
- 61873
- 80523
55 changes: 38 additions & 17 deletions x-pack/plugin/build.gradle
@@ -1,8 +1,8 @@
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.VersionProperties
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.util.GradleUtils
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.VersionProperties
import org.elasticsearch.gradle.util.GradleUtils

apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.yaml-rest-compat-test'
@@ -77,54 +77,75 @@ tasks.named("yamlRestTest").configure {
}

tasks.named("yamlRestTestV7CompatTest").configure {
systemProperty 'tests.rest.blacklist', [
'unsigned_long/50_script_values/Scripted sort values',
'unsigned_long/50_script_values/script_score query',
'unsigned_long/50_script_values/Script query',
'data_stream/140_data_stream_aliases/Fix IndexNotFoundException error when handling remove alias action',
].join(',')
systemProperty 'tests.rest.blacklist', [
'unsigned_long/50_script_values/Scripted sort values',
'unsigned_long/50_script_values/script_score query',
'unsigned_long/50_script_values/Script query',
'data_stream/140_data_stream_aliases/Fix IndexNotFoundException error when handling remove alias action',
].join(',')
}

tasks.named("yamlRestTestV7CompatTransform").configure{ task ->
task.skipTest("vectors/10_dense_vector_basic/Deprecated function signature", "to support it, it would require to almost revert back the #48725 and complicate the code" )
tasks.named("yamlRestTestV7CompatTransform").configure { task ->
task.skipTest(
"vectors/10_dense_vector_basic/Deprecated function signature",
"to support it, it would require to almost revert back the #48725 and complicate the code"
)
task.skipTest("vectors/30_sparse_vector_basic/Cosine Similarity", "not supported for compatibility")
task.skipTest("vectors/30_sparse_vector_basic/Deprecated function signature", "not supported for compatibility")
task.skipTest("vectors/30_sparse_vector_basic/Dot Product", "not supported for compatibility")
task.skipTest("vectors/35_sparse_vector_l1l2/L1 norm", "not supported for compatibility")
task.skipTest("vectors/35_sparse_vector_l1l2/L2 norm", "not supported for compatibility")
task.skipTest("vectors/40_sparse_vector_special_cases/Dimensions can be sorted differently", "not supported for compatibility")
task.skipTest("vectors/40_sparse_vector_special_cases/Documents missing a vector field", "not supported for compatibility")
task.skipTest("vectors/40_sparse_vector_special_cases/Query vector has different dimensions from documents' vectors", "not supported for compatibility")
task.skipTest(
"vectors/40_sparse_vector_special_cases/Query vector has different dimensions from documents' vectors",
"not supported for compatibility"
)
task.skipTest("vectors/40_sparse_vector_special_cases/Sparse vectors should error with dense vector functions", "not supported for compatibility")
task.skipTest("vectors/40_sparse_vector_special_cases/Vectors of different dimensions and data types", "not supported for compatibility")
task.skipTest("vectors/50_vector_stats/Usage stats on vector fields", "not supported for compatibility")
task.skipTest("roles/30_prohibited_role_query/Test use prohibited query inside role query", "put role request with a term lookup (deprecated) and type. Requires validation in REST layer")
task.skipTest(
"roles/30_prohibited_role_query/Test use prohibited query inside role query",
"put role request with a term lookup (deprecated) and type. Requires validation in REST layer"
)
task.skipTest("ml/jobs_crud/Test create job with delimited format", "removing undocumented functionality")
task.skipTest("ml/datafeeds_crud/Test update datafeed to point to missing job", "behaviour change #44752 - not allowing to update datafeed job_id")
task.skipTest("ml/datafeeds_crud/Test update datafeed to point to different job", "behaviour change #44752 - not allowing to update datafeed job_id")
task.skipTest("ml/datafeeds_crud/Test update datafeed to point to job already attached to another datafeed", "behaviour change #44752 - not allowing to update datafeed job_id")
task.skipTest(
"ml/datafeeds_crud/Test update datafeed to point to different job",
"behaviour change #44752 - not allowing to update datafeed job_id"
)
task.skipTest(
"ml/datafeeds_crud/Test update datafeed to point to job already attached to another datafeed",
"behaviour change #44752 - not allowing to update datafeed job_id"
)
task.skipTest("rollup/delete_job/Test basic delete_job", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/delete_job/Test delete job twice", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/delete_job/Test delete running job", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/get_jobs/Test basic get_jobs", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/put_job/Test basic put_job", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/start_job/Test start job twice", "rollup was an experimental feature, also see #41227")
task.skipTest("ml/trained_model_cat_apis/Test cat trained models", "A type field was added to cat.ml_trained_models #73660, this is a backwards compatible change. Still this is a cat api, and we don't support them with rest api compatibility. (the test would be very hard to transform too)")
task.skipTest(
"ml/trained_model_cat_apis/Test cat trained models",
"A type field was added to cat.ml_trained_models #73660, this is a backwards compatible change. Still this is a cat api, and we don't support them with rest api compatibility. (the test would be very hard to transform too)"
)
task.skipTest("indices.freeze/30_usage/Usage stats on frozen indices", "#70192 -- the freeze index API is removed from 8.0")
task.skipTest("indices.freeze/20_stats/Translog stats on frozen indices", "#70192 -- the freeze index API is removed from 8.0")
task.skipTest("indices.freeze/10_basic/Basic", "#70192 -- the freeze index API is removed from 8.0")
task.skipTest("indices.freeze/10_basic/Test index options", "#70192 -- the freeze index API is removed from 8.0")
task.skipTest("sql/sql/Paging through results", "scrolling through search hit queries no longer produces empty last page in 8.2")
task.skipTest("service_accounts/10_basic/Test get service accounts", "new service accounts are added")

task.replaceValueInMatch("_type", "_doc")
task.addAllowedWarningRegex("\\[types removal\\].*")
task.addAllowedWarningRegexForTest("Including \\[accept_enterprise\\] in get license.*", "Installing enterprise license")
task.addAllowedWarningRegex("bucket_span .* is not an integral .* of the number of seconds in 1d.* This is now deprecated.*")

task.replaceValueTextByKeyValue("catch",
task.replaceValueTextByKeyValue(
"catch",
'bad_request',
'/It is no longer possible to freeze indices, but existing frozen indices can still be unfrozen/',
"Cannot freeze write index for data stream")
"Cannot freeze write index for data stream"
)
}


@@ -31,7 +31,7 @@ public abstract class JdbcIntegrationTestCase extends ESRestTestCase {

@After
public void checkSearchContent() throws IOException {
// Some context might linger due to fire and forget nature of scroll cleanup
// Some context might linger due to fire and forget nature of PIT cleanup
assertNoSearchContexts();
}

@@ -11,6 +11,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -21,6 +22,7 @@
import org.elasticsearch.xpack.ql.TestNode;
import org.elasticsearch.xpack.ql.TestNodes;
import org.elasticsearch.xpack.sql.qa.rest.BaseRestSqlTestCase;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Before;

@@ -111,8 +113,7 @@ private void testNullsOrderWithMissingOrderSupport(RestClient client) throws IOE
assertNull(result.get(2));
}

@SuppressWarnings("unchecked")
private List<Integer> runOrderByNullsLastQuery(RestClient queryClient) throws IOException {
private void indexDocs() throws IOException {
Request putIndex = new Request("PUT", "/test");
putIndex.setJsonEntity("""
{"settings":{"index":{"number_of_shards":3}}}""");
@@ -124,17 +125,19 @@ private List<Integer> runOrderByNullsLastQuery(RestClient queryClient) throws IO
for (String doc : Arrays.asList("{\"int\":1,\"kw\":\"foo\"}", "{\"int\":2,\"kw\":\"bar\"}", "{\"kw\":\"bar\"}")) {
bulk.append("{\"index\":{}}\n").append(doc).append("\n");
}

indexDocs.setJsonEntity(bulk.toString());
client().performRequest(indexDocs);
}

@SuppressWarnings("unchecked")
private List<Integer> runOrderByNullsLastQuery(RestClient queryClient) throws IOException {
indexDocs();

Request query = new Request("POST", "_sql");
query.setJsonEntity(sqlQueryEntityWithOptionalMode("SELECT int FROM test GROUP BY 1 ORDER BY 1 NULLS LAST", bwcVersion));
Response queryResponse = queryClient.performRequest(query);

assertEquals(200, queryResponse.getStatusLine().getStatusCode());
Map<String, Object> result = performRequestAndReadBodyAsJson(queryClient, query);

InputStream content = queryResponse.getEntity().getContent();
Map<String, Object> result = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
List<List<Object>> rows = (List<List<Object>>) result.get("rows");
return rows.stream().map(row -> (Integer) row.get(0)).collect(Collectors.toList());
}
@@ -156,4 +159,42 @@ public static String sqlQueryEntityWithOptionalMode(String query, Version bwcVer
return Strings.toString(json);
}

public void testCursorFromOldNodeFailsOnNewNode() throws IOException {
assertCursorNotCompatibleAcrossVersions(bwcVersion, oldNodesClient, Version.CURRENT, newNodesClient);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/83726")
public void testCursorFromNewNodeFailsOnOldNode() throws IOException {
assertCursorNotCompatibleAcrossVersions(Version.CURRENT, newNodesClient, bwcVersion, oldNodesClient);
}

private void assertCursorNotCompatibleAcrossVersions(Version version1, RestClient client1, Version version2, RestClient client2)
throws IOException {
indexDocs();

Request req = new Request("POST", "_sql");
// GROUP BY queries always return a cursor
req.setJsonEntity(sqlQueryEntityWithOptionalMode("SELECT int FROM test GROUP BY 1", bwcVersion));
Map<String, Object> json = performRequestAndReadBodyAsJson(client1, req);
String cursor = (String) json.get("cursor");
assertThat(cursor, Matchers.not(Matchers.emptyString()));

Request scrollReq = new Request("POST", "_sql");
scrollReq.setJsonEntity("{\"cursor\": \"%s\"}".formatted(cursor));
ResponseException exception = expectThrows(ResponseException.class, () -> client2.performRequest(scrollReq));

assertThat(
exception.getMessage(),
Matchers.containsString("Unsupported cursor version [" + version1 + "], expected [" + version2 + "]")
);
}

private Map<String, Object> performRequestAndReadBodyAsJson(RestClient client, Request request) throws IOException {
Response response = client.performRequest(request);
assertEquals(200, response.getStatusLine().getStatusCode());
try (InputStream content = response.getEntity().getContent()) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
}

}
@@ -281,18 +281,27 @@ protected AuditLogAsserter createAuditLogAsserter() {
}

/**
* Test the hijacking a scroll fails. This test is only implemented for
* REST because it is the only API where it is simple to hijack a scroll.
* Test the hijacking a cursor fails. This test is only implemented for
* REST because it is the only API where it is simple to hijack a cursor.
* It should exercise the same code as the other APIs but if we were truly
* paranoid we'd hack together something to test the others as well.
*/
public void testHijackScrollFails() throws Exception {
createUser("full_access", "rest_minimal");
public void testHijackCursorFails() throws Exception {
createUser("no_read", "read_nothing");
final String mode = randomMode();

final String query = randomFrom(
List.of(
"SELECT * FROM test",
"SELECT a FROM test GROUP BY a",
"SELECT MAX(a) FROM test GROUP BY a ORDER BY 1",
"SHOW COLUMNS IN test"
)
);

Map<String, Object> adminResponse = RestActions.runSql(
null,
new StringEntity(query("SELECT * FROM test").mode(mode).fetchSize(1).toString(), ContentType.APPLICATION_JSON),
new StringEntity(query(query).mode(mode).fetchSize(1).toString(), ContentType.APPLICATION_JSON),
mode,
false
);
@@ -303,20 +312,18 @@ public void testHijackScrollFails() throws Exception {
ResponseException e = expectThrows(
ResponseException.class,
() -> RestActions.runSql(
"full_access",
"no_read",
new StringEntity(cursor(cursor).mode(mode).toString(), ContentType.APPLICATION_JSON),
mode,
false
)
);
// TODO return a better error message for bad scrolls
assertThat(e.getMessage(), containsString("No search context found for id"));
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());

assertThat(e.getMessage(), containsString("is unauthorized for user"));
assertEquals(403, e.getResponse().getStatusLine().getStatusCode());

createAuditLogAsserter().expectSqlCompositeActionFieldCaps("test_admin", "test")
.expect(true, SQL_ACTION_NAME, "full_access", empty())
// one scroll access denied per shard
.expect("access_denied", SQL_ACTION_NAME, "full_access", "default_native", empty(), "InternalScrollSearchRequest")
.expect("access_denied", SQL_ACTION_NAME, "no_read", "default_native", empty(), "SqlQueryRequest")
.assertLogs();
}

@@ -31,7 +31,7 @@ public abstract class JdbcIntegrationTestCase extends RemoteClusterAwareSqlRestT

@After
public void checkSearchContent() throws Exception {
// Some context might linger due to fire and forget nature of scroll cleanup
// Some context might linger due to fire and forget nature of PIT cleanup
assertNoSearchContexts(provisioningClient());
}

@@ -254,6 +254,7 @@ public void testNextPageWithDatetimeAndTimezoneParam() throws IOException {
expected.put("columns", singletonList(columnInfo(mode, "tz", "integer", JDBCType.INTEGER, 11)));
response = runSql(new StringEntity(sqlRequest, ContentType.APPLICATION_JSON), "", mode);
} else {
assertNotNull(cursor);
response = runSql(
new StringEntity(cursor(cursor).mode(mode).toString(), ContentType.APPLICATION_JSON),
StringUtils.EMPTY,
@@ -270,16 +271,12 @@
);
}
expected.put("rows", values);
assertTrue(response.containsKey("cursor") == false || response.get("cursor") != null);
cursor = (String) response.remove("cursor");
assertResponse(expected, response);
assertNotNull(cursor);
}
Map<String, Object> expected = new HashMap<>();
expected.put("rows", emptyList());
assertResponse(
expected,
runSql(new StringEntity(cursor(cursor).mode(mode).toString(), ContentType.APPLICATION_JSON), StringUtils.EMPTY, mode)
);

assertNull(cursor);

deleteIndex("test_date_timezone");
}
@@ -1182,7 +1179,7 @@ private void executeQueryWithNextPage(String format, String expectedHeader, Stri
.toString();

String cursor = null;
for (int i = 0; i < 20; i += 2) {
for (int i = 0; i <= 20; i += 2) {
Tuple<String, String> response;
if (i == 0) {
response = runSqlAsText(StringUtils.EMPTY, new StringEntity(request, ContentType.APPLICATION_JSON), format);
@@ -1201,25 +1198,17 @@
expected.append("---------------+---------------+---------------\n");
}
}
expected.append(String.format(Locale.ROOT, expectedLineFormat, "text" + i, i, i + 5));
expected.append(String.format(Locale.ROOT, expectedLineFormat, "text" + (i + 1), i + 1, i + 6));

cursor = response.v2();
assertEquals(expected.toString(), response.v1());
assertNotNull(cursor);
if (i < 20) {
expected.append(String.format(Locale.ROOT, expectedLineFormat, "text" + i, i, i + 5));
expected.append(String.format(Locale.ROOT, expectedLineFormat, "text" + (i + 1), i + 1, i + 6));
assertEquals(expected.toString(), response.v1());
assertNotNull(cursor);
} else {
assertNull(cursor);
}
}
Map<String, Object> expected = new HashMap<>();
expected.put("rows", emptyList());
assertResponse(
expected,
runSql(new StringEntity(cursor(cursor).toString(), ContentType.APPLICATION_JSON), StringUtils.EMPTY, Mode.PLAIN.toString())
);

Map<String, Object> response = runSql(
new StringEntity(cursor(cursor).toString(), ContentType.APPLICATION_JSON),
"/close",
Mode.PLAIN.toString()
);
assertEquals(true, response.get("succeeded"));

assertEquals(0, getNumberOfSearchContexts(provisioningClient(), "test"));
}
