diff --git a/.gitignore b/.gitignore index e36b60b..c58a5b6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ out/ -.idea/checkstyle-idea.xml +.idea diff --git a/.gitignore.save b/.gitignore.save new file mode 100644 index 0000000..c58a5b6 --- /dev/null +++ b/.gitignore.save @@ -0,0 +1,2 @@ +out/ +.idea diff --git a/README.md b/README.md index 7bc34ac..65f5354 100644 --- a/README.md +++ b/README.md @@ -14,25 +14,48 @@ Following actions are inside: ``LOOKUP BY PRIMARY KEY`` - this action will execute select query from specified table, as criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"). The action returns only one result (a primary key is unique). +``UPSERT BY PRIMARY KEY`` - this action will execute select command from specified table, as search criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"), and execute insert command by PRIMARY KEY with specified field, if result does not found, else - action will execute update command by PRIMARY KEY with specified field. The action returns only one result row (a primary key is unique). + ``DELETE BY PRIMARY KEY`` - this action will execute delete query from specified table, as criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"). The action returns an integer value that indicates the number of rows affected, the returned value can be 0 or 1 (a primary key is unique). ### How works ### Requirements Before you can deploy any code into elastic.io **you must be a registered elastic.io platform user**. Please see our home page at [http://www.elastic.io](http://www.elastic.io) to learn how. #### Environment variables -For unit-testing +For unit-testing is needed to specify following environment variables: +1. Connection to MSSQL: + - ``CONN_USER_MSSQL`` - user login + - ``CONN_PASSWORD_MSSQL`` - user password + - ``CONN_DBNAME_MSSQL`` - DataBase name + - ``CONN_HOST_MSSQL`` - DataBase host + - ``CONN_PORT_MSSQL`` - DataBase port +2. Connection to MySQL: + - ``CONN_USER_MYSQL`` - user login + - ``CONN_PASSWORD_MYSQL`` - user password + - ``CONN_DBNAME_MYSQL`` - DataBase name + - ``CONN_HOST_MYSQL`` - DataBase host + - ``CONN_PORT_MYSQL`` - DataBase port +3. Connection to Oracle: + - ``CONN_USER_ORACLE`` - user login + - ``CONN_PASSWORD_ORACLE`` - user password + - ``CONN_DBNAME_ORACLE`` - DataBase name + - ``CONN_HOST_ORACLE`` - DataBase host + - ``CONN_PORT_ORACLE`` - DataBase port +4. Connection to PostgreSQL: + - ``CONN_USER_POSTGRESQL`` - user login + - ``CONN_PASSWORD_POSTGRESQL`` - user password + - ``CONN_DBNAME_POSTGRESQL`` - DataBase name + - ``CONN_HOST_POSTGRESQL`` - DataBase host + - ``CONN_PORT_POSTGRESQL`` - DataBase port #### Others ## Credentials You may use following properties to configure a connection: ![image](https://user-images.githubusercontent.com/40201204/43577550-ce99efe6-9654-11e8-87ed-f3e0839d618a.png) You can add the authorisation methods during the integration flow design or by going to your Settings > Security credentials > REST client and adding there. ### DB Engine +You are able to choose one of existing database types: ![image](https://user-images.githubusercontent.com/40201204/43577772-6f85bdea-9655-11e8-96e1-368493a36c9d.png) -You are able to choose one of existing database types -- ``MySQL`` - compatible with MySQL Server 5.5, 5.6, 5.7 and 8.0. -- ``PostgreSQL`` - compatible with PostgreSQL 8.2 and higher -- ``Oracle`` - compatible with Oracle Database 8.1.7 - 12.1.0.2 -- ``MSSQL`` - compatible with Microsoft SQL Server 2008 R2 and higher + ### Connection URI In the Connection URI field please provide hostname of the server, e.g. ``acme.com`` ### Connection port @@ -120,12 +143,46 @@ Component supports dynamic incoming metadata - as soon as your query is in place ### LOOKUP BY PRIMARY KEY ![image](https://user-images.githubusercontent.com/40201204/43592505-5b6bbfe8-967e-11e8-845e-2ce8ac707357.png) + The action will execute select query from a ``Table`` dropdown field, as criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"). The action returns only one result (a primary key is unique). Checkbox ``Don't throw Error on an Empty Result`` allows to emit an empty response, otherwise you will get an error on empty response. #### Input fields description ![image](https://user-images.githubusercontent.com/40201204/43644579-f593d1c8-9737-11e8-9b97-ee9e575a19f7.png) As an input metadata you will get a Primary Key field to provide the data inside as a clause value. +### UPSERT BY PRIMARY KEY +The action will execute ``SELECT`` command from a ``Tables`` dropdown field, as search criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"), and execute ``INSERT`` command by PRIMARY KEY with specified field, if result does not found, else - action will execute ``UPDATE`` command by PRIMARY KEY with specified field. The action returns only one result row (a primary key is unique). +1. Find and select jdbc-component in the component repository +![image](https://user-images.githubusercontent.com/16806832/44981615-c70a9d80-af7b-11e8-8055-3b553abe8212.png) + +2. Create new or select existing credentials +![image](https://user-images.githubusercontent.com/16806832/44981652-e86b8980-af7b-11e8-897e-04d1fc9a93cf.png) + +3. Select action "Upsert Row By Primary Key" from list +![image](https://user-images.githubusercontent.com/16806832/44981700-0d5ffc80-af7c-11e8-9ac3-aedb16e1d788.png) + +4. Select table from ``Table`` dropdown list +![image](https://user-images.githubusercontent.com/16806832/44981754-38e2e700-af7c-11e8-87d3-f029a7fec8fa.png) + +5. Specify input data (field with red asterisk is Primary key), and click "Continue" +![image](https://user-images.githubusercontent.com/16806832/44981854-83fcfa00-af7c-11e8-9ef2-8c06e77fed1e.png) + +6. Retrieving sample +![image](https://user-images.githubusercontent.com/16806832/44983059-86f9e980-af80-11e8-8178-77e463488c7a.png) + +7. Retrieve sample result +![image](https://user-images.githubusercontent.com/16806832/44982952-2ec2e780-af80-11e8-98b1-58c3adbc15b9.png) + +8. Click "Continue" +![image](https://user-images.githubusercontent.com/16806832/44983101-b0b31080-af80-11e8-82d8-0e70e4b4ff97.png) + +9. Finish component configuration +![image](https://user-images.githubusercontent.com/16806832/44983365-90378600-af81-11e8-9be4-4dbb39af0fdc.png) + +#### Input fields description +As an input metadata you will get all fields of selected table. [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY") is required field (will mark as asterisk) and other input fields are optional. +![image](https://user-images.githubusercontent.com/16806832/44397461-1a76f780-a549-11e8-8247-9a6f9aa3f3b4.png) + ### DELETE BY PRIMARY KEY ![image](https://user-images.githubusercontent.com/40201204/43592505-5b6bbfe8-967e-11e8-845e-2ce8ac707357.png) The action will execute delete query from a ``Table`` dropdown field, as criteria can be used only [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY"). The action returns count of affected rows. @@ -134,6 +191,17 @@ Checkbox ``Don't throw Error on an Empty Result`` allows to emit an empty respon ![image](https://user-images.githubusercontent.com/40201204/43644579-f593d1c8-9737-11e8-9b97-ee9e575a19f7.png) As an input metadata you will get a Primary Key field to provide the data inside as a clause value. +## Current limitations +1. Only tables with one [PRIMARY KEY](https://en.wikipedia.org/wiki/Primary_key "PRIMARY KEY") is supported. You will see the message ``Table has not Primary Key. Should be one Primary Key +``, if the selected table doesn't have a primary key. Also, you will see the message ``Composite Primary Key is not supported +``, if the selected table has composite primary key. +2. Only following versions of database types are supported: +- ``MySQL`` - compatible with MySQL Server 5.5, 5.6, 5.7 and 8.0. +- ``PostgreSQL`` - compatible with PostgreSQL 8.2 and higher +- ``Oracle`` - compatible with Oracle Database 8.1.7 - 12.1.0.2 +- ``MSSQL`` - compatible with Microsoft SQL Server 2008 R2 and higher +3. The current implementation of the action ``Upsert By Primary Key`` doesn't mark non-nullable fields as required fields at a dynamic metadata. In case of updating such fields with an empty value you will get SQL Exception ``Cannot insert the value NULL into...``. You should manually fill in all non-nullable fields with previous data, if you want to update part of columns in a row, even if data in that fields doesn't change. + ## Known issues No known issues are there yet. diff --git a/component.json b/component.json index cd8a0f2..d6292eb 100755 --- a/component.json +++ b/component.json @@ -124,6 +124,21 @@ }, "dynamicMetadata": "io.elastic.jdbc.PrimaryColumnNamesProvider" }, + "upsertRowByPrimaryKey": { + "main": "io.elastic.jdbc.actions.UpsertRowByPrimaryKey", + "title": "Upsert Row By Primary Key", + "description": "Executes upsert by primary key", + "fields": { + "tableName": { + "viewClass": "SelectView", + "prompt": "Select a Table", + "label": "Table", + "required": true, + "model": "io.elastic.jdbc.TableNameProvider" + } + }, + "dynamicMetadata": "io.elastic.jdbc.ColumnNamesWithPrimaryKeyProvider" + }, "deleteRowByPrimaryKey": { "main": "io.elastic.jdbc.actions.DeleteRowByPrimaryKey", "title": "Delete Row By Primary Key", diff --git a/src/main/java/io/elastic/jdbc/ColumnNamesProvider.java b/src/main/java/io/elastic/jdbc/ColumnNamesProvider.java index 830b7b0..4b5252a 100755 --- a/src/main/java/io/elastic/jdbc/ColumnNamesProvider.java +++ b/src/main/java/io/elastic/jdbc/ColumnNamesProvider.java @@ -17,7 +17,7 @@ public class ColumnNamesProvider implements DynamicMetadataProvider, SelectModelProvider { - private static final Logger logger = LoggerFactory.getLogger(ColumnNamesProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ColumnNamesProvider.class); @Override public JsonObject getSelectModel(JsonObject configuration) { @@ -57,8 +57,8 @@ public JsonObject getColumns(JsonObject configuration) { ResultSet rs = null; String schemaName = null; boolean isEmpty = true; - Boolean isOracle = (configuration.getString("dbEngine").equals("oracle")) ? true : false; - Boolean isMssql = (configuration.getString("dbEngine").equals("mssql")) ? true : false; + Boolean isOracle = configuration.getString("dbEngine").equals("oracle"); + Boolean isMssql = configuration.getString("dbEngine").equals("mssql"); try { connection = Utils.getConnection(configuration); DatabaseMetaData dbMetaData = connection.getMetaData(); @@ -71,14 +71,14 @@ public JsonObject getColumns(JsonObject configuration) { while (rs.next()) { JsonObjectBuilder field = Json.createObjectBuilder(); String name = rs.getString("COLUMN_NAME"); - Boolean isRequired = false; + Boolean isRequired; + Integer isNullable = (rs.getObject("NULLABLE") != null) ? rs.getInt("NULLABLE") : 1; if (isMssql) { String isAutoincrement = (rs.getString("IS_AUTOINCREMENT") != null) ? rs.getString("IS_AUTOINCREMENT") : ""; - Integer isNullable = (rs.getObject("NULLABLE") != null) ? rs.getInt("NULLABLE") : 1; isRequired = isNullable == 0 && !isAutoincrement.equals("YES"); } else { - isRequired = false; + isRequired = isNullable == 0; } field.add("required", isRequired) .add("title", name) @@ -97,14 +97,14 @@ public JsonObject getColumns(JsonObject configuration) { try { rs.close(); } catch (SQLException e) { - logger.error("Failed to close result set {}", e.toString()); + LOGGER.error("Failed to close result set {}", e); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { - logger.error("Failed to close connection {}", e.toString()); + LOGGER.error("Failed to close connection {}", e); } } } diff --git a/src/main/java/io/elastic/jdbc/ColumnNamesWithPrimaryKeyProvider.java b/src/main/java/io/elastic/jdbc/ColumnNamesWithPrimaryKeyProvider.java new file mode 100644 index 0000000..2fc0045 --- /dev/null +++ b/src/main/java/io/elastic/jdbc/ColumnNamesWithPrimaryKeyProvider.java @@ -0,0 +1,139 @@ +package io.elastic.jdbc; + +import io.elastic.api.DynamicMetadataProvider; +import io.elastic.api.SelectModelProvider; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Map; +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ColumnNamesWithPrimaryKeyProvider implements DynamicMetadataProvider, + SelectModelProvider { + + private static final Logger LOGGER = LoggerFactory + .getLogger(ColumnNamesWithPrimaryKeyProvider.class); + + @Override + public JsonObject getSelectModel(JsonObject configuration) { + JsonObjectBuilder result = Json.createObjectBuilder(); + JsonObject properties = getColumns(configuration); + for (Map.Entry entry : properties.entrySet()) { + result.add(entry.getKey(), entry.getKey()); + } + return result.build(); + } + + /** + * Returns Columns list as metadata + */ + + @Override + public JsonObject getMetaModel(JsonObject configuration) { + JsonObjectBuilder result = Json.createObjectBuilder(); + JsonObjectBuilder inMetadata = Json.createObjectBuilder(); + JsonObjectBuilder outMetadata = Json.createObjectBuilder(); + JsonObject properties = getColumns(configuration); + inMetadata.add("type", "object").add("properties", properties); + outMetadata.add("type", "object").add("properties", properties); + result.add("out", outMetadata.build()).add("in", inMetadata.build()); + return result.build(); + } + + public JsonObject getColumns(JsonObject configuration) { + if (configuration.getString("tableName") == null || configuration.getString("tableName") + .isEmpty()) { + throw new RuntimeException("Table name is required"); + } + String tableName = configuration.getString("tableName"); + JsonObjectBuilder properties = Json.createObjectBuilder(); + Connection connection = null; + ResultSet rs = null; + ResultSet rsPrimaryKeys = null; + String schemaName = null; + boolean isEmpty = true; + Boolean isOracle = configuration.getString("dbEngine").equals("oracle"); + try { + connection = Utils.getConnection(configuration); + DatabaseMetaData dbMetaData = connection.getMetaData(); + if (tableName.contains(".")) { + schemaName = tableName.split("\\.")[0]; + tableName = tableName.split("\\.")[1]; + } + rsPrimaryKeys = dbMetaData + .getPrimaryKeys(null, ((isOracle && !schemaName.isEmpty()) ? schemaName : null), + tableName); + rs = dbMetaData.getColumns(null, schemaName, tableName, "%"); + while (rs.next()) { + JsonObjectBuilder field = Json.createObjectBuilder(); + String name = rs.getString("COLUMN_NAME"); + Boolean isRequired = false; + while (rsPrimaryKeys.next()) { + if (rsPrimaryKeys.getString("COLUMN_NAME").equals(name)) { + isRequired = true; + break; + } + } + field.add("required", isRequired) + .add("title", name) + .add("type", convertType(rs.getInt("DATA_TYPE"))); + properties.add(name, field.build()); + isEmpty = false; + } + if (isEmpty) { + properties.add("empty dataset", "no columns"); + } + + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOGGER.error("Failed to close result set {}", e); + } + } + if (rsPrimaryKeys != null) { + try { + rsPrimaryKeys.close(); + } catch (SQLException e) { + LOGGER.error("Failed to close result set {}", e); + } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOGGER.error("Failed to close connection {}", e); + } + } + } + return properties.build(); + } + + /** + * Converts JDBC column type name to js type according to http://db.apache.org/ojb/docu/guides/jdbc-types.html + * + * @param sqlType JDBC column type + * @url http://db.apache.org/ojb/docu/guides/jdbc-types.html + */ + private String convertType(Integer sqlType) { + if (sqlType == Types.NUMERIC || sqlType == Types.DECIMAL || sqlType == Types.TINYINT + || sqlType == Types.SMALLINT || sqlType == Types.INTEGER || sqlType == Types.BIGINT + || sqlType == Types.REAL || sqlType == Types.FLOAT || sqlType == Types.DOUBLE) { + return "number"; + } + if (sqlType == Types.BIT || sqlType == Types.BOOLEAN) { + return "boolean"; + } + return "string"; + } +} diff --git a/src/main/java/io/elastic/jdbc/JdbcCredentialsVerifier.java b/src/main/java/io/elastic/jdbc/JdbcCredentialsVerifier.java index 162e041..e446246 100644 --- a/src/main/java/io/elastic/jdbc/JdbcCredentialsVerifier.java +++ b/src/main/java/io/elastic/jdbc/JdbcCredentialsVerifier.java @@ -10,27 +10,27 @@ public class JdbcCredentialsVerifier implements CredentialsVerifier { - private static final Logger logger = LoggerFactory.getLogger(JdbcCredentialsVerifier.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JdbcCredentialsVerifier.class); @Override public void verify(JsonObject configuration) throws InvalidCredentialsException { - logger.info("About to connect to database using given credentials"); + LOGGER.info("About to connect to database using given credentials"); Connection connection = null; try { connection = Utils.getConnection(configuration); - logger.info("Successfully connected to database. Credentials verified."); + LOGGER.info("Successfully connected to database. Credentials verified."); } catch (Exception e) { throw new InvalidCredentialsException("Failed to connect to database", e); } finally { if (connection != null) { - logger.info("Closing database connection"); + LOGGER.info("Closing database connection"); try { connection.close(); } catch (SQLException e) { - logger.error("Failed to closed database connection", e); + LOGGER.error("Failed to closed database connection", e); } } } diff --git a/src/main/java/io/elastic/jdbc/PrimaryColumnNamesProvider.java b/src/main/java/io/elastic/jdbc/PrimaryColumnNamesProvider.java index a61e717..01e86c0 100644 --- a/src/main/java/io/elastic/jdbc/PrimaryColumnNamesProvider.java +++ b/src/main/java/io/elastic/jdbc/PrimaryColumnNamesProvider.java @@ -19,7 +19,7 @@ public class PrimaryColumnNamesProvider implements DynamicMetadataProvider, SelectModelProvider { - private static final Logger logger = LoggerFactory.getLogger(PrimaryColumnNamesProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PrimaryColumnNamesProvider.class); public JsonObject getSelectModel(JsonObject configuration) { JsonObject result = Json.createObjectBuilder().build(); @@ -57,8 +57,8 @@ public JsonObject getPrimaryColumns(JsonObject configuration) { ResultSet rs = null; String schemaName = null; Boolean isEmpty = true; - Boolean isOracle = (configuration.getString("dbEngine").equals("oracle")) ? true : false; - Boolean isMssql = (configuration.getString("dbEngine").equals("mssql")) ? true : false; + Boolean isOracle = configuration.getString("dbEngine").equals("oracle"); + Boolean isMssql = configuration.getString("dbEngine").equals("mssql"); List primaryKeys = new ArrayList(); try { connection = Utils.getConnection(configuration); @@ -74,7 +74,7 @@ public JsonObject getPrimaryColumns(JsonObject configuration) { tableName); while (rs.next()) { primaryKeys.add(rs.getString("COLUMN_NAME")); - logger.info("Primary Key: {}", rs.getString("COLUMN_NAME")); + LOGGER.info("Primary Key: {}", rs.getString("COLUMN_NAME")); } rs = dbMetaData .getColumns(null, ((isOracle && !schemaName.isEmpty()) ? schemaName : null), tableName, @@ -100,7 +100,7 @@ public JsonObject getPrimaryColumns(JsonObject configuration) { } } if (isEmpty) { - logger.info("Empty PK list - no primary keys"); + LOGGER.info("Empty PK list - no primary keys"); throw new IllegalStateException("No Primary Keys"); } @@ -111,14 +111,14 @@ public JsonObject getPrimaryColumns(JsonObject configuration) { try { rs.close(); } catch (SQLException e) { - logger.error("Failed to close result set", e.toString()); + LOGGER.error("Failed to close result set", e); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { - logger.error("Failed to close connection", e.toString()); + LOGGER.error("Failed to close connection", e); } } } diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java index 7f2dd33..f849824 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java @@ -4,36 +4,19 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class MSSQL extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), - entry.getValue().toString().replace("\"", "")); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); String sql = "WITH Results_CTE AS" + "(" + @@ -47,14 +30,37 @@ public ResultSet executePolling(Connection connection) throws SQLException { " FROM Results_CTE" + " WHERE RowNum > ?" + " AND RowNum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, skipNumber); + stmt.setInt(3, countNumber + skipNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "WITH Results_CTE AS" + "(" + @@ -68,13 +74,7 @@ public ResultSet executeLookup(Connection connection, JsonObject body) throws SQ " FROM Results_CTE" + " WHERE RowNum > ?" + " AND RowNum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), entry.getValue().toString()); - } - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -82,21 +82,18 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - return stmt.executeUpdate(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, lookupValue); + return stmt.executeUpdate(); + } } - public boolean executeRecordExists(Connection connection) throws SQLException { + public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -117,13 +114,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -139,13 +137,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, idValue); - stmt.execute(); } -} \ No newline at end of file + +} diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java index be3e213..c8e4441 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java @@ -4,37 +4,18 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class MySQL extends Query { - private static final Logger logger = LoggerFactory.getLogger(MySQL.class); - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) throws SQLException { - StringBuilder sql = new StringBuilder(sqlQuery); - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), (entry.getValue() != null) ? entry.getValue().toString().replace("\"", "") : null); - i++; - } - return stmt.executeQuery(); - } - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) throws SQLException { - StringBuilder sql = new StringBuilder(sqlQuery); - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - if(pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); StringBuilder sql = new StringBuilder("SELECT * FROM "); sql.append(tableName); @@ -46,14 +27,37 @@ public ResultSet executePolling(Connection connection) throws SQLException { } sql.append(" ASC LIMIT ? OFFSET ?"); - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, countNumber); - stmt.setInt(3, skipNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql.toString())) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, countNumber); + stmt.setInt(3, skipNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); StringBuilder sql = new StringBuilder("SELECT * FROM "); sql.append(tableName); @@ -62,35 +66,25 @@ public ResultSet executeLookup(Connection connection, JsonObject body) throws SQ sql.append(" = ?"); sql.append(" ORDER BY ").append(lookupField); sql.append(" ASC LIMIT ? OFFSET ?"); - - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), entry.getValue().toString()); - } - stmt.setInt(2, countNumber); - stmt.setInt(3, skipNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql.toString(), countNumber, skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - return stmt.executeUpdate(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, lookupValue); + return stmt.executeUpdate(); + } } - public boolean executeRecordExists(Connection connection) throws SQLException { + public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -111,13 +105,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (String key : body.keySet()) { + Utils.setStatementParam(stmt, i, key, body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -133,14 +128,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (String key : body.keySet()) { + Utils.setStatementParam(stmt, i, key, body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, idValue); - stmt.execute(); } -} \ No newline at end of file +} diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java b/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java index 5c0609d..0027561 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java @@ -4,86 +4,81 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class Oracle extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), - entry.getValue().toString().replace("\"", "")); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); String sql = "SELECT * FROM " + " (SELECT b.*, rank() over (order by " + pollingField + ") as rnk FROM " + tableName + " b) WHERE " + pollingField + " > ?" + " AND rnk BETWEEN ? AND ?" + " ORDER BY " + pollingField; - PreparedStatement stmt = connection.prepareStatement(sql); - - /* data types mapping https://docs.oracle.com/cd/B19306_01/java.102/b14188/datamap.htm */ - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + /* data types mapping https://docs.oracle.com/cd/B19306_01/java.102/b14188/datamap.htm */ + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, skipNumber); + stmt.setInt(3, countNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "SELECT * FROM " + "(SELECT b.*, rank() OVER (ORDER BY " + lookupField + ") AS rnk FROM " + tableName + " b) WHERE " + lookupField + " = ? " + "AND rnk BETWEEN ? AND ? " + "ORDER BY " + lookupField; - PreparedStatement stmt = connection.prepareStatement(sql); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), entry.getValue().toString()); - } - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - return stmt.executeUpdate(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, lookupValue); + return stmt.executeUpdate(); + } } - public boolean executeRecordExists(Connection connection) throws SQLException { + public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -104,13 +99,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -126,14 +122,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, idValue); - stmt.execute(); } } diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java index b26e297..c666d95 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java @@ -4,36 +4,18 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class PostgreSQL extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), - entry.getValue().toString().replace("\"", "")); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); String sql = "WITH results_cte AS" + "(" + @@ -47,14 +29,37 @@ public ResultSet executePolling(Connection connection) throws SQLException { " FROM results_cte" + " WHERE rownum > ?" + " AND rownum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, skipNumber); + stmt.setInt(3, countNumber + skipNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "WITH results_cte AS" + "(" + @@ -68,14 +73,7 @@ public ResultSet executeLookup(Connection connection, JsonObject body) throws SQ " FROM results_cte" + " WHERE rownum > ?" + " AND rownum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - //stmt.setString(1, lookupValue); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), entry.getValue().toString()); - } - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -83,23 +81,20 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), entry.getValue().toString()); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, 1, entry.getKey(), body); + } + return stmt.executeUpdate(); } - return stmt.executeUpdate(); } - public boolean executeRecordExists(Connection connection) throws SQLException { + public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -120,13 +115,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -142,13 +138,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), entry.getValue().toString()); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, idValue); - stmt.execute(); } -} \ No newline at end of file + +} diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java b/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java index 5a33708..b634631 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java @@ -2,10 +2,18 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Map; +import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; public abstract class Query { @@ -15,9 +23,23 @@ public abstract class Query { protected String orderField = null; protected String pollingField = null; protected Timestamp pollingValue = null; + protected Timestamp maxPollingValue = null; protected String lookupField = null; protected String lookupValue = null; + public static String preProcessSelect(String sqlQuery) { + sqlQuery = sqlQuery.trim(); + if (!isSelect(sqlQuery)) { + throw new RuntimeException("Unresolvable SELECT query"); + } + return sqlQuery.replaceAll(Utils.VARS_REGEXP, "?"); + } + + public static boolean isSelect(String sqlQuery) { + String pattern = "select"; + return sqlQuery.toLowerCase().startsWith(pattern); + } + public Query skip(Integer skip) { this.skipNumber = skip; return this; @@ -50,12 +72,21 @@ public Query selectPolling(String sqlQuery, Timestamp fieldValue) { return this; } - abstract public ResultSet executePolling(Connection connection) throws SQLException; + public Timestamp getMaxPollingValue() { + return maxPollingValue; + } + + public void setMaxPollingValue(Timestamp maxPollingValue) { + this.maxPollingValue = maxPollingValue; + } + + abstract public ArrayList executePolling(Connection connection) throws SQLException; - abstract public ResultSet executeLookup(Connection connection, JsonObject body) + abstract public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException; - abstract public boolean executeRecordExists(Connection connection) throws SQLException; + abstract public boolean executeRecordExists(Connection connection, JsonObject body) + throws SQLException; abstract public int executeDelete(Connection connection, JsonObject body) throws SQLException; @@ -65,29 +96,167 @@ abstract public void executeInsert(Connection connection, String tableName, Json abstract public void executeUpdate(Connection connection, String tableName, String idColumn, String idValue, JsonObject body) throws SQLException; - abstract public ResultSet executeSelectQuery(Connection connection, String sqlQuery, - JsonObject body) throws SQLException; - - abstract public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException; + public ArrayList executeSelectTrigger(Connection connection, String sqlQuery) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sqlQuery)) { + if (pollingValue != null) { + stmt.setTimestamp(1, pollingValue); + } + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + listResult.add(row.build()); + } + return listResult; + } + } + } - public void validateQuery() { - if (tableName == null) { - throw new RuntimeException("Table name is required field"); + public ArrayList executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sqlQuery)) { + int i = 1; + for (Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + try (ResultSet rs = stmt.executeQuery()) { + JsonObjectBuilder row = Json.createObjectBuilder(); + ArrayList listResult = new ArrayList(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + listResult.add(row.build()); + } + return listResult; + } } } - public static String preProcessSelect(String sqlQuery) { - sqlQuery = sqlQuery.trim(); - if (!isSelect(sqlQuery)) { - throw new RuntimeException("Unresolvable SELECT query"); + public JsonObject executeUpsert(Connection connection, String idColumn, + JsonObject body) throws SQLException { + validateQuery(); + JsonObject foundRow; + JsonObjectBuilder row = Json.createObjectBuilder(); + int rowsCount = 0; + int i; + ResultSet rs; + ResultSetMetaData metaData; + + StringBuilder keys = new StringBuilder(); + StringBuilder values = new StringBuilder(); + StringBuilder setString = new StringBuilder(); + for (Map.Entry entry : body.entrySet()) { + if (keys.length() > 0) { + keys.append(","); + } + keys.append(entry.getKey()); + if (values.length() > 0) { + values.append(","); + } + values.append("?"); + if (!entry.getKey().equals(idColumn)) { + if (setString.length() > 0) { + setString.append(","); + } + setString.append(entry.getKey()).append(" = ?"); + } } - return sqlQuery.replaceAll(Utils.VARS_REGEXP, "?"); + + String sqlSELECT = + " SELECT" + + " *" + + " FROM " + tableName + + " WHERE " + idColumn + " = ?"; + String sqlInsert = "INSERT INTO " + tableName + + " (" + keys.toString() + ")" + + " VALUES (" + values.toString() + ")"; + String sqlUpdate = "UPDATE " + tableName + + " SET " + setString.toString() + + " WHERE " + idColumn + " = ?"; + + PreparedStatement stmtSelect = null; + PreparedStatement stmtInsert = null; + PreparedStatement stmtUpdate = null; + + try { + connection.setAutoCommit(false); + + stmtSelect = connection.prepareStatement(sqlSELECT); + Utils.setStatementParam(stmtSelect, 1, idColumn, body); + rs = stmtSelect.executeQuery(); + metaData = rs.getMetaData(); + while (rs.next()) { + for (i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + rowsCount++; + if (rowsCount > 1) { + throw new RuntimeException("Error: the number of matching rows is not exactly one"); + } + } + foundRow = row.build(); + + i = 1; + if (foundRow.size() == 0) { + stmtInsert = connection.prepareStatement(sqlInsert); + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmtInsert, i, entry.getKey(), body); + i++; + } + stmtInsert.execute(); + } else { + stmtUpdate = connection.prepareStatement(sqlUpdate); + for (Map.Entry entry : body.entrySet()) { + if (!entry.getKey().equals(idColumn)) { + Utils.setStatementParam(stmtUpdate, i, entry.getKey(), body); + i++; + } + } + Utils.setStatementParam(stmtUpdate, i, idColumn, body); + stmtUpdate.execute(); + } + + rs = stmtSelect.executeQuery(); + metaData = rs.getMetaData(); + rowsCount = 0; + while (rs.next()) { + for (i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + rowsCount++; + if (rowsCount > 1) { + throw new RuntimeException("Error: the number of matching rows is not exactly one"); + } + } + connection.commit(); + + } finally { + if (stmtSelect != null) { + stmtSelect.close(); + } + if (stmtInsert != null) { + stmtInsert.close(); + } + if (stmtUpdate != null) { + stmtUpdate.close(); + } + connection.setAutoCommit(true); + } + return row.build(); } - public static boolean isSelect(String sqlQuery) { - String pattern = "select"; - return sqlQuery.toLowerCase().startsWith(pattern); + public void validateQuery() { + if (tableName == null) { + throw new RuntimeException("Table name is required field"); + } } } diff --git a/src/main/java/io/elastic/jdbc/QueryColumnNamesProvider.java b/src/main/java/io/elastic/jdbc/QueryColumnNamesProvider.java index 45c586f..503feb1 100644 --- a/src/main/java/io/elastic/jdbc/QueryColumnNamesProvider.java +++ b/src/main/java/io/elastic/jdbc/QueryColumnNamesProvider.java @@ -14,7 +14,7 @@ public class QueryColumnNamesProvider implements DynamicMetadataProvider, SelectModelProvider { - private static final Logger logger = LoggerFactory.getLogger(QueryColumnNamesProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(QueryColumnNamesProvider.class); public JsonObject getSelectModel(JsonObject configuration) { JsonObject result = Json.createObjectBuilder().build(); @@ -49,13 +49,13 @@ public JsonObject getColumns(JsonObject configuration) { Boolean isEmpty = true; if (matcher.find()) { do { - logger.info("Var = {}", matcher.group()); + LOGGER.info("Var = {}", matcher.group()); JsonObjectBuilder field = Json.createObjectBuilder(); String result[] = matcher.group().split(":"); String name = result[0].substring(1); String type = result[1]; field.add("title", name) - .add("type", type); + .add("type", type); properties.add(name, field); isEmpty = false; } while (matcher.find()); diff --git a/src/main/java/io/elastic/jdbc/TableNameProvider.java b/src/main/java/io/elastic/jdbc/TableNameProvider.java index b8ed7cf..d1c6a8f 100755 --- a/src/main/java/io/elastic/jdbc/TableNameProvider.java +++ b/src/main/java/io/elastic/jdbc/TableNameProvider.java @@ -15,11 +15,11 @@ public class TableNameProvider implements SelectModelProvider { - private static final Logger logger = LoggerFactory.getLogger(TableNameProvider.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TableNameProvider.class); @Override public JsonObject getSelectModel(JsonObject configuration) { - logger.info("About to retrieve table name"); + LOGGER.info("About to retrieve table name"); JsonObjectBuilder result = Json.createObjectBuilder(); Connection connection = null; @@ -27,7 +27,7 @@ public JsonObject getSelectModel(JsonObject configuration) { try { connection = Utils.getConnection(configuration); - logger.info("Successfully connected to DB"); + LOGGER.info("Successfully connected to DB"); // get metadata DatabaseMetaData md = connection.getMetaData(); @@ -58,21 +58,21 @@ && isOracleServiceSchema(schemaName)) { result.add("empty dataset", "no tables"); } } catch (SQLException e) { - logger.error("Unexpected error {}", e); + LOGGER.error("Unexpected error {}", e); throw new RuntimeException(e); } finally { if (rs != null) { try { rs.close(); } catch (SQLException e) { - logger.error(e.toString()); + LOGGER.error(e.toString()); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { - logger.error(e.toString()); + LOGGER.error(e.toString()); } } } diff --git a/src/main/java/io/elastic/jdbc/Utils.java b/src/main/java/io/elastic/jdbc/Utils.java index 57e7db1..99fddc9 100644 --- a/src/main/java/io/elastic/jdbc/Utils.java +++ b/src/main/java/io/elastic/jdbc/Utils.java @@ -1,6 +1,5 @@ package io.elastic.jdbc; -import java.math.BigDecimal; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.Date; @@ -27,16 +26,15 @@ public class Utils { - private static final Logger logger = LoggerFactory.getLogger(Utils.class); - public static final String CFG_DATABASE_NAME = "databaseName"; public static final String CFG_PASSWORD = "password"; public static final String CFG_PORT = "port"; public static final String CFG_DB_ENGINE = "dbEngine"; public static final String CFG_HOST = "host"; public static final String CFG_USER = "user"; - public static Map columnTypes = null; public static final String VARS_REGEXP = "@([\\w_$][\\d\\w_$]*(:(string|boolean|date|number|bigint|float|real))?)"; + private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); + public static Map columnTypes = null; public static Connection getConnection(final JsonObject config) { final String engine = getRequiredNonEmptyString(config, CFG_DB_ENGINE, "Engine is required") @@ -50,7 +48,7 @@ public static Connection getConnection(final JsonObject config) { "Database name is required"); engineType.loadDriverClass(); final String connectionString = engineType.getConnectionString(host, port, databaseName); - logger.info("Connecting to {}", connectionString); + LOGGER.info("Connecting to {}", connectionString); try { return DriverManager.getConnection(connectionString, user, password); } catch (Exception e) { @@ -86,7 +84,7 @@ public static String getNonNullString(final JsonObject config, final String key) } } } catch (NullPointerException | ClassCastException e) { - logger.info("key {} doesn't have any mapping: {}", key, e); + LOGGER.info("key {} doesn't have any mapping: {}", key, e); } return value.toString().replaceAll("\"", ""); } @@ -100,48 +98,48 @@ private static Integer getPort(final JsonObject config, final Engines engineType } public static void setStatementParam(PreparedStatement statement, int paramNumber, String colName, - String colValue) throws SQLException { + JsonObject body) throws SQLException { try { if (isNumeric(colName)) { - if (colValue != "null") { - statement.setBigDecimal(paramNumber, new BigDecimal(colValue)); + if (body.get(colName) != null) { + statement.setBigDecimal(paramNumber, body.getJsonNumber(colName).bigDecimalValue()); } else { statement.setBigDecimal(paramNumber, null); } } else if (isTimestamp(colName)) { - if (colValue != "null") { - statement.setTimestamp(paramNumber, Timestamp.valueOf(colValue)); + if (body.get(colName) != null) { + statement.setTimestamp(paramNumber, Timestamp.valueOf(body.getString(colName))); } else { statement.setTimestamp(paramNumber, null); } } else if (isDate(colName)) { - if (colValue != "null") { - statement.setDate(paramNumber, Date.valueOf(colValue)); + if (body.get(colName) != null) { + statement.setDate(paramNumber, Date.valueOf(body.getString(colName))); } else { statement.setDate(paramNumber, null); } } else if (isBoolean(colName)) { - if (colValue != "null") { - statement.setBoolean(paramNumber, Boolean.valueOf(colValue)); + if (body.get(colName) != null) { + statement.setBoolean(paramNumber, body.getBoolean(colName)); } else { statement.setBoolean(paramNumber, false); } } else { - if (colValue != "null") { - statement.setString(paramNumber, colValue); + if (body.get(colName) != null) { + statement.setString(paramNumber, body.getString(colName)); } else { statement.setNull(paramNumber, Types.VARCHAR); } } } catch (java.lang.NumberFormatException e) { String message = String - .format("Provided data: %s can't be cast to the column %s datatype", colValue, + .format("Provided data: %s can't be cast to the column %s datatype", body.get(colName), colName); throw new RuntimeException(message); } } - private static String detectColumnType(Integer sqlType) { + private static String detectColumnType(Integer sqlType, String sqlTypeName) { if (sqlType == Types.NUMERIC || sqlType == Types.DECIMAL || sqlType == Types.TINYINT || sqlType == Types.SMALLINT || sqlType == Types.INTEGER || sqlType == Types.BIGINT || sqlType == Types.REAL || sqlType == Types.FLOAT || sqlType == Types.DOUBLE) { @@ -156,6 +154,11 @@ private static String detectColumnType(Integer sqlType) { if (sqlType == Types.BIT || sqlType == Types.BOOLEAN) { return "boolean"; } + if (sqlType == Types.OTHER) { + if (sqlTypeName.toLowerCase().contains("timestamp")) { + return "timestamp"; + } + } return "string"; } @@ -198,7 +201,7 @@ public static Map getColumnTypes(Connection connection, Boolean rs = md.getColumns(null, schemaName, tableName, "%"); while (rs.next()) { String name = rs.getString("COLUMN_NAME").toLowerCase(); - String type = detectColumnType(rs.getInt("DATA_TYPE")); + String type = detectColumnType(rs.getInt("DATA_TYPE"), rs.getString("TYPE_NAME")); columnTypes.put(name, type); } } catch (Exception e) { @@ -208,7 +211,7 @@ public static Map getColumnTypes(Connection connection, Boolean try { rs.close(); } catch (Exception e) { - logger.error(e.toString()); + LOGGER.error(e.toString()); } } } @@ -216,25 +219,19 @@ public static Map getColumnTypes(Connection connection, Boolean } public static Map getVariableTypes(String sqlQuery) { - JsonObject properties = Json.createObjectBuilder().build(); Map columnTypes = new HashMap(); Pattern pattern = Pattern.compile(Utils.VARS_REGEXP); Matcher matcher = pattern.matcher(sqlQuery); - Boolean isEmpty = true; + Boolean isEmpty; if (matcher.find()) { do { - JsonObject field = Json.createObjectBuilder().build(); String result[] = matcher.group().split(":"); String name = result[0].substring(1); String type = result[1]; - field = Json.createObjectBuilder().add("title", name) - .add("type", type).build(); - properties = Json.createObjectBuilder().add(name, field).build(); columnTypes.put(name, type); isEmpty = false; } while (matcher.find()); if (isEmpty) { - properties = Json.createObjectBuilder().add("empty dataset", "no columns").build(); columnTypes.put("empty dataset", "no columns"); } } @@ -244,64 +241,102 @@ public static Map getVariableTypes(String sqlQuery) { public static JsonObjectBuilder getColumnDataByType(ResultSet rs, ResultSetMetaData metaData, int i, JsonObjectBuilder row) { try { + final String columnName = metaData.getColumnName(i); + if (null == rs.getObject(columnName)) { + row.add(columnName, JsonValue.NULL); + return row; + } switch (metaData.getColumnType(i)) { case Types.BOOLEAN: case Types.BIT: - row.add(metaData.getColumnName(i), rs.getBoolean(metaData.getColumnName(i))); + row.add(columnName, rs.getBoolean(columnName)); break; case Types.BINARY: case Types.VARBINARY: case Types.LONGVARBINARY: - String floatString = Arrays.toString(rs.getBytes(metaData.getColumnName(i))); - row.add(metaData.getColumnName(i), floatString); + String floatString = Arrays.toString(rs.getBytes(columnName)); + row.add(columnName, floatString); break; case Types.INTEGER: - row.add(metaData.getColumnName(i), rs.getInt(metaData.getColumnName(i))); + row.add(columnName, rs.getInt(columnName)); break; case Types.NUMERIC: case Types.DECIMAL: - row.add(metaData.getColumnName(i), rs.getBigDecimal(metaData.getColumnName(i))); + row.add(columnName, (rs.getBigDecimal(columnName) != null)); break; case Types.DOUBLE: - row.add(metaData.getColumnName(i), rs.getDouble(metaData.getColumnName(i))); + row.add(columnName, rs.getDouble(columnName)); break; case Types.FLOAT: case Types.REAL: - row.add(metaData.getColumnName(i), rs.getFloat(metaData.getColumnName(i))); + row.add(columnName, rs.getFloat(columnName)); break; case Types.SMALLINT: - row.add(metaData.getColumnName(i), rs.getShort(metaData.getColumnName(i))); + row.add(columnName, rs.getShort(columnName)); break; case Types.TINYINT: - row.add(metaData.getColumnName(i), rs.getByte(metaData.getColumnName(i))); + row.add(columnName, rs.getByte(columnName)); break; case Types.BIGINT: - row.add(metaData.getColumnName(i), rs.getLong(metaData.getColumnName(i))); + row.add(columnName, rs.getLong(columnName)); break; case Types.TIMESTAMP: - row.add(metaData.getColumnName(i), rs.getTimestamp(metaData.getColumnName(i)).toString()); + row.add(columnName, rs.getTimestamp(columnName).toString()); break; case Types.DATE: - row.add(metaData.getColumnName(i), (rs.getDate(metaData.getColumnName(i)) != null) ? rs - .getDate(metaData.getColumnName(i)).toString() : ""); + row.add(columnName, rs.getDate(columnName).toString()); break; case Types.TIME: - row.add(metaData.getColumnName(i), rs.getTime(metaData.getColumnName(i)).toString()); + row.add(columnName, rs.getTime(columnName).toString()); break; default: - String columnName = rs.getString(metaData.getColumnName(i)); - if (columnName != null) { - row.add(metaData.getColumnName(i), columnName); - } else { - row.add(metaData.getColumnName(i), ""); - } + row.add(columnName, rs.getString(columnName)); break; } } catch (SQLException | java.lang.NullPointerException e) { - logger.error("Failed to get data by type", e.toString()); + LOGGER.error("Failed to get data by type", e); throw new RuntimeException(e); } return row; } + public static JsonObject getLookupRow(Connection connection, JsonObject body, String sql, + Integer secondParameter, Integer thirdParameter) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + JsonObjectBuilder row = Json.createObjectBuilder(); + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, 1, entry.getKey(), body); + } + stmt.setInt(2, secondParameter); + stmt.setInt(3, thirdParameter); + try (ResultSet rs = stmt.executeQuery()) { + ResultSetMetaData metaData = rs.getMetaData(); + int rowsCount = 0; + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + rowsCount++; + if (rowsCount > 1) { + LOGGER.error("Error: the number of matching rows is not exactly one"); + throw new RuntimeException("Error: the number of matching rows is not exactly one"); + } + } + return row.build(); + } + } + } + + public static boolean isRecordExists(Connection connection, JsonObject body, String sql, + String lookupField) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + Utils.setStatementParam(stmt, 1, lookupField, body); + try (ResultSet rs = stmt.executeQuery()) { + rs.next(); + return rs.getInt(1) > 0; + } + } + } + } diff --git a/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java b/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java index 8fa7a38..e29b0b0 100644 --- a/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java +++ b/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java @@ -8,12 +8,10 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.Map; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import javax.json.JsonValue; import org.slf4j.Logger; @@ -21,31 +19,24 @@ public class DeleteRowByPrimaryKey implements Module { - private static final Logger logger = LoggerFactory.getLogger(LookupRowByPrimaryKey.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LookupRowByPrimaryKey.class); private static final String PROPERTY_DB_ENGINE = "dbEngine"; private static final String PROPERTY_TABLE_NAME = "tableName"; private static final String PROPERTY_ID_COLUMN = "idColumn"; private static final String PROPERTY_LOOKUP_VALUE = "lookupValue"; private static final String PROPERTY_NULLABLE_RESULT = "nullableResult"; - private Connection connection = null; - private Map columnTypes = null; - public boolean isOracle = false; - @Override public void execute(ExecutionParameters parameters) { final JsonObject body = parameters.getMessage().getBody(); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSet rs = null; StringBuilder primaryKey = new StringBuilder(); StringBuilder primaryValue = new StringBuilder(); Integer primaryKeysCount = 0; String tableName = ""; String dbEngine = ""; Boolean nullableResult = false; - Integer rowsCount = 0; if (configuration.containsKey(PROPERTY_TABLE_NAME) && Utils.getNonNullString(configuration, PROPERTY_TABLE_NAME).length() != 0) { @@ -71,86 +62,66 @@ public void execute(ExecutionParameters parameters) { nullableResult = true; } - boolean isOracle = dbEngine.equals(Engines.ORACLE.name().toLowerCase()); - for (Map.Entry entry : body.entrySet()) { - logger.info("{} = {}", entry.getKey(), entry.getValue()); + LOGGER.info("{} = {}", entry.getKey(), entry.getValue()); primaryKey.append(entry.getKey()); primaryValue.append(entry.getValue()); primaryKeysCount++; } if (primaryKeysCount == 1) { - logger.info("Executing delete row by primary key action"); - Connection connection = Utils.getConnection(configuration); - Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); - logger.info("Detected column types: " + Utils.columnTypes); - try { - QueryFactory queryFactory = new QueryFactory(); - Query query = queryFactory.getQuery(dbEngine); - logger.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); - query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); - checkConfig(configuration); - rs = query.executeLookup(connection, body); - while (rs.next()) { - rowsCount++; - if (rowsCount > 1) { - logger.error("Error: the number of matching rows is not exactly one"); - throw new RuntimeException("Error: the number of matching rows is not exactly one"); + try (Connection connection = Utils.getConnection(configuration)) { + LOGGER.info("Executing delete row by primary key action"); + boolean isOracle = dbEngine.equals(Engines.ORACLE.name().toLowerCase()); + Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); + LOGGER.info("Detected column types: " + Utils.columnTypes); + try { + QueryFactory queryFactory = new QueryFactory(); + Query query = queryFactory.getQuery(dbEngine); + LOGGER.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); + query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); + checkConfig(configuration); + JsonObject row = query.executeLookup(connection, body); + + for (Map.Entry entry : configuration.entrySet()) { + LOGGER.info("Key = " + entry.getKey() + " Value = " + entry.getValue()); } - } - - for (Map.Entry entry : configuration.entrySet()) { - logger.info("Key = " + entry.getKey() + " Value = " + entry.getValue()); - } - - if (rowsCount == 1) { - int result = query.executeDelete(connection, body); - if (result == 1) { - row.add("result", result); - logger.info("Emitting data"); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else { - logger.info("Unexpected result"); - throw new RuntimeException("Unexpected result"); + if (row.size() != 0) { + int result = query.executeDelete(connection, body); + if (result == 1) { + LOGGER.info("Emitting data {}", row); + parameters.getEventEmitter().emitData(new Message.Builder().body(row).build()); + } else { + LOGGER.info("Unexpected result"); + throw new RuntimeException("Unexpected result"); + } + } else if (row.size() == 0 && nullableResult) { + JsonObject emptyRow = Json.createObjectBuilder() + .add("empty dataset", "nothing to delete") + .build(); + LOGGER.info("Emitting data {}", emptyRow); + parameters.getEventEmitter().emitData(new Message.Builder().body(emptyRow).build()); + } else if (row.size() == 0 && !nullableResult) { + LOGGER.info("Empty response. Error message will be returned"); + throw new RuntimeException("Empty response"); } - } else if (rowsCount == 0 && nullableResult) { - row.add("empty dataset", "nothing to delete"); - logger.info("Emitting data"); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else if (rowsCount == 0 && !nullableResult) { - logger.info("Empty response. Error message will be returned"); - throw new RuntimeException("Empty response"); - } - snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) - .add(PROPERTY_ID_COLUMN, primaryKey.toString()) - .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) - .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); - logger.info("Emitting new snapshot {}", snapshot.toString()); - parameters.getEventEmitter().emitSnapshot(snapshot); - } catch (SQLException e) { - logger.error("Failed to make request", e.toString()); - throw new RuntimeException(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - logger.error("Failed to close result set", e.toString()); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - logger.error("Failed to close connection", e.toString()); - } + snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) + .add(PROPERTY_ID_COLUMN, primaryKey.toString()) + .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) + .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); + parameters.getEventEmitter().emitSnapshot(snapshot); + } catch (SQLException e) { + LOGGER.error("Failed to make request", e.toString()); + throw new RuntimeException(e); } + } catch (SQLException e) { + LOGGER.error("Failed to close connection", e.toString()); } } else { - logger.error("Error: Should be one Primary Key"); + LOGGER.error("Error: Should be one Primary Key"); throw new IllegalStateException("Should be one Primary Key"); } } diff --git a/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java b/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java index 5b3f9b6..de40b44 100644 --- a/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java +++ b/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java @@ -8,13 +8,10 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Map; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import javax.json.JsonValue; import org.slf4j.Logger; @@ -22,7 +19,7 @@ public class LookupRowByPrimaryKey implements Module { - private static final Logger logger = LoggerFactory.getLogger(LookupRowByPrimaryKey.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LookupRowByPrimaryKey.class); private static final String PROPERTY_DB_ENGINE = "dbEngine"; private static final String PROPERTY_TABLE_NAME = "tableName"; private static final String PROPERTY_ID_COLUMN = "idColumn"; @@ -34,15 +31,12 @@ public void execute(ExecutionParameters parameters) { final JsonObject body = parameters.getMessage().getBody(); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSet rs = null; StringBuilder primaryKey = new StringBuilder(); StringBuilder primaryValue = new StringBuilder(); Integer primaryKeysCount = 0; String tableName = ""; String dbEngine = ""; Boolean nullableResult = false; - Integer rowsCount = 0; if (configuration.containsKey(PROPERTY_TABLE_NAME) && Utils.getNonNullString(configuration, PROPERTY_TABLE_NAME).length() != 0) { @@ -71,81 +65,56 @@ public void execute(ExecutionParameters parameters) { boolean isOracle = dbEngine.equals(Engines.ORACLE.name().toLowerCase()); for (Map.Entry entry : body.entrySet()) { - logger.info("{} = {}", entry.getKey(), entry.getValue()); + LOGGER.info("{} = {}", entry.getKey(), entry.getValue()); primaryKey.append(entry.getKey()); primaryValue.append(entry.getValue()); primaryKeysCount++; } if (primaryKeysCount == 1) { - logger.info("Executing lookup row by primary key action"); - Connection connection = Utils.getConnection(configuration); - Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); - logger.info("Detected column types: " + Utils.columnTypes); - try { - QueryFactory queryFactory = new QueryFactory(); - Query query = queryFactory.getQuery(dbEngine); - logger.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); - query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); - checkConfig(configuration); - rs = query.executeLookup(connection, body); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - } - rowsCount++; - if (rowsCount > 1) { - logger.error("Error: the number of matching rows is not exactly one"); - throw new RuntimeException("Error: the number of matching rows is not exactly one"); - } else { - logger.info("Emitting data"); - logger.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } - } - - for (Map.Entry entry : configuration.entrySet()) { - logger.info("Key = " + entry.getKey() + " Value = " + entry.getValue()); - } - if (rowsCount == 0 && nullableResult) { - row.add("empty dataset", "no data"); - logger.info("Emitting data"); - logger.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else if (rowsCount == 0 && !nullableResult) { - logger.info("Empty response. Error message will be returned"); - throw new RuntimeException("Empty response"); - } + try (Connection connection = Utils.getConnection(configuration)) { + LOGGER.info("Executing lookup row by primary key action"); + Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); + LOGGER.info("Detected column types: " + Utils.columnTypes); + try { + QueryFactory queryFactory = new QueryFactory(); + Query query = queryFactory.getQuery(dbEngine); + LOGGER.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); + query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); + checkConfig(configuration); - snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) - .add(PROPERTY_ID_COLUMN, primaryKey.toString()) - .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) - .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); - logger.info("Emitting new snapshot {}", snapshot.toString()); - parameters.getEventEmitter().emitSnapshot(snapshot); - } catch (SQLException e) { - logger.error("Failed to make request", e.toString()); - throw new RuntimeException(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - logger.error("Failed to close result set", e.toString()); + JsonObject row = query.executeLookup(connection, body); + if (row.size() != 0) { + LOGGER.info("Emitting data"); + LOGGER.info(row.toString()); + parameters.getEventEmitter().emitData(new Message.Builder().body(row).build()); } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - logger.error("Failed to close connection", e.toString()); + if (row.size() == 0 && nullableResult) { + row.put("empty dataset", null); + LOGGER.info("Emitting data"); + LOGGER.info(row.toString()); + parameters.getEventEmitter().emitData(new Message.Builder().body(row).build()); + } else if (row.size() == 0 && !nullableResult) { + LOGGER.info("Empty response. Error message will be returned"); + throw new RuntimeException("Empty response"); } + + snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) + .add(PROPERTY_ID_COLUMN, primaryKey.toString()) + .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) + .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); + parameters.getEventEmitter().emitSnapshot(snapshot); + } catch (SQLException e) { + LOGGER.error("Failed to make request", e.toString()); + throw new RuntimeException(e); } + } catch (SQLException e) { + LOGGER.error("Failed to close connection", e.toString()); } } else { - logger.error("Error: Should be one Primary Key"); + LOGGER.error("Error: Should be one Primary Key"); throw new IllegalStateException("Should be one Primary Key"); } } diff --git a/src/main/java/io/elastic/jdbc/actions/SelectAction.java b/src/main/java/io/elastic/jdbc/actions/SelectAction.java index 83860d4..ca521bc 100644 --- a/src/main/java/io/elastic/jdbc/actions/SelectAction.java +++ b/src/main/java/io/elastic/jdbc/actions/SelectAction.java @@ -7,19 +7,17 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SelectAction implements Module { - private static final Logger logger = LoggerFactory.getLogger(SelectAction.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SelectAction.class); private static final String SQL_QUERY_VALUE = "sqlQuery"; private static final String PROPERTY_NULLABLE_RESULT = "nullableResult"; private static final String PROPERTY_SKIP_NUMBER = "skipNumber"; @@ -29,14 +27,12 @@ public void execute(ExecutionParameters parameters) { final JsonObject body = parameters.getMessage().getBody(); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); checkConfig(configuration); Connection connection = Utils.getConnection(configuration); String dbEngine = configuration.getString("dbEngine"); String sqlQuery = configuration.getString("sqlQuery"); Integer skipNumber = 0; Boolean nullableResult = false; - Integer rowsCount = 0; if (Utils.getNonNullString(configuration, PROPERTY_NULLABLE_RESULT).equals("true")) { nullableResult = true; @@ -49,40 +45,39 @@ public void execute(ExecutionParameters parameters) { } Utils.columnTypes = Utils.getVariableTypes(sqlQuery); - logger.info("Detected column types: " + Utils.columnTypes); - ResultSet rs = null; - logger.info("Executing select trigger"); + LOGGER.info("Detected column types: " + Utils.columnTypes); + LOGGER.info("Executing select action"); try { QueryFactory queryFactory = new QueryFactory(); Query query = queryFactory.getQuery(dbEngine); sqlQuery = Query.preProcessSelect(sqlQuery); - logger.info("SQL Query: {}", sqlQuery); - rs = query.executeSelectQuery(connection, sqlQuery, body); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - logger.info("columns count: {} from {}", rowsCount, metaData.getColumnCount()); - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - } - rowsCount++; - logger.info("Emitting data"); - logger.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + LOGGER.info("SQL Query: {}", sqlQuery); + + ArrayList resultList = query.executeSelectQuery(connection, sqlQuery, body); + for (int i = 0; i < resultList.size(); i++) { + LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Emitting data {}", resultList.get(i).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(i)).build()); } - if (rowsCount == 0 && nullableResult) { - row.add("empty dataset", "no data"); - logger.info("Emitting data"); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else if (rowsCount == 0 && !nullableResult) { - logger.info("Empty response. Error message will be returned"); + if (resultList.size() == 0 && nullableResult) { + resultList.add(Json.createObjectBuilder() + .add("empty dataset", "no data") + .build()); + LOGGER.info("Emitting data {}", resultList.get(0)); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(0)).build()); + } else if (resultList.size() == 0 && !nullableResult) { + LOGGER.info("Empty response. Error message will be returned"); throw new RuntimeException("Empty response"); } - snapshot = Json.createObjectBuilder().add(PROPERTY_SKIP_NUMBER, skipNumber + rowsCount) + snapshot = Json.createObjectBuilder() + .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) .add(SQL_QUERY_VALUE, sqlQuery) .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); - logger.info("Emitting new snapshot {}", snapshot.toString()); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); parameters.getEventEmitter().emitSnapshot(snapshot); } catch (SQLException e) { throw new RuntimeException(e); @@ -91,7 +86,7 @@ public void execute(ExecutionParameters parameters) { try { connection.close(); } catch (SQLException e) { - logger.error(e.toString()); + LOGGER.error(e.toString()); } } } diff --git a/src/main/java/io/elastic/jdbc/actions/UpsertRowByPrimaryKey.java b/src/main/java/io/elastic/jdbc/actions/UpsertRowByPrimaryKey.java index 16cd442..a5de42e 100644 --- a/src/main/java/io/elastic/jdbc/actions/UpsertRowByPrimaryKey.java +++ b/src/main/java/io/elastic/jdbc/actions/UpsertRowByPrimaryKey.java @@ -3,78 +3,98 @@ import io.elastic.api.ExecutionParameters; import io.elastic.api.Message; import io.elastic.api.Module; +import io.elastic.jdbc.Engines; import io.elastic.jdbc.QueryBuilders.Query; import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; -import java.text.SimpleDateFormat; -import java.util.Calendar; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; import javax.json.Json; -import javax.json.JsonArray; import javax.json.JsonObject; -import javax.json.JsonString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.*; -import java.util.Map; - public class UpsertRowByPrimaryKey implements Module { - private static final Logger logger = LoggerFactory.getLogger(UpsertRowByPrimaryKey.class); + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertRowByPrimaryKey.class); private static final String PROPERTY_DB_ENGINE = "dbEngine"; private static final String PROPERTY_TABLE_NAME = "tableName"; - private static final String PROPERTY_ID_COLUMN = "idColumn"; - - private Connection connection = null; - - /* TODO: UpsertRowByPrimaryKey action migrate to sailor v2 */ @Override public void execute(ExecutionParameters parameters) { final JsonObject configuration = parameters.getConfiguration(); final JsonObject body = parameters.getMessage().getBody(); - if (!configuration.has(PROPERTY_TABLE_NAME) || configuration.get(PROPERTY_TABLE_NAME) - .isJsonNull() || configuration.get(PROPERTY_TABLE_NAME).getAsString().isEmpty()) { + JsonObject snapshot = parameters.getSnapshot(); + JsonObject resultRow; + String tableName; + String dbEngine; + String schemaName = ""; + String primaryKey = ""; + int primaryKeysCount = 0; + + if (configuration.containsKey(PROPERTY_TABLE_NAME) + && Utils.getNonNullString(configuration, PROPERTY_TABLE_NAME).length() != 0) { + tableName = configuration.getString(PROPERTY_TABLE_NAME); + } else if (snapshot.containsKey(PROPERTY_TABLE_NAME) + && Utils.getNonNullString(snapshot, PROPERTY_TABLE_NAME).length() != 0) { + tableName = snapshot.getString(PROPERTY_TABLE_NAME); + } else { throw new RuntimeException("Table name is required field"); } - if (!configuration.has(PROPERTY_ID_COLUMN) || configuration.get(PROPERTY_ID_COLUMN).isJsonNull() - || configuration.get("idColumn").getAsString().isEmpty()) { - throw new RuntimeException("ID column is required field"); - } - String tableName = configuration.get(PROPERTY_TABLE_NAME).getAsString(); - String idColumn = configuration.get(PROPERTY_ID_COLUMN).getAsString(); - String idColumnValue = null; - if (!(!body.has(idColumn) || body.get(idColumn).isJsonNull() || body.get(idColumn).getAsString() - .isEmpty())) { - idColumnValue = body.get(idColumn).getAsString(); + + if (Utils.getNonNullString(configuration, PROPERTY_DB_ENGINE).length() != 0) { + dbEngine = configuration.getString(PROPERTY_DB_ENGINE); + } else if (Utils.getNonNullString(snapshot, PROPERTY_DB_ENGINE).length() != 0) { + dbEngine = snapshot.getString(PROPERTY_DB_ENGINE); + } else { + throw new RuntimeException("DB Engine is required field"); } - logger.info("ID column value: {}", idColumnValue); - String db = configuration.get(PROPERTY_DB_ENGINE).getAsString(); - boolean isOracle = db.equals(Engines.ORACLE.name().toLowerCase()); - try { - connection = Utils.getConnection(configuration); - Map columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); - logger.info("Detected column types: " + columnTypes); - QueryFactory queryFactory = new QueryFactory(); - Query query = queryFactory.getQuery(PROPERTY_DB_ENGINE); - query.from(tableName).lookup(idColumn, idColumnValue); - if (query.executeRecordExists(connection)) { - query.executeUpdate(connection, tableName, idColumn, idColumnValue, body); - } else { - query.executeInsert(connection, tableName, body); + + LOGGER.info("Executing lookup primary key"); + boolean isOracle = dbEngine.equals(Engines.ORACLE.name().toLowerCase()); + + try (Connection connection = Utils.getConnection(configuration)) { + DatabaseMetaData dbMetaData = connection.getMetaData(); + if (tableName.contains(".")) { + schemaName = + (isOracle) ? tableName.split("\\.")[0].toUpperCase() : tableName.split("\\.")[0]; + tableName = + (isOracle) ? tableName.split("\\.")[1].toUpperCase() : tableName.split("\\.")[1]; } - this.getEventEmitter().emitData(new Message.Builder().body(body).build()); - } catch (SQLException e) { - throw new RuntimeException(e); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - logger.error(e.toString()); + try (ResultSet rs = dbMetaData + .getPrimaryKeys(null, ((isOracle && !schemaName.isEmpty()) ? schemaName : null), + tableName)) { + while (rs.next()) { + primaryKey = rs.getString("COLUMN_NAME"); + primaryKeysCount++; + } + if (primaryKeysCount == 1) { + LOGGER.info("Executing upsert row by primary key action"); + Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); + LOGGER.info("Detected column types: " + Utils.columnTypes); + QueryFactory queryFactory = new QueryFactory(); + Query query = queryFactory.getQuery(dbEngine); + LOGGER + .info("Execute upsert parameters by PK: '{}' = {}", primaryKey, body.get(primaryKey)); + query.from(tableName); + resultRow = query.executeUpsert(connection, primaryKey, body); + LOGGER.info("Emit data= {}", resultRow); + parameters.getEventEmitter().emitData(new Message.Builder().body(resultRow).build()); + snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); + parameters.getEventEmitter().emitSnapshot(snapshot); + } else if (primaryKeysCount == 0) { + LOGGER.error("Error: Table has not Primary Key. Should be one Primary Key"); + throw new IllegalStateException("Table has not Primary Key. Should be one Primary Key"); + } else { + LOGGER.error("Error: Composite Primary Key is not supported"); + throw new IllegalStateException("Composite Primary Key is not supported"); } } + } catch (SQLException e) { + throw new RuntimeException(e); } } - } diff --git a/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java b/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java index e90e44f..63f02a7 100644 --- a/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java +++ b/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java @@ -7,16 +7,13 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import javax.json.Json; -import javax.json.JsonArray; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,118 +21,95 @@ public class GetRowsPollingTrigger implements Module { - private static final Logger logger = LoggerFactory.getLogger(GetRowsPollingTrigger.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GetRowsPollingTrigger.class); private static final String PROPERTY_TABLE_NAME = "tableName"; private static final String PROPERTY_POLLING_FIELD = "pollingField"; private static final String PROPERTY_POLLING_VALUE = "pollingValue"; private static final String PROPERTY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.sss"; private static final String PROPERTY_SKIP_NUMBER = "skipNumber"; private static final String DATETIME_REGEX = "(\\d{4})-(\\d{2})-(\\d{2}) (\\d{2}):(\\d{2}):(\\d{2})(\\.(\\d{1,3}))?"; - private static boolean isEmpty = true; - + @Override public final void execute(ExecutionParameters parameters) { - logger.info("About to execute select trigger"); + LOGGER.info("About to execute select trigger"); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); checkConfig(configuration); - Connection connection = Utils.getConnection(configuration); + Connection connection = null; Integer skipNumber = 0; - Integer rowsCount = 0; String pollingField = ""; Calendar cDate = Calendar.getInstance(); cDate.set(cDate.get(Calendar.YEAR), cDate.get(Calendar.MONTH), cDate.get(Calendar.DATE), 0, 0, 0); - String dbEngine = configuration.getString(Utils.CFG_DB_ENGINE); String tableName = configuration.getString(PROPERTY_TABLE_NAME); - if(Utils.getNonNullString(configuration, PROPERTY_POLLING_FIELD).length() != 0) { + + if (Utils.getNonNullString(configuration, PROPERTY_POLLING_FIELD).length() != 0) { pollingField = configuration.getString(PROPERTY_POLLING_FIELD); } Timestamp pollingValue; Timestamp cts = new java.sql.Timestamp(cDate.getTimeInMillis()); - Timestamp maxPollingValue = cts; String formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT).format(cts); - if (configuration.containsKey(PROPERTY_POLLING_VALUE) && Utils.getNonNullString(configuration, PROPERTY_POLLING_VALUE).matches(DATETIME_REGEX)) { + if (configuration.containsKey(PROPERTY_POLLING_VALUE) && Utils + .getNonNullString(configuration, PROPERTY_POLLING_VALUE).matches(DATETIME_REGEX)) { pollingValue = Timestamp.valueOf(configuration.getString(PROPERTY_POLLING_VALUE)); - } else if (snapshot.containsKey(PROPERTY_POLLING_VALUE) && Utils.getNonNullString(snapshot, PROPERTY_POLLING_VALUE).matches(DATETIME_REGEX)) { + } else if (snapshot.containsKey(PROPERTY_POLLING_VALUE) && Utils + .getNonNullString(snapshot, PROPERTY_POLLING_VALUE).matches(DATETIME_REGEX)) { pollingValue = Timestamp.valueOf(snapshot.getString(PROPERTY_POLLING_VALUE)); } else { - logger.info( + LOGGER.info( "There is an empty value for Start Polling From at the config and snapshot. So, we set Current Date = " + formattedDate); pollingValue = cts; } - if (snapshot.containsKey(PROPERTY_SKIP_NUMBER)) + if (snapshot.containsKey(PROPERTY_SKIP_NUMBER)) { skipNumber = snapshot.getInt(PROPERTY_SKIP_NUMBER); + } - if (snapshot.containsKey(PROPERTY_TABLE_NAME) && snapshot.get(PROPERTY_TABLE_NAME) != null && !snapshot.getString(PROPERTY_TABLE_NAME) + if (snapshot.containsKey(PROPERTY_TABLE_NAME) && snapshot.get(PROPERTY_TABLE_NAME) != null + && !snapshot.getString(PROPERTY_TABLE_NAME) .equals(tableName)) { skipNumber = 0; } - ResultSet rs = null; - logger.info("Executing row polling trigger"); + LOGGER.info("Executing row polling trigger"); try { + connection = Utils.getConnection(configuration); QueryFactory queryFactory = new QueryFactory(); Query query = queryFactory.getQuery(dbEngine); query.from(tableName).skip(skipNumber).orderBy(pollingField) .rowsPolling(pollingField, pollingValue); - rs = query.executePolling(connection); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { - if (maxPollingValue.before(rs.getTimestamp(i))) { - if (rs.getString(metaData.getColumnName(i)).length() > 10) { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i))); - } else { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); - } - } - } - } - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - rowsCount++; - } + query.setMaxPollingValue(cts); + ArrayList resultList = query.executePolling(connection); - if (rowsCount == 0) { - row.add("empty dataset", "no data"); - logger.info("Emitting empty data"); - maxPollingValue = new java.sql.Timestamp(System.currentTimeMillis()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + for (int i = 0; i < resultList.size(); i++) { + LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Emitting data {}", resultList.get(i).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(i)).build()); } - formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT).format(maxPollingValue); + formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT) + .format(query.getMaxPollingValue()); - snapshot = Json.createObjectBuilder().add(PROPERTY_SKIP_NUMBER, skipNumber + rowsCount) - .add(PROPERTY_TABLE_NAME, tableName) - .add(PROPERTY_POLLING_FIELD, pollingField) - .add(PROPERTY_POLLING_VALUE, formattedDate).build(); - logger.info("Emitting new snapshot {}", snapshot.toString()); + snapshot = Json.createObjectBuilder() + .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) + .add(PROPERTY_TABLE_NAME, tableName) + .add(PROPERTY_POLLING_FIELD, pollingField) + .add(PROPERTY_POLLING_VALUE, formattedDate).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); parameters.getEventEmitter().emitSnapshot(snapshot); } catch (SQLException e) { - logger.error("Failed to make request", e.toString()); + LOGGER.error("Failed to make request", e.toString()); throw new RuntimeException(e); } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - logger.error("Failed to close result set", e.toString()); - } - } if (connection != null) { try { connection.close(); } catch (SQLException e) { - logger.error("Failed to close connection", e.toString()); + LOGGER.error("Failed to close connection", e.toString()); } } } diff --git a/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java b/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java index a2b4999..27a0118 100644 --- a/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java +++ b/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java @@ -6,20 +6,22 @@ import io.elastic.jdbc.QueryBuilders.Query; import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.*; - public class SelectTrigger implements Module { - private static final Logger logger = LoggerFactory.getLogger(SelectTrigger.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SelectTrigger.class); private static final String PROPERTY_DB_ENGINE = "dbEngine"; private static final String LAST_POLL_PLACEHOLDER = "%%EIO_LAST_POLL%%"; private static final String SQL_QUERY_VALUE = "sqlQuery"; @@ -30,14 +32,12 @@ public class SelectTrigger implements Module { @Override public final void execute(ExecutionParameters parameters) { - logger.info("About to execute select trigger"); + LOGGER.info("About to execute select trigger"); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); checkConfig(configuration); Connection connection = Utils.getConnection(configuration); Integer skipNumber = 0; - Integer rowsCount = 0; Calendar cDate = Calendar.getInstance(); cDate.set(cDate.get(Calendar.YEAR), cDate.get(Calendar.MONTH), cDate.get(Calendar.DATE), 0, 0, @@ -47,74 +47,52 @@ public final void execute(ExecutionParameters parameters) { Timestamp cts = new java.sql.Timestamp(cDate.getTimeInMillis()); String formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT).format(cts); - if (configuration.containsKey(PROPERTY_POLLING_VALUE) && Utils.getNonNullString(configuration, PROPERTY_POLLING_VALUE).matches(DATETIME_REGEX)) { + if (configuration.containsKey(PROPERTY_POLLING_VALUE) && Utils + .getNonNullString(configuration, PROPERTY_POLLING_VALUE).matches(DATETIME_REGEX)) { pollingValue = Timestamp.valueOf(configuration.getString(PROPERTY_POLLING_VALUE)); - } else if (snapshot.containsKey(PROPERTY_POLLING_VALUE) && Utils.getNonNullString(snapshot, LAST_POLL_PLACEHOLDER).matches(DATETIME_REGEX)) { + } else if (snapshot.containsKey(PROPERTY_POLLING_VALUE) && Utils + .getNonNullString(snapshot, LAST_POLL_PLACEHOLDER).matches(DATETIME_REGEX)) { pollingValue = Timestamp.valueOf(snapshot.getString(LAST_POLL_PLACEHOLDER)); } else { - logger.info( + LOGGER.info( "There is an empty value for Start Polling From at the config and snapshot. So, we set Current Date = " + formattedDate); pollingValue = cts; } - logger.info("EIO_LAST_POLL = {}", pollingValue); + LOGGER.info("EIO_LAST_POLL = {}", pollingValue); String sqlQuery = configuration.getString(SQL_QUERY_VALUE); - if (snapshot.get(PROPERTY_SKIP_NUMBER) != null) + if (snapshot.get(PROPERTY_SKIP_NUMBER) != null) { skipNumber = snapshot.getInt(PROPERTY_SKIP_NUMBER); - logger.info("SQL QUERY {} : ", sqlQuery); + } + LOGGER.info("SQL QUERY {} : ", sqlQuery); ResultSet rs = null; - logger.info("Executing select trigger"); + LOGGER.info("Executing select trigger"); try { QueryFactory queryFactory = new QueryFactory(); Query query = queryFactory.getQuery(dbEngine); sqlQuery = Query.preProcessSelect(sqlQuery); - if(sqlQuery.contains(LAST_POLL_PLACEHOLDER)) { + if (sqlQuery.contains(LAST_POLL_PLACEHOLDER)) { sqlQuery = sqlQuery.replace(LAST_POLL_PLACEHOLDER, "?"); query.selectPolling(sqlQuery, pollingValue); } - logger.info("SQL Query: {}", sqlQuery); - rs = query.executeSelectTrigger(connection, sqlQuery); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - } - rowsCount++; - logger.info("Emitting data"); - logger.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } - - if (rowsCount == 0) { - row.add("empty dataset", "no data"); - logger.info("Emitting data"); - logger.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + LOGGER.info("SQL Query: {}", sqlQuery); + ArrayList resultList = query.executeSelectTrigger(connection, sqlQuery); + for (int i = 0; i < resultList.size(); i++) { + LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Emitting data {}", resultList.get(i).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(i)).build()); } - snapshot = Json.createObjectBuilder().add(PROPERTY_SKIP_NUMBER, skipNumber + rowsCount) - .add(LAST_POLL_PLACEHOLDER, pollingValue.toString()) - .add(SQL_QUERY_VALUE, sqlQuery).build(); - logger.info("Emitting new snapshot {}", snapshot.toString()); + snapshot = Json.createObjectBuilder() + .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) + .add(LAST_POLL_PLACEHOLDER, pollingValue.toString()) + .add(SQL_QUERY_VALUE, sqlQuery).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); parameters.getEventEmitter().emitSnapshot(snapshot); } catch (SQLException e) { - logger.error("Failed to make request", e.toString()); + LOGGER.error("Failed to make request", e.toString()); throw new RuntimeException(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - logger.error("Failed to close result set", e.toString()); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - logger.error("Failed to close connection", e.toString()); - } - } } } diff --git a/src/test/groovy/io/elastic/jdbc/QueryColumnNamesProviderSpec.groovy b/src/test/groovy/io/elastic/jdbc/QueryColumnNamesProviderSpec.groovy index b32ae9d..5f4eebd 100644 --- a/src/test/groovy/io/elastic/jdbc/QueryColumnNamesProviderSpec.groovy +++ b/src/test/groovy/io/elastic/jdbc/QueryColumnNamesProviderSpec.groovy @@ -4,24 +4,25 @@ import spock.lang.* import javax.json.Json import javax.json.JsonObject +import javax.json.JsonObjectBuilder @Ignore class QueryColumnNamesProviderSpec extends Specification { - JsonObject configuration = Json.createObjectBuilder().build() - String sqlQuery; + JsonObjectBuilder configuration = Json.createObjectBuilder() + String sqlQuery def setup() { sqlQuery = "SELECT * FROM films WHERE watched = @watched:boolean AND created = @created:date" } def "get metadata model, given json object"() { - configuration.addProperty("sqlQuery", sqlQuery) + configuration.add("sqlQuery", sqlQuery) QueryColumnNamesProvider provider = new QueryColumnNamesProvider() - JsonObject meta = provider.getMetaModel((configuration)) + JsonObject meta = provider.getMetaModel(configuration.build()) print meta expect: meta.toString() == "{\"out\":{\"type\":\"object\",\"properties\":{\"watched\":{\"title\":\"watched\",\"type\":\"boolean\"},\"created\":{\"title\":\"created\",\"type\":\"date\"}}},\"in\":{\"type\":\"object\",\"properties\":{\"watched\":{\"title\":\"watched\",\"type\":\"boolean\"},\"created\":{\"title\":\"created\",\"type\":\"date\"}}}}" } -} +} \ No newline at end of file diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMSSQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMSSQLSpec.groovy new file mode 100644 index 0000000..3723658 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMSSQLSpec.groovy @@ -0,0 +1,146 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import org.junit.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class DeleteActionMSSQLSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_MSSQL") + @Shared + def password = System.getenv("CONN_PASSWORD_MSSQL") + @Shared + def databaseName = System.getenv("CONN_DBNAME_MSSQL") + @Shared + def host = System.getenv("CONN_HOST_MSSQL") + @Shared + def port = System.getenv("CONN_PORT_MSSQL") + @Shared + def connectionString = "jdbc:sqlserver://" + host + ":" + port + ";database=" + databaseName + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "mssql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + connection.createStatement().execute("IF OBJECT_ID('stars', 'U') IS NOT NULL\n" + + " DROP TABLE stars;"); + connection.createStatement().execute("CREATE TABLE stars (id int PRIMARY KEY, name varchar(255) NOT NULL, " + + "date datetime, radius int, destination int, visible bit, visibledate date)"); + connection.createStatement().execute("INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0'," + + " 123, 5, 0, '2015-02-19')") + connection.createStatement().execute("INSERT INTO stars values (2,'Eridanus', '2017-02-19 10:10:10.0'," + + " 852, 5, 0, '2015-07-19')") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "IF OBJECT_ID('persons', 'U') IS NOT NULL\n" + + " DROP TABLE persons;" + + connection.createStatement().execute(sql) + sql = "IF OBJECT_ID('stars', 'U') IS NOT NULL\n" + + " DROP TABLE stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } + + +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMySQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMySQLSpec.groovy new file mode 100644 index 0000000..7fb6b50 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMySQLSpec.groovy @@ -0,0 +1,144 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import org.junit.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class DeleteActionMySQLSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_MYSQL") + @Shared + def password = System.getenv("CONN_PASSWORD_MYSQL") + @Shared + def databaseName = System.getenv("CONN_DBNAME_MYSQL") + @Shared + def host = System.getenv("CONN_HOST_MYSQL") + @Shared + def port = System.getenv("CONN_PORT_MYSQL") + @Shared + def dbEngine = "mysql" + @Shared + def connectionString ="jdbc:" + dbEngine + "://" + host + ":" + port + "/" + databaseName + "?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC" + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "mysql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id int PRIMARY KEY, name varchar(255) NOT NULL, " + + "date datetime, radius int, destination int, visible bit, visibledate date)"); + connection.createStatement().execute("INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0'," + + " 123, 5, 0, '2015-02-19')") + connection.createStatement().execute("INSERT INTO stars values (2,'Eridanus', '2017-02-19 10:10:10.0'," + + " 852, 5, 0, '2015-07-19')") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "DROP TABLE IF EXISTS persons;" + + connection.createStatement().execute(sql) + sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionOracleSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionOracleSpec.groovy new file mode 100644 index 0000000..0af8589 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionOracleSpec.groovy @@ -0,0 +1,154 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import org.junit.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class DeleteActionOracleSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_ORACLE") + @Shared + def password = System.getenv("CONN_PASSWORD_ORACLE") + @Shared + def databaseName = System.getenv("CONN_DBNAME_ORACLE") + @Shared + def host = System.getenv("CONN_HOST_ORACLE") + @Shared + def port = System.getenv("CONN_PORT_ORACLE") + @Shared + def dbEngine = "oracle" + @Shared + def connectionString ="jdbc:oracle:thin:@//" + host + ":" + port + "/XE" + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "STARS") + .add("user", user) + .add("password", password) + .add("dbEngine", "oracle") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE stars';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id number, name varchar(255) NOT NULL, " + + "radius number, destination float,visible number(1), " + + "CONSTRAINT pk_stars PRIMARY KEY (id))"); + connection.createStatement().execute("INSERT INTO stars (ID,NAME,RADIUS,DESTINATION, VISIBLE) VALUES (1,'Taurus',321,44.4,1)") + connection.createStatement().execute("INSERT INTO stars (ID,NAME,RADIUS,DESTINATION, VISIBLE) VALUES (2,'Boston',581,94.4,0)") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE stars';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("ID", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("ID", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionPostrgeSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionPostrgeSpec.groovy new file mode 100644 index 0000000..f65fa67 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionPostrgeSpec.groovy @@ -0,0 +1,144 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import org.junit.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class DeleteActionPostrgeSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_POSTGRESQL") + @Shared + def password = System.getenv("CONN_PASSWORD_POSTGRESQL") + @Shared + def databaseName = System.getenv("CONN_DBNAME_POSTGRESQL") + @Shared + def host = System.getenv("CONN_HOST_POSTGRESQL") + @Shared + def port = System.getenv("CONN_PORT_POSTGRESQL") + @Shared + def dbEngine = "postgresql" + @Shared + def connectionString ="jdbc:postgresql://"+ host + ":" + port + "/" + databaseName + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "postgresql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id int, name varchar(255) NOT NULL, " + + "date timestamp, radius int, destination int, visible boolean, visibledate date, PRIMARY KEY(id))"); + connection.createStatement().execute("INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0'," + + " 123, 5, 'true', '2015-02-19')") + connection.createStatement().execute("INSERT INTO stars values (2,'Eridanus', '2017-02-19 10:10:10.0'," + + " 852, 5, 'false', '2015-07-19')") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "DROP TABLE IF EXISTS persons;" + + connection.createStatement().execute(sql) + sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMSSQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMSSQLSpec.groovy similarity index 62% rename from src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMSSQLSpec.groovy rename to src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMSSQLSpec.groovy index 6d0e412..51bc280 100644 --- a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMSSQLSpec.groovy +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMSSQLSpec.groovy @@ -1,10 +1,12 @@ -package io.elastic.jdbc.actions +package io.elastic.jdbc.actions.upsert import io.elastic.api.EventEmitter import io.elastic.api.ExecutionParameters import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey import spock.lang.* +import javax.json.Json import javax.json.JsonObject import java.sql.Connection import java.sql.DriverManager @@ -13,8 +15,6 @@ import java.sql.ResultSet @Ignore class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { - @Shared - def connectionString = System.getenv("CONN_URI_MSSQL") @Shared def user = System.getenv("CONN_USER_MSSQL") @Shared @@ -23,7 +23,10 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { def databaseName = System.getenv("CONN_DBNAME_MSSQL") @Shared def host = System.getenv("CONN_HOST_MSSQL") - + @Shared + def port = System.getenv("CONN_PORT_MSSQL") + @Shared + def connectionString ="jdbc:sqlserver://" + host + ":" + port + ";database=" + databaseName @Shared Connection connection @@ -36,9 +39,11 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { @Shared EventEmitter.Callback reboundCallback @Shared + EventEmitter.Callback httpReplyCallback + @Shared EventEmitter emitter @Shared - LookupRowByPrimaryKey action + UpsertRowByPrimaryKey action def setupSpec() { connection = DriverManager.getConnection(connectionString, user, password) @@ -53,26 +58,28 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { snapshotCallback = Mock(EventEmitter.Callback) dataCallback = Mock(EventEmitter.Callback) reboundCallback = Mock(EventEmitter.Callback) - emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback).onRebound(reboundCallback).build() - action = new LookupRowByPrimaryKey(emitter) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new UpsertRowByPrimaryKey() } def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { Message msg = new Message.Builder().body(body).build() - ExecutionParameters params = new ExecutionParameters(msg, config, snapshot) + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) action.execute(params); } def getStarsConfig() { - JsonObject config = Json.createObjectBuilder().build() - - config.addProperty("idColumn", "id") - config.addProperty("tableName", "stars") - config.addProperty("user", user) - config.addProperty("password", password) - config.addProperty("dbEngine", "mssql") - config.addProperty("host", host) - config.addProperty("databaseName", databaseName) + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "mssql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build(); return config; } @@ -80,7 +87,8 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { String sql = "IF OBJECT_ID('stars', 'U') IS NOT NULL\n" + " DROP TABLE stars;" connection.createStatement().execute(sql); - connection.createStatement().execute("CREATE TABLE stars (id int, name varchar(255) NOT NULL, date datetime, radius int, destination int)"); + connection.createStatement().execute("CREATE TABLE stars (id int PRIMARY KEY, name varchar(255) NOT NULL, " + + "date datetime, radius int, destination int, visible bit, visibledate date)"); } def getRecords(tableName) { @@ -111,11 +119,14 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { JsonObject snapshot = Json.createObjectBuilder().build() - JsonObject body = Json.createObjectBuilder().build() - body.addProperty("id", "1") - body.addProperty("name", "Taurus") - body.addProperty("date", "2015-02-19 10:10:10.0") - body.addProperty("radius", "123") + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("date", "2015-02-19 10:10:10.0") + .add("radius", 123) + .add("visible", true) + .add("visibledate", "2015-02-19") + .build(); runAction(getStarsConfig(), body, snapshot) @@ -123,7 +134,8 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { expect: records.size() == 1 - records.get(0) == '{id=1, name=Taurus, date=2015-02-19 10:10:10.0, radius=123, destination=null}' + records.get(0) == '{id=1, name=Taurus, date=2015-02-19 10:10:10.0, radius=123, destination=null, visible=true, ' + + 'visibledate=2015-02-19}' } def "one insert, incorrect value: string in integer field"() { @@ -132,11 +144,11 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { JsonObject snapshot = Json.createObjectBuilder().build() - JsonObject body = Json.createObjectBuilder().build() - body.addProperty("id", "1") - body.addProperty("name", "Taurus") - body.addProperty("radius", "test") - + JsonObject body = Json.createObjectBuilder() + .add("id", "1") + .add("name", "Taurus") + .add("radius", "test") + .build() String exceptionClass = ""; try { @@ -155,17 +167,18 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { JsonObject snapshot = Json.createObjectBuilder().build() - JsonObject body1 = Json.createObjectBuilder().build() - body1.addProperty("id", "1") - body1.addProperty("name", "Taurus") - body1.addProperty("radius", "123") - + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() runAction(getStarsConfig(), body1, snapshot) - JsonObject body2 = Json.createObjectBuilder().build() - body2.addProperty("id", "2") - body2.addProperty("name", "Eridanus") - body2.addProperty("radius", "456") + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "Eridanus") + .add("radius", 456) + .build() runAction(getStarsConfig(), body2, snapshot) @@ -173,8 +186,8 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { expect: records.size() == 2 - records.get(0) == '{id=1, name=Taurus, date=null, radius=123, destination=null}' - records.get(1) == '{id=2, name=Eridanus, date=null, radius=456, destination=null}' + records.get(0) == '{id=1, name=Taurus, date=null, radius=123, destination=null, visible=null, visibledate=null}' + records.get(1) == '{id=2, name=Eridanus, date=null, radius=456, destination=null, visible=null, visibledate=null}' } def "one insert, one update by ID"() { @@ -183,44 +196,46 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { JsonObject snapshot = Json.createObjectBuilder().build() - JsonObject body1 = Json.createObjectBuilder().build() - body1.addProperty("id", "1") - body1.addProperty("name", "Taurus") - body1.addProperty("radius", "123") - + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() runAction(getStarsConfig(), body1, snapshot) - JsonObject body2 = Json.createObjectBuilder().build() - body2.addProperty("id", "1") - body2.addProperty("name", "Eridanus") - + JsonObject body2 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Eridanus") + .build() runAction(getStarsConfig(), body2, snapshot) ArrayList records = getRecords("stars") expect: records.size() == 1 - records.get(0) == '{id=1, name=Eridanus, date=null, radius=123, destination=null}' + records.get(0) == '{id=1, name=Eridanus, date=null, radius=123, destination=null, visible=null, visibledate=null}' } def getPersonsConfig() { - JsonObject config = Json.createObjectBuilder().build() - config.addProperty("idColumn", "email") - config.addProperty("tableName", "persons") - config.addProperty("user", user) - config.addProperty("password", password) - config.addProperty("dbEngine", "mssql") - config.addProperty("host", host) - config.addProperty("databaseName", databaseName) - return config; + JsonObject config = Json.createObjectBuilder() + .add("tableName", "persons") + .add("user", user) + .add("password", password) + .add("dbEngine", "mssql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build() + return config } def preparePersonsTable() { String sql = "IF OBJECT_ID('persons', 'U') IS NOT NULL\n" + " DROP TABLE persons;" connection.createStatement().execute(sql); - connection.createStatement().execute("CREATE TABLE persons (id int, name varchar(255) NOT NULL, email varchar(255) NOT NULL)"); + connection.createStatement().execute("CREATE TABLE persons (id int, name varchar(255) NOT NULL, " + + "email varchar(255) NOT NULL PRIMARY KEY)"); } def "one insert, name with quote"() { @@ -229,10 +244,11 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { JsonObject snapshot = Json.createObjectBuilder().build() - JsonObject body1 = Json.createObjectBuilder().build() - body1.addProperty("id", "1") - body1.addProperty("name", "O'Henry") - body1.addProperty("email", "ohenry@elastic.io") + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "O'Henry") + .add("email", "ohenry@elastic.io") + .build() runAction(getPersonsConfig(), body1, snapshot) ArrayList records = getRecords("persons") @@ -248,22 +264,25 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { JsonObject snapshot = Json.createObjectBuilder().build() - JsonObject body1 = Json.createObjectBuilder().build() - body1.addProperty("id", "1") - body1.addProperty("name", "User1") - body1.addProperty("email", "user1@elastic.io") + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "User1") + .add("email", "user1@elastic.io") + .build() runAction(getPersonsConfig(), body1, snapshot) - JsonObject body2 = Json.createObjectBuilder().build() - body2.addProperty("id", "2") - body2.addProperty("name", "User2") - body2.addProperty("email", "user2@elastic.io") + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "User2") + .add("email", "user2@elastic.io") + .build() runAction(getPersonsConfig(), body2, snapshot) - JsonObject body3 = Json.createObjectBuilder().build() - body3.addProperty("id", "3") - body3.addProperty("name", "User3") - body3.addProperty("email", "user2@elastic.io") + JsonObject body3 = Json.createObjectBuilder() + .add("id", 3) + .add("name", "User3") + .add("email", "user2@elastic.io") + .build() runAction(getPersonsConfig(), body3, snapshot) ArrayList records = getRecords("persons") @@ -273,6 +292,4 @@ class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { records.get(0) == '{id=1, name=User1, email=user1@elastic.io}' records.get(1) == '{id=3, name=User3, email=user2@elastic.io}' } - - } diff --git a/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMySQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMySQLSpec.groovy new file mode 100644 index 0000000..115b3f0 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMySQLSpec.groovy @@ -0,0 +1,295 @@ +package io.elastic.jdbc.actions.upsert + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey +import spock.lang.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class UpsertRowByPrimaryKeyMySQLSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_MYSQL") + @Shared + def password = System.getenv("CONN_PASSWORD_MYSQL") + @Shared + def databaseName = System.getenv("CONN_DBNAME_MYSQL") + @Shared + def host = System.getenv("CONN_HOST_MYSQL") + @Shared + def port = System.getenv("CONN_PORT_MYSQL") + @Shared + def dbEngine = "mysql" + @Shared + def connectionString ="jdbc:" + dbEngine + "://" + host + ":" + port + "/" + databaseName + "?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC" + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + UpsertRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new UpsertRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", dbEngine) + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build(); + return config; + } + + def prepareStarsTable() { + + String sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id int PRIMARY KEY, name varchar(255) NOT NULL, " + + "date datetime, radius int, destination int, visible bit, visibledate date)"); + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "DROP TABLE IF EXISTS persons;" + + connection.createStatement().execute(sql) + sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one insert"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("date", "2015-02-19 10:10:10.0") + .add("radius", 123) + .add("visible", true) + .build(); + + runAction(getStarsConfig(), body, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 1 + records.get(0) == '{id=1, name=Taurus, date=2015-02-19 10:10:10.0, radius=123, destination=null, visible=true, ' + + 'visibledate=null}' + } + + def "one insert, incorrect value: string in integer field"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", "1") + .add("name", "Taurus") + .add("radius", "test") + .build() + String exceptionClass = ""; + + try { + runAction(getStarsConfig(), body, snapshot) + } catch (Exception e) { + exceptionClass = e.getClass().getName(); + } + + expect: + exceptionClass.contains("Exception") + } + + def "two inserts"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() + runAction(getStarsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "Eridanus") + .add("radius", 456) + .build() + + runAction(getStarsConfig(), body2, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 2 + records.get(0) == '{id=1, name=Taurus, date=null, radius=123, destination=null, visible=null, visibledate=null}' + records.get(1) == '{id=2, name=Eridanus, date=null, radius=456, destination=null, visible=null, visibledate=null}' + } + + def "one insert, one update by ID"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() + runAction(getStarsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Eridanus") + .build() + runAction(getStarsConfig(), body2, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 1 + records.get(0) == '{id=1, name=Eridanus, date=null, radius=123, destination=null, visible=null, visibledate=null}' + } + + + def getPersonsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "persons") + .add("user", user) + .add("password", password) + .add("dbEngine", dbEngine) + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build() + return config + } + + def preparePersonsTable() { + String sql = "DROP TABLE IF EXISTS persons;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE persons (id int, name varchar(255) NOT NULL, " + + "email varchar(255) NOT NULL PRIMARY KEY)"); + } + + def "one insert, name with quote"() { + + preparePersonsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "O'Henry") + .add("email", "ohenry@elastic.io") + .build() + runAction(getPersonsConfig(), body1, snapshot) + + ArrayList records = getRecords("persons") + + expect: + records.size() == 1 + records.get(0) == '{id=1, name=O\'Henry, email=ohenry@elastic.io}' + } + + def "two inserts, one update by email"() { + + preparePersonsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "User1") + .add("email", "user1@elastic.io") + .build() + runAction(getPersonsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "User2") + .add("email", "user2@elastic.io") + .build() + runAction(getPersonsConfig(), body2, snapshot) + + JsonObject body3 = Json.createObjectBuilder() + .add("id", 3) + .add("name", "User3") + .add("email", "user2@elastic.io") + .build() + runAction(getPersonsConfig(), body3, snapshot) + + ArrayList records = getRecords("persons") + + expect: + records.size() == 2 + records.get(0) == '{id=1, name=User1, email=user1@elastic.io}' + records.get(1) == '{id=3, name=User3, email=user2@elastic.io}' + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyOracleSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyOracleSpec.groovy new file mode 100644 index 0000000..182113a --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyOracleSpec.groovy @@ -0,0 +1,322 @@ +package io.elastic.jdbc.actions.upsert + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey +import spock.lang.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class UpsertRowByPrimaryKeyOracleSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_ORACLE") + @Shared + def password = System.getenv("CONN_PASSWORD_ORACLE") + @Shared + def databaseName = System.getenv("CONN_DBNAME_ORACLE") + @Shared + def host = System.getenv("CONN_HOST_ORACLE") + @Shared + def port = System.getenv("CONN_PORT_ORACLE") + @Shared + def dbEngine = "oracle" + @Shared + def connectionString ="jdbc:oracle:thin:@//" + host + ":" + port + "/XE" + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + UpsertRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new UpsertRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "STARS") + .add("user", user) + .add("password", password) + .add("dbEngine", dbEngine) + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE stars';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id number, name varchar(255) NOT NULL, " + + "ndate timestamp, radius number, destination float,visible number(1), " + + "CONSTRAINT pk_stars PRIMARY KEY (id))"); + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE persons';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + + connection.createStatement().execute(sql) + sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE stars';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one insert"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("ID", 1) + .add("name", "Taurus") + .add("ndate", "2015-02-19 10:10:10") + .add("radius", 123) + .add("visible", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 1 + records.get(0) == '{ID=1, NAME=Taurus, NDATE=2015-02-19 10:10:10.0, RADIUS=123, DESTINATION=null, VISIBLE=1}' + } + + def "one insert, incorrect value: string in integer field"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("ID", 1) + .add("name", "Taurus") + .add("radius", "test") + .build() + String exceptionClass = ""; + + try { + runAction(getStarsConfig(), body, snapshot) + } catch (Exception e) { + exceptionClass = e.getClass().getName(); + } + + expect: + exceptionClass.contains("Exception") + } + + def "two inserts"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("ID", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() + runAction(getStarsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("ID", 2) + .add("name", "Eridanus") + .add("radius", 456) + .build() + + runAction(getStarsConfig(), body2, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 2 + records.get(0) == '{ID=1, NAME=Taurus, NDATE=null, RADIUS=123, DESTINATION=null, VISIBLE=null}' + records.get(1) == '{ID=2, NAME=Eridanus, NDATE=null, RADIUS=456, DESTINATION=null, VISIBLE=null}' + } + + def "one insert, one update by ID"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("ID", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() + runAction(getStarsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("ID", 1) + .add("name", "Eridanus") + .build() + runAction(getStarsConfig(), body2, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 1 + records.get(0) == '{ID=1, NAME=Eridanus, NDATE=null, RADIUS=123, DESTINATION=null, VISIBLE=null}' + } + + + def getPersonsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "PERSONS") + .add("user", user) + .add("password", password) + .add("dbEngine", dbEngine) + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build() + return config + } + + def preparePersonsTable() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE persons';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE persons (id int, name varchar(255) NOT NULL, " + + "EMAIL varchar(255) NOT NULL, CONSTRAINT pk_persons PRIMARY KEY (EMAIL))"); + } + + def "one insert, name with quote"() { + + preparePersonsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "O'Henry") + .add("EMAIL", "ohenry@elastic.io") + .build() + runAction(getPersonsConfig(), body1, snapshot) + + ArrayList records = getRecords("persons") + + expect: + records.size() == 1 + records.get(0) == '{ID=1, NAME=O\'Henry, EMAIL=ohenry@elastic.io}' + } + + def "two inserts, one update by email"() { + + preparePersonsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "User1") + .add("EMAIL", "user1@elastic.io") + .build() + runAction(getPersonsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "User2") + .add("EMAIL", "user2@elastic.io") + .build() + runAction(getPersonsConfig(), body2, snapshot) + + JsonObject body3 = Json.createObjectBuilder() + .add("id", 3) + .add("name", "User3") + .add("EMAIL", "user2@elastic.io") + .build() + runAction(getPersonsConfig(), body3, snapshot) + + ArrayList records = getRecords("persons") + + expect: + records.size() == 2 + records.get(0) == '{ID=1, NAME=User1, EMAIL=user1@elastic.io}' + records.get(1) == '{ID=3, NAME=User3, EMAIL=user2@elastic.io}' + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyPostgreSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyPostgreSpec.groovy new file mode 100644 index 0000000..829be62 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyPostgreSpec.groovy @@ -0,0 +1,295 @@ +package io.elastic.jdbc.actions.upsert + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey +import spock.lang.Ignore +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +@Ignore +class UpsertRowByPrimaryKeyPostgreSpec extends Specification { + + @Shared + def user = System.getenv("CONN_USER_POSTGRESQL") + @Shared + def password = System.getenv("CONN_PASSWORD_POSTGRESQL") + @Shared + def databaseName = System.getenv("CONN_DBNAME_POSTGRESQL") + @Shared + def host = System.getenv("CONN_HOST_POSTGRESQL") + @Shared + def port = System.getenv("CONN_PORT_POSTGRESQL") + @Shared + def dbEngine = "postgresql" + @Shared + def connectionString ="jdbc:postgresql://"+ host + ":" + port + "/" + databaseName + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + UpsertRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new UpsertRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", dbEngine) + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id int, name varchar(255) NOT NULL, " + + "date timestamp, radius int, destination int, visible boolean, visibledate date, PRIMARY KEY(id))"); + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "DROP TABLE IF EXISTS persons;" + + connection.createStatement().execute(sql) + sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one insert"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("date", "2015-02-19 10:10:10") + .add("radius", 123) + .add("visible", true) + .add("visibledate", "2015-02-19") + .build(); + + runAction(getStarsConfig(), body, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 1 + records.get(0) == '{id=1, name=Taurus, date=2015-02-19 10:10:10.0, radius=123, destination=null, visible=true, ' + + 'visibledate=2015-02-19}' + } + + def "one insert, incorrect value: string in integer field"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", "test") + .build() + String exceptionClass = ""; + + try { + runAction(getStarsConfig(), body, snapshot) + } catch (Exception e) { + exceptionClass = e.getClass().getName(); + } + + expect: + exceptionClass.contains("Exception") + } + + def "two inserts"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() + runAction(getStarsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "Eridanus") + .add("radius", 456) + .build() + + runAction(getStarsConfig(), body2, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 2 + records.get(0) == '{id=1, name=Taurus, date=null, radius=123, destination=null, visible=null, visibledate=null}' + records.get(1) == '{id=2, name=Eridanus, date=null, radius=456, destination=null, visible=null, visibledate=null}' + } + + def "one insert, one update by ID"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Taurus") + .add("radius", 123) + .build() + runAction(getStarsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "Eridanus") + .build() + runAction(getStarsConfig(), body2, snapshot) + + ArrayList records = getRecords("stars") + + expect: + records.size() == 1 + records.get(0) == '{id=1, name=Eridanus, date=null, radius=123, destination=null, visible=null, visibledate=null}' + } + + + def getPersonsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "persons") + .add("user", user) + .add("password", password) + .add("dbEngine", dbEngine) + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .build() + return config + } + + def preparePersonsTable() { + String sql = "DROP TABLE IF EXISTS persons;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE persons (id int, name varchar(255) NOT NULL, " + + "email varchar(255) NOT NULL, PRIMARY KEY(email))"); + } + + def "one insert, name with quote"() { + + preparePersonsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "O'Henry") + .add("email", "ohenry@elastic.io") + .build() + runAction(getPersonsConfig(), body1, snapshot) + + ArrayList records = getRecords("persons") + + expect: + records.size() == 1 + records.get(0) == '{id=1, name=O\'Henry, email=ohenry@elastic.io}' + } + + def "two inserts, one update by email"() { + + preparePersonsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body1 = Json.createObjectBuilder() + .add("id", 1) + .add("name", "User1") + .add("email", "user1@elastic.io") + .build() + runAction(getPersonsConfig(), body1, snapshot) + + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .add("name", "User2") + .add("email", "user2@elastic.io") + .build() + runAction(getPersonsConfig(), body2, snapshot) + + JsonObject body3 = Json.createObjectBuilder() + .add("id", 3) + .add("name", "User3") + .add("email", "user2@elastic.io") + .build() + runAction(getPersonsConfig(), body3, snapshot) + + ArrayList records = getRecords("persons") + + expect: + records.size() == 2 + records.get(0) == '{id=1, name=User1, email=user1@elastic.io}' + records.get(1) == '{id=3, name=User3, email=user2@elastic.io}' + } +}