SQL: clear the cursor if nested inner hits are enough to fulfill the query required limits #35398

Merged 13 commits on Nov 15, 2018

Changes from 10 commits
@@ -12,4 +12,14 @@ public class JdbcCsvSpecIT extends CsvSpecTestCase {
public JdbcCsvSpecIT(String fileName, String groupName, String testName, Integer lineNumber, CsvTestCase testCase) {
super(fileName, groupName, testName, lineNumber, testCase);
}

@Override
protected int fetchSize() {
Review comment (Member):

Minor nitpick - to remove the super.fetchSize redundancy, you can make the method a one-liner using the ternary conditional:

return fileName.startsWith("nested") && randomBoolean() ? randomIntBetween(1,5) : super.fetchSize();

// using a smaller fetchSize for nested documents' tests to uncover bugs
// similar to https://github.com/elastic/elasticsearch/issues/35176 more quickly
if (fileName.startsWith("nested")) {
Review comment (Member):

Instead of overriding executeJdbcQuery, override fetchSize():

protected int fetchSize() {
    if (fileName.startsWith("nested")) {
        return randomBoolean() ? super.fetchSize() : randomIntBetween(1, 5);
    }
    return super.fetchSize();
}

return randomBoolean() ? super.fetchSize() : randomIntBetween(1, 5);
}
return super.fetchSize();
}
}
@@ -6,6 +6,9 @@
package org.elasticsearch.xpack.sql.qa.jdbc;

import org.elasticsearch.client.Request;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.junit.Before;

import java.io.IOException;
@@ -23,12 +26,42 @@
public class FetchSizeTestCase extends JdbcIntegrationTestCase {
@Before
public void createTestIndex() throws IOException {
Request request = new Request("PUT", "/test/doc/_bulk");
Request request = new Request("PUT", "/test");
XContentBuilder createIndex = JsonXContent.contentBuilder().startObject();
createIndex.startObject("mappings");
{
createIndex.startObject("doc");
{
createIndex.startObject("properties");
{
createIndex.startObject("nested").field("type", "nested");
createIndex.startObject("properties");
createIndex.startObject("inner_field").field("type", "integer").endObject();
createIndex.endObject();
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject();
}
createIndex.endObject().endObject();
request.setJsonEntity(Strings.toString(createIndex));
client().performRequest(request);
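
Serialized, the XContentBuilder above produces roughly the following create-index body (test_field itself is left to dynamic mapping):

{
  "mappings": {
    "doc": {
      "properties": {
        "nested": {
          "type": "nested",
          "properties": {
            "inner_field": { "type": "integer" }
          }
        }
      }
    }
  }
}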

request = new Request("PUT", "/test/doc/_bulk");
request.addParameter("refresh", "true");
StringBuilder bulk = new StringBuilder();
StringBuilder bulkLine;
for (int i = 0; i < 20; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"test_field\":" + i + "}\n");
bulkLine = new StringBuilder("{\"test_field\":" + i);
bulkLine.append(", \"nested\":[");
// each document will have a nested field with 1 - 5 values
for (int j = 0; j <= i % 5; j++) {
bulkLine.append("{\"inner_field\":" + j + "}" + ((j == i % 5) ? "" : ","));
}
bulkLine.append("]");
bulk.append(bulkLine).append("}\n");
}
request.setJsonEntity(bulk.toString());
client().performRequest(request);
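
As a quick illustration of the payload this loop builds, here is a standalone sketch (a hypothetical helper, not part of the change) that prints the two bulk lines generated for a single document:

public class BulkLineSketch {
    public static void main(String[] args) {
        int i = 7; // document number; it gets (i % 5) + 1 = 3 nested values
        StringBuilder bulkLine = new StringBuilder("{\"test_field\":" + i);
        bulkLine.append(", \"nested\":[");
        for (int j = 0; j <= i % 5; j++) {
            bulkLine.append("{\"inner_field\":" + j + "}" + ((j == i % 5) ? "" : ","));
        }
        bulkLine.append("]");
        System.out.println("{\"index\":{}}");
        // prints: {"test_field":7, "nested":[{"inner_field":0},{"inner_field":1},{"inner_field":2}]}
        System.out.println(bulkLine.append("}"));
    }
}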
@@ -92,4 +125,32 @@ public void testAggregation() throws SQLException {
}
}
}

/**
* Test for nested documents.
*/
public void testNestedDocuments() throws Exception {
try (Connection c = esJdbc();
Statement s = c.createStatement()) {
s.setFetchSize(5);
try (ResultSet rs = s.executeQuery("SELECT test_field, nested.* FROM test ORDER BY test_field ASC")) {
assertTrue("Empty result set!", rs.next());
for (int i = 0; i < 20; i++) {
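// each page of 5 documents expands to 1 + 2 + 3 + 4 + 5 = 15 nested rows, hence the fetch size asserted below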
assertEquals(15, rs.getFetchSize());
assertNestedDocuments(rs, i);
}
assertFalse(rs.next());
}
}
assertNoSearchContexts();
}

private void assertNestedDocuments(ResultSet rs, int i) throws SQLException {
for (int j = 0; j <= i % 5; j++) {
assertEquals(i, rs.getInt(1));
assertEquals(j, rs.getInt(2));
// don't check the very last row in the result set
assertTrue("No more entries left after row " + rs.getRow(), (i+j == 23 || rs.next()));
}
}
}
@@ -103,10 +103,14 @@ public final void test() throws Throwable {

protected ResultSet executeJdbcQuery(Connection con, String query) throws SQLException {
Statement statement = con.createStatement();
statement.setFetchSize(between(1, 500));
statement.setFetchSize(fetchSize());
return statement.executeQuery(query);
}

protected int fetchSize() {
return between(1, 500);
}

// TODO: use UTC for now until deciding on a strategy for handling date extraction
@Override
protected Properties connectionProperties() {
2 changes: 2 additions & 0 deletions x-pack/plugin/sql/qa/src/main/resources/dep_emp.csv
@@ -107,5 +107,7 @@ emp_no,dep_id,from_date,to_date
10097,d008,1990-09-15,9999-01-01
10098,d004,1985-05-13,1989-06-29
10098,d009,1989-06-29,1992-12-11
10098,d008,1992-12-11,1993-05-05
10098,d007,1993-05-05,1994-02-01
10099,d007,1988-10-18,9999-01-01
10100,d003,1987-09-21,9999-01-01
79 changes: 77 additions & 2 deletions x-pack/plugin/sql/qa/src/main/resources/nested.csv-spec
@@ -102,7 +102,7 @@ Mayuko | 12
selectWithScalarOnNested
SELECT first_name f, last_name l, YEAR(dep.from_date) start FROM test_emp WHERE dep.dep_name = 'Production' AND languages > 1 ORDER BY start LIMIT 5;

f:s | l:s | start:i
f:s | l:s | start:i

Sreekrishna |Servieres |1985
Zhongwei |Rosen |1986
@@ -137,7 +137,7 @@ d003 |Azuma
d002 |Baek
d003 |Baek
d004 |Bamford
;
;

selectNestedFieldLast
SELECT first_name, dep.dep_id FROM test_emp ORDER BY first_name LIMIT 5;
@@ -222,3 +222,78 @@ Anneke |d005 |Development |1990-08-05T00:00:00.000Z|9999-01
Anoosh |d005 |Development |1991-08-30T00:00:00.000Z|9999-01-01T00:00:00.000Z|Peyn
Arumugam |d008 |Research |1987-04-18T00:00:00.000Z|1997-11-08T00:00:00.000Z|Ossenbruggen
;

//
// Nested documents tests targeted at the JdbcCsvNestedDocsIT class (which uses a specific fetch_size value)
//

// employee 10098 has 4 departments

selectNestedFieldWithFourInnerHitsAndLimitOne
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 1;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitTwo
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 2;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitThree
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 3;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitFour
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 4;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
d007 |Sales |Sreekrishna |10098
;

selectNestedFieldWithFourInnerHitsAndLimitFive
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 LIMIT 5;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
d007 |Sales |Sreekrishna |10098
;

selectNestedFieldFromTwoDocumentsWithFourInnerHitsAndLimitFive
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10098 OR emp_no=10099 LIMIT 5;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d004 |Production |Sreekrishna |10098
d009 |Customer Service|Sreekrishna |10098
d008 |Research |Sreekrishna |10098
d007 |Sales |Sreekrishna |10098
d007 |Sales |Valter |10099
;

selectNestedFieldFromDocumentWithOneInnerHitAndLimitOne
SELECT dep.dep_id, dep.dep_name, first_name, emp_no FROM test_emp WHERE emp_no=10099 LIMIT 1;

dep.dep_id:s | dep.dep_name:s | first_name:s | emp_no:i
---------------+----------------+---------------+---------------
d007 |Sales |Valter |10099
;
@@ -318,19 +318,20 @@ protected void handleResponse(SearchResponse response, ActionListener<SchemaRowS
// there are some results
if (hits.length > 0) {
String scrollId = response.getScrollId();

SchemaSearchHitRowSet hitRowSet = new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId);

// if there's an id, try to setup next scroll
if (scrollId != null &&
// is all the content already retrieved?
(Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
// or maybe the limit has been reached
|| (hits.length >= query.limit() && query.limit() > -1))) {
(Boolean.TRUE.equals(response.isTerminatedEarly())
|| response.getHits().getTotalHits() == hits.length
|| hitRowSet.isLimitReached())) {
// if so, clear the scroll
clear(response.getScrollId(), ActionListener.wrap(
succeeded -> listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), null)),
Review comment (Member): Reuse the hitRowSet above instead of creating a new one (see the sketch at the end of this hunk).

listener::onFailure));
} else {
listener.onResponse(new SchemaSearchHitRowSet(schema, exts, hits, query.limit(), scrollId));
listener.onResponse(hitRowSet);
}
}
// no hits
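
What the reviewer proposes would look roughly like this (a sketch only; it assumes handing back the hitRowSet built above is acceptable, even though that instance was constructed with the scroll id rather than null):

clear(response.getScrollId(), ActionListener.wrap(
        // reuse the row set created above instead of building a second SchemaSearchHitRowSet
        succeeded -> listener.onResponse(hitRowSet),
        listener::onFailure));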
@@ -23,7 +23,6 @@
class SearchHitRowSet extends AbstractRowSet {
private final SearchHit[] hits;
private final Cursor cursor;
private final String scrollId;
private final List<HitExtractor> extractors;
private final Set<String> innerHits = new LinkedHashSet<>();
private final String innerHit;
@@ -35,7 +34,6 @@ class SearchHitRowSet extends AbstractRowSet {
SearchHitRowSet(List<HitExtractor> exts, SearchHit[] hits, int limit, String scrollId) {

this.hits = hits;
this.scrollId = scrollId;
this.extractors = exts;

// Since the results might contain nested docs, the iteration is similar to that of Aggregation
@@ -91,6 +89,10 @@ class SearchHitRowSet extends AbstractRowSet {
}
}
}

Review comment (Member): Method can have default visibility as it's not used anywhere else.

protected boolean isLimitReached() {
return cursor == Cursor.EMPTY;
}
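
The package-private form the reviewer asks for simply drops the modifier (sketch):

// default (package-private) visibility, since the method is only used within this package
boolean isLimitReached() {
    return cursor == Cursor.EMPTY;
}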

@Override
public int columnCount() {
@@ -166,10 +168,6 @@ public int size() {
return size;
}

public String scrollId() {
return scrollId;
}

@Override
public Cursor nextPageCursor() {
return cursor;