Skip to content
This repository was archived by the owner on Mar 31, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ To setup a connection, the driver requires a JDBC connection URL. The connection
| ------------- |-------------| -----|---------|
| user | Connection username. mandatory if `auth` property selects a authentication scheme that mandates a username value | any string | `null` |
| password | Connection password. mandatory if `auth` property selects a authentication scheme that mandates a password value | any string | `null` |
| fetchSize | Cursor page size | positive integer value. Max value is limited by `index.max_result_window` Elasticsearch setting | `0` (for non-paginated response) |
| logOutput | location where driver logs should be emitted | a valid file path | `null` (logs are disabled) |
| logLevel | severity level for which driver logs should be emitted | in order from highest(least logging) to lowest(most logging): OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL | OFF (logs are disabled) |
| auth | authentication mechanism to use | `NONE` (no auth), `BASIC` (HTTP Basic), `AWS_SIGV4` (AWS SIGV4) | `basic` if username and/or password is specified, `NONE` otherwise |
Expand Down
22 changes: 22 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,25 @@ signing {
sign publishing.publications.shadow
}

jacoco {
toolVersion = "0.8.3"
}

jacocoTestReport {
reports {
html.enabled true
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 0.4
}
}
}
}

check.dependsOn jacocoTestCoverageVerification
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ConnectionImpl implements ElasticsearchConnection, JdbcWrapper, Log
private String url;
private String user;
private Logger log;
private int fetchSize;
private boolean open = false;
private Transport transport;
private Protocol protocol;
Expand All @@ -74,6 +75,7 @@ public ConnectionImpl(ConnectionConfig connectionConfig, TransportFactory transp
this.log = log;
this.url = connectionConfig.getUrl();
this.user = connectionConfig.getUser();
this.fetchSize = connectionConfig.getFetchSize();

try {
this.transport = transportFactory.getTransport(connectionConfig, log, getUserAgent());
Expand Down Expand Up @@ -101,6 +103,10 @@ public String getUser() {
return user;
}

public int getFetchSize() {
return fetchSize;
}

@Override
public Statement createStatement() throws SQLException {
log.debug(() -> logEntry("createStatement()"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ public PreparedStatementImpl(ConnectionImpl connection, String sql, Logger log)
public ResultSet executeQuery() throws SQLException {
log.debug(() -> logEntry("executeQuery()"));
checkOpen();
ResultSet rs = executeQueryX();
ResultSet rs = executeQueryX(getFetchSize());
log.debug(() -> logExit("executeQuery", rs));
return rs;
}

protected ResultSet executeQueryX() throws SQLException {
protected ResultSet executeQueryX(int fetchSize) throws SQLException {
checkParamsFilled();
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
jdbcQueryRequest.setParameters(Arrays.asList(parameters));
return executeQueryRequest(jdbcQueryRequest);
}
Expand Down Expand Up @@ -293,7 +293,7 @@ private int javaToSqlType(Object x) throws SQLException {
public boolean execute() throws SQLException {
log.debug(() -> logEntry("execute()"));
checkOpen();
executeQueryX();
executeQueryX(getFetchSize());
log.debug(() -> logExit("execute", true));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@
import com.amazon.opendistroforelasticsearch.jdbc.protocol.ColumnDescriptor;
import com.amazon.opendistroforelasticsearch.jdbc.internal.JdbcWrapper;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.QueryResponse;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.InternalServerErrorException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.ResponseException;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JdbcCursorQueryRequest;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocol;
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocolFactory;
import com.amazon.opendistroforelasticsearch.jdbc.transport.http.HttpTransport;
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverter;
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverters;
import com.amazon.opendistroforelasticsearch.jdbc.types.UnrecognizedElasticsearchTypeException;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
Expand Down Expand Up @@ -71,18 +78,24 @@ public class ResultSetImpl implements ResultSet, JdbcWrapper, LoggingSource {

private StatementImpl statement;
protected Cursor cursor;
private String cursorId;
private boolean open = false;
private boolean wasNull = false;
private boolean afterLast = false;
private boolean beforeFirst = true;
private Logger log;

public ResultSetImpl(StatementImpl statement, QueryResponse queryResponse, Logger log) throws SQLException {
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), log);
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), queryResponse.getCursor(), log);
}

public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
List<List<Object>> dataRows, Logger log) throws SQLException {
this(statement, columnDescriptors, dataRows, null, log);
}

public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
List<List<Object>> dataRows, String cursorId, Logger log) throws SQLException {
this.statement = statement;
this.log = log;

Expand All @@ -93,12 +106,10 @@ public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> c
.map(ColumnMetaData::new)
.collect(Collectors.toList()));

List<Row> rows = dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
List<Row> rows = getRowsFromDataRows(dataRows);

this.cursor = new Cursor(schema, rows);
this.cursorId = cursorId;
this.open = true;

} catch (UnrecognizedElasticsearchTypeException ex) {
Expand All @@ -112,15 +123,63 @@ public boolean next() throws SQLException {
log.debug(() -> logEntry("next()"));
checkOpen();
boolean next = cursor.next();

if (!next && this.cursorId != null) {
log.debug(() -> logEntry("buildNextPageFromCursorId()"));
buildNextPageFromCursorId();
log.debug(() -> logExit("buildNextPageFromCursorId()"));
next = cursor.next();
}

if (next) {
beforeFirst = false;
} else {
afterLast = true;
}
log.debug(() -> logExit("next", next));
boolean finalNext = next;
log.debug(() -> logExit("next", finalNext));
return next;
}

/**
* TODO: Refactor as suggested https://github.com/opendistro-for-elasticsearch/sql-jdbc/pull/76#discussion_r421571383
*
* This method has side effects. It creates a new Cursor to hold rows from new pages.
* Ideally fetching next set of rows using cursorId should be delegated to Cursor.
* In addition, the cursor should be final.
*
**/
protected void buildNextPageFromCursorId() throws SQLException {
try {
JdbcCursorQueryRequest jdbcCursorQueryRequest = new JdbcCursorQueryRequest(this.cursorId);
JsonCursorHttpProtocolFactory protocolFactory = JsonCursorHttpProtocolFactory.INSTANCE;
ConnectionImpl connection = (ConnectionImpl) statement.getConnection();

JsonCursorHttpProtocol protocol = protocolFactory.getProtocol(null, (HttpTransport) connection.getTransport());
QueryResponse queryResponse = protocol.execute(jdbcCursorQueryRequest);

if (queryResponse.getError() != null) {
throw new InternalServerErrorException(
queryResponse.getError().getReason(),
queryResponse.getError().getType(),
queryResponse.getError().getDetails());
}

cursor = new Cursor(cursor.getSchema(), getRowsFromDataRows(queryResponse.getDatarows()));
cursorId = queryResponse.getCursor();

} catch (ResponseException | IOException ex) {
logAndThrowSQLException(log, new SQLException("Error executing cursor query", ex));
}
}

private List<Row> getRowsFromDataRows(List<List<Object>> dataRows) {
return dataRows
.parallelStream()
.map(Row::new)
.collect(Collectors.toList());
}

@Override
public void close() throws SQLException {
log.debug(() -> logEntry("close()"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,28 @@ public class StatementImpl implements Statement, JdbcWrapper, LoggingSource {

protected ConnectionImpl connection;
protected boolean open = false;
protected int fetchSize;
protected ResultSetImpl resultSet;
protected Logger log;
private boolean closeOnCompletion;

public StatementImpl(ConnectionImpl connection, Logger log) {
this.connection = connection;
this.open = true;
this.fetchSize = connection.getFetchSize();
this.log = log;
}

@Override
public ResultSet executeQuery(String sql) throws SQLException {
log.debug(()-> logEntry("executeQuery (%s)", sql));
ResultSet rs = executeQueryX(sql);
ResultSet rs = executeQueryX(sql, fetchSize);
log.debug(()-> logExit("executeQuery", rs));
return rs;
}

protected ResultSet executeQueryX(String sql) throws SQLException {
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
protected ResultSet executeQueryX(String sql, int fetchSize) throws SQLException {
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
return executeQueryRequest(jdbcQueryRequest);
}

Expand Down Expand Up @@ -167,7 +169,7 @@ public void setCursorName(String name) throws SQLException {
public boolean execute(String sql) throws SQLException {
log.debug(()->logEntry("execute (%s)", sql));
checkOpen();
executeQueryX(sql);
executeQueryX(sql, fetchSize);
log.debug(() -> logExit("execute", true));
return true;
}
Expand Down Expand Up @@ -205,12 +207,12 @@ public int getFetchDirection() throws SQLException {

@Override
public void setFetchSize(int rows) throws SQLException {

fetchSize = rows;
}

@Override
public int getFetchSize() throws SQLException {
return 0;
return fetchSize;
}

@Override
Expand Down Expand Up @@ -275,7 +277,7 @@ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
if (autoGeneratedKeys != Statement.NO_GENERATED_KEYS) {
throw new SQLNonTransientException("Auto generated keys are not supported");
}
executeQueryX(sql);
executeQueryX(sql, fetchSize);
log.debug(() -> logExit("execute", true));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ConnectionConfig {
private String url;
private String host;
private int port;
private int fetchSize;
private String path;
private boolean useSSL;
private int loginTimeout;
Expand All @@ -60,6 +61,7 @@ private ConnectionConfig(Builder builder) {
this.url = builder.getUrl();
this.host = builder.getHostProperty().getValue();
this.port = builder.getPortProperty().getValue();
this.fetchSize = builder.getFetchSizeProperty().getValue();
this.path = builder.getPathProperty().getValue();
this.useSSL = builder.getUseSSLProperty().getValue();

Expand Down Expand Up @@ -106,6 +108,10 @@ public int getPort() {
return port;
}

public int getFetchSize() {
return fetchSize;
}

public String getPath() {
return path;
}
Expand Down Expand Up @@ -192,6 +198,7 @@ public String toString() {
"url='" + url + '\'' +
", host='" + host + '\'' +
", port=" + port +
", fetchSize=" + fetchSize +
", path='" + path + '\'' +
", useSSL=" + useSSL +
", loginTimeout=" + loginTimeout +
Expand Down Expand Up @@ -223,6 +230,7 @@ public static class Builder {

private HostConnectionProperty hostProperty = new HostConnectionProperty();
private PortConnectionProperty portProperty = new PortConnectionProperty();
private FetchSizeProperty fetchSizeProperty = new FetchSizeProperty();
private LoginTimeoutConnectionProperty loginTimeoutProperty = new LoginTimeoutConnectionProperty();
private UseSSLConnectionProperty useSSLProperty = new UseSSLConnectionProperty();
private PathConnectionProperty pathProperty = new PathConnectionProperty();
Expand Down Expand Up @@ -261,6 +269,7 @@ public static class Builder {
ConnectionProperty[] connectionProperties = new ConnectionProperty[]{
hostProperty,
portProperty,
fetchSizeProperty,
loginTimeoutProperty,
useSSLProperty,
pathProperty,
Expand Down Expand Up @@ -302,6 +311,10 @@ public PortConnectionProperty getPortProperty() {
return portProperty;
}

public FetchSizeProperty getFetchSizeProperty() {
return fetchSizeProperty;
}

public LoginTimeoutConnectionProperty getLoginTimeoutProperty() {
return loginTimeoutProperty;
}
Expand Down Expand Up @@ -519,6 +532,11 @@ private void validateConfig() throws ConnectionPropertyException {
// change the default port to use to 443
portProperty.setRawValue(443);
}

if (fetchSizeProperty.getValue() < 0) {
throw new ConnectionPropertyException(fetchSizeProperty.getKey(),
"Cursor fetch size value should be greater or equal to zero");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.amazon.opendistroforelasticsearch.jdbc.config;

public class FetchSizeProperty extends IntConnectionProperty {

public static final String KEY = "fetchSize";

public FetchSizeProperty() {
super(KEY);
}
}
Loading