Skip to content

Commit

Permalink
SQL: clear the cursor if nested inner hits are enough to fulfill the …
Browse files Browse the repository at this point in the history
…query required limits (#35398)
  • Loading branch information
astefan committed Nov 15, 2018
1 parent fd7f710 commit 03872f4
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 16 deletions.
Expand Up @@ -318,19 +318,20 @@ protected void handleResponse(SearchResponse response, ActionListener<SchemaRowS
// there are some results
if (hits.length > 0) {
String scrollId = response.getScrollId();

SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId);

// if there's an id, try to setup next scroll
if (scrollId != null &&
// is all the content already retrieved?
(Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
// or maybe the limit has been reached
|| (hits.length >= query.limit() && query.limit() > -1))) {
(Boolean.TRUE.equals(response.isTerminatedEarly())
|| response.getHits().getTotalHits() == hits.length
|| hitRowSet.isLimitReached())) {
// if so, clear the scroll
clear(response.getScrollId(), ActionListener.wrap(
succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), null)),
listener::onFailure));
} else {
listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId));
listener.onResponse(hitRowSet);
}
}
// no hits
Expand Down
Expand Up @@ -23,7 +23,6 @@
class SearchHitRowSet extends AbstractRowSet {
private final SearchHit[] hits;
private final Cursor cursor;
private final String scrollId;
private final List<HitExtractor> extractors;
private final Set<String> innerHits = new LinkedHashSet<>();
private final String innerHit;
Expand All @@ -35,7 +34,6 @@ class SearchHitRowSet extends AbstractRowSet {
SearchHitRowSet(List<HitExtractor> exts, SearchHit[] hits, int limit, String scrollId) {

this.hits = hits;
this.scrollId = scrollId;
this.extractors = exts;

// Since the results might contain nested docs, the iteration is similar to that of Aggregation
Expand Down Expand Up @@ -91,6 +89,10 @@ class SearchHitRowSet extends AbstractRowSet {
}
}
}

protected boolean isLimitReached() {
return cursor == Cursor.EMPTY;
}

@Override
public int columnCount() {
Expand Down Expand Up @@ -166,10 +168,6 @@ public int size() {
return size;
}

public String scrollId() {
return scrollId;
}

@Override
public Cursor nextPageCursor() {
return cursor;
Expand Down
Expand Up @@ -14,4 +14,11 @@ public class JdbcCsvSpecIT extends CsvSpecTestCase {
public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) {
super(fileName, groupName, testName, lineNumber, testCase);
}

@Override
protected int fetchSize() {
// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar with https://github.com/elastic/elasticsearch/issues/35176 quicker
return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();
}
}
Expand Up @@ -6,6 +6,9 @@
package org.elasticsearch.xpack.qa.sql.jdbc;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.Before;

import java.io.IOException;
Expand All @@ -23,12 +26,42 @@
public class FetchSizeTestCase extends JdbcIntegrationTestCase {
@Before
public void createTestIndex() throws IOException {
Request request = new Request("PUT", "/test/doc/_bulk");
Request request = new Request("PUT", "/test");
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("mappings");
{
createIndex.startObject("doc");
{
createIndex.startObject("properties");
{
createIndex.startObject("nested").field("type", "nested");
createIndex.startObject("properties");
createIndex.startObject("inner_field").field("type", "integer").endObject();
createIndex.endObject();
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
request.setJsonEntity(Strings.toString(createIndex));
client().performRequest(request);

request = new Request("PUT", "/test/doc/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
StringBuilder bulkLine;
for (int i = 0; i < 20; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"test_field\":" + i + "}\n");
bulkLine = new StringBuilder("{\"test_field\":" + i);
bulkLine.append(", \"nested\":[");
// each document will have a nested field with 1 - 5 values
for (int j = 0; j <= i % 5; j++) {
bulkLine.append("{\"inner_field\":" + j + "}" + ((j == i % 5) ? "" : ","));
}
bulkLine.append("]");
bulk.append(bulkLine).append("}\n");
}
request.setJsonEntity(bulk.toString());
client().performRequest(request);
Expand Down Expand Up @@ -92,4 +125,32 @@ public void testAggregation() throws SQLException {
}
}
}

/**
* Test for nested documents.
*/
public void testNestedDocuments() throws Exception {
try (Connection c = esJdbc();
Statement s = c.createStatement()) {
s.setFetchSize(5);
try (ResultSet rs = s.executeQuery("SELECT test_field, nested.* FROM test ORDER BY test_field ASC")) {
assertTrue("Empty result set!", rs.next());
for (int i = 0; i < 20; i++) {
assertEquals(15, rs.getFetchSize());
assertNestedDocuments(rs, i);
}
assertFalse(rs.next());
}
}
assertNoSearchContexts();
}

private void assertNestedDocuments(ResultSet rs, int i) throws SQLException {
for (int j = 0; j <= i % 5; j++) {
assertEquals(i, rs.getInt(1));
assertEquals(j, rs.getInt(2));
// don't check the very last row in the result set
assertTrue("No more entries left after row " + rs.getRow(), (i+j == 23 || rs.next()));
}
}
}
Expand Up @@ -103,10 +103,14 @@ public final void test() throws Throwable {

protected ResultSet executeJdbcQuery(Connection con, String query) throws SQLException {
Statement statement = con.createStatement();
statement.setFetchSize(between(1, 500));
statement.setFetchSize(fetchSize());
return statement.executeQuery(query);
}

protected int fetchSize() {
return between(1, 500);
}

// TODO: use UTC for now until deciding on a strategy for handling date extraction
@Override
protected Properties connectionProperties() {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/qa/sql/src/main/resources/dep_emp.csv
Expand Up @@ -107,5 +107,7 @@ emp_no,dep_id,from_date,to_date
10097,d008,1990-09-15,9999-01-01
10098,d004,1985-05-13,1989-06-29
10098,d009,1989-06-29,1992-12-11
10098,d008,1992-12-11,1993-05-05
10098,d007,1993-05-05,1994-02-01
10099,d007,1988-10-18,9999-01-01
10100,d003,1987-09-21,9999-01-01
79 changes: 77 additions & 2 deletions x-pack/qa/sql/src/main/resources/nested.csv-spec
Expand Up @@ -102,7 +102,7 @@ Mayuko | 12
selectWithScalarOnNested
SELECT first_name f, last_name l, YEAR(dep.from_date) start FROM test_emp WHERE dep.dep_name = 'Production' AND languages > 1 ORDER BY start LIMIT 5;

f:s | l:s | start:i
f:s | l:s | start:i

Sreekrishna |Servieres |1985
Zhongwei |Rosen |1986
Expand Down Expand Up @@ -137,7 +137,7 @@ d003 |Azuma
d002 |Baek
d003 |Baek
d004 |Bamford
;
;

selectNestedFieldLast
SELECT first_name, dep.dep_id FROM test_emp ORDER BY first_name LIMIT 5;
Expand Down Expand Up @@ -222,3 +222,78 @@ Anneke |d005 |Development |1990-08-05T00:00:00.000Z|9999-01
Anoosh |d005 |Development |1991-08-30T00:00:00.000Z|9999-01-01T00:00:00.000Z|Peyn
Arumugam |d008 |Research |1987-04-18T00:00:00.000Z|1997-11-08T00:00:00.000Z|Ossenbruggen
;

//
// Nested documents tests more targetted for JdbcCsvNestedDocsIT class (with specific fetch_size value)
//

// employee 10098 has 4 departments

selectNestedFieldWithFourInnerHitsAndLimitOne
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 1;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitTwo
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 2;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitThree
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 3;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitFour
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 4;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
d007 |Sales |Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitFive
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 5;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
d007 |Sales |Sreekrishna |10098
;

selectNestedFieldFromTwoDocumentsWithFourInnerHitsAndLimitFive
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 OR emp_no=10099 LIMIT 5;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
d007 |Sales |Sreekrishna |10098
d007 |Sales |Valter |10099
;

selectNestedFieldFromDocumentWithOneInnerHitAndLimitOne
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10099 LIMIT 1;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d007 |Sales |Valter |10099
;

0 comments on commit 03872f4

Please sign in to comment.