diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java index ecd44f4..61dcc89 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java @@ -4,35 +4,19 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class MSSQL extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); String sql = "WITH Results_CTE AS" + "(" + @@ -46,14 +30,37 @@ public ResultSet executePolling(Connection connection) throws SQLException { " FROM Results_CTE" + " WHERE RowNum > ?" + " AND RowNum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, skipNumber); + stmt.setInt(3, countNumber + skipNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "WITH Results_CTE AS" + "(" + @@ -67,13 +74,7 @@ public ResultSet executeLookup(Connection connection, JsonObject body) throws SQ " FROM Results_CTE" + " WHERE RowNum > ?" + " AND RowNum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), body); - } - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -81,9 +82,10 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - return stmt.executeUpdate(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, lookupValue); + return stmt.executeUpdate(); + } } public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { @@ -91,17 +93,7 @@ public boolean executeRecordExists(Connection connection, JsonObject body) throw String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - try { - Utils.setStatementParam(stmt, 1, lookupField, body); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; - } finally { - if (stmt != null) { - stmt.close(); - } - } + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -122,18 +114,37 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - try { + try (PreparedStatement stmt = connection.prepareStatement(sql)) { int i = 1; for (Map.Entry entry : body.entrySet()) { Utils.setStatementParam(stmt, i, entry.getKey(), body); i++; } stmt.execute(); - } finally { - if (stmt != null) { - stmt.close(); + } + } + + public void executeUpdate(Connection connection, String tableName, String idColumn, + String idValue, JsonObject body) throws SQLException { + validateQuery(); + StringBuilder setString = new StringBuilder(); + for (Map.Entry entry : body.entrySet()) { + if (setString.length() > 0) { + setString.append(","); } + setString.append(entry.getKey()).append(" = ?"); + } + String sql = "UPDATE " + tableName + + " SET " + setString.toString() + + " WHERE " + idColumn + " = ?"; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } } @@ -168,55 +179,20 @@ public void executeUpsert(Connection connection, String idColumn, JsonObject bod " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")" + " COMMIT;"; - PreparedStatement stmt = null; - try { - stmt = connection.prepareStatement(sql); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { Utils.setStatementParam(stmt, 1, idColumn, body); int i = 2; for (Map.Entry entry : body.entrySet()) { Utils.setStatementParam(stmt, i, entry.getKey(), body); i++; } - Utils.setStatementParam(stmt, i, idColumn, body); - i++; + Utils.setStatementParam(stmt, i++, idColumn, body); for (Map.Entry entry : body.entrySet()) { Utils.setStatementParam(stmt, i, entry.getKey(), body); i++; } stmt.execute(); - } finally { - if (stmt != null) { - stmt.close(); - } } } - public void executeUpdate(Connection connection, String tableName, String idColumn, - String idValue, JsonObject body) throws SQLException { - validateQuery(); - StringBuilder setString = new StringBuilder(); - for (Map.Entry entry : body.entrySet()) { - if (setString.length() > 0) { - setString.append(","); - } - setString.append(entry.getKey()).append(" = ?"); - } - String sql = "UPDATE " + tableName + - " SET " + setString.toString() + - " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - try { - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; - } - Utils.setStatementParam(stmt, i, idColumn, body); - stmt.execute(); - } finally { - if (stmt != null) { - stmt.close(); - } - } - } } diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java index 0eef058..7f7ea47 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java @@ -4,37 +4,18 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class MySQL extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - StringBuilder sql = new StringBuilder(sqlQuery); - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), (entry.getValue() != null) ? body : null); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - StringBuilder sql = new StringBuilder(sqlQuery); - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); StringBuilder sql = new StringBuilder("SELECT * FROM "); sql.append(tableName); @@ -46,14 +27,37 @@ public ResultSet executePolling(Connection connection) throws SQLException { } sql.append(" ASC LIMIT ? OFFSET ?"); - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, countNumber); - stmt.setInt(3, skipNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql.toString())) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, countNumber); + stmt.setInt(3, skipNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); StringBuilder sql = new StringBuilder("SELECT * FROM "); sql.append(tableName); @@ -62,23 +66,17 @@ public ResultSet executeLookup(Connection connection, JsonObject body) throws SQ sql.append(" = ?"); sql.append(" ORDER BY ").append(lookupField); sql.append(" ASC LIMIT ? OFFSET ?"); - - PreparedStatement stmt = connection.prepareStatement(sql.toString()); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), body); - } - stmt.setInt(2, countNumber); - stmt.setInt(3, skipNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql.toString(), countNumber, skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - return stmt.executeUpdate(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, lookupValue); + return stmt.executeUpdate(); + } } public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { @@ -86,11 +84,7 @@ public boolean executeRecordExists(Connection connection, JsonObject body) throw String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - Utils.setStatementParam(stmt, 1, lookupField, body); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -111,13 +105,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (String key : body.keySet()) { - Utils.setStatementParam(stmt, i, key, body); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (String key : body.keySet()) { + Utils.setStatementParam(stmt, i, key, body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -133,14 +128,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (String key : body.keySet()) { - Utils.setStatementParam(stmt, i, key, body); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (String key : body.keySet()) { + Utils.setStatementParam(stmt, i, key, body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, body); - stmt.execute(); } public void executeUpsert(Connection connection, String idColumn, JsonObject body) @@ -167,23 +163,15 @@ public void executeUpsert(Connection connection, String idColumn, JsonObject bod " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")" + " ON DUPLICATE KEY UPDATE " + setString + ";"; - PreparedStatement stmt = null; - try { - stmt = connection.prepareStatement(sql); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { int i = 1; + int countBodyEntry = body.size(); for (Map.Entry entry : body.entrySet()) { Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; - } - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); + Utils.setStatementParam(stmt, i + countBodyEntry, entry.getKey(), body); i++; } stmt.execute(); - } finally { - if (stmt != null) { - stmt.close(); - } } } diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java b/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java index 6126bcd..dcb7807 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java @@ -4,73 +4,73 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class Oracle extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); String sql = "SELECT * FROM " + " (SELECT b.*, rank() over (order by " + pollingField + ") as rnk FROM " + tableName + " b) WHERE " + pollingField + " > ?" + " AND rnk BETWEEN ? AND ?" + " ORDER BY " + pollingField; - PreparedStatement stmt = connection.prepareStatement(sql); - - /* data types mapping https://docs.oracle.com/cd/B19306_01/java.102/b14188/datamap.htm */ - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + /* data types mapping https://docs.oracle.com/cd/B19306_01/java.102/b14188/datamap.htm */ + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, skipNumber); + stmt.setInt(3, countNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "SELECT * FROM " + "(SELECT b.*, rank() OVER (ORDER BY " + lookupField + ") AS rnk FROM " + tableName + " b) WHERE " + lookupField + " = ? " + "AND rnk BETWEEN ? AND ? " + "ORDER BY " + lookupField; - PreparedStatement stmt = connection.prepareStatement(sql); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), body); - } - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setString(1, lookupValue); - return stmt.executeUpdate(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, lookupValue); + return stmt.executeUpdate(); + } } public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { @@ -78,11 +78,7 @@ public boolean executeRecordExists(Connection connection, JsonObject body) throw String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - Utils.setStatementParam(stmt, 1, lookupField, body); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -103,13 +99,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -125,14 +122,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, body); - stmt.execute(); } public void executeUpsert(Connection connection, String idColumn, JsonObject body) diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java index 41f1a26..9b07e42 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java @@ -4,35 +4,18 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Map; -import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class PostgreSQL extends Query { - public ResultSet executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - int i = 1; - for (Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; - } - return stmt.executeQuery(); - } - - public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException { - PreparedStatement stmt = connection.prepareStatement(sqlQuery); - if (pollingValue != null) { - stmt.setTimestamp(1, pollingValue); - } - return stmt.executeQuery(); - } - - public ResultSet executePolling(Connection connection) throws SQLException { + public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); String sql = "WITH results_cte AS" + "(" + @@ -46,14 +29,37 @@ public ResultSet executePolling(Connection connection) throws SQLException { " FROM results_cte" + " WHERE rownum > ?" + " AND rownum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, skipNumber); + stmt.setInt(3, countNumber + skipNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } } - public ResultSet executeLookup(Connection connection, JsonObject body) throws SQLException { + public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { validateQuery(); String sql = "WITH results_cte AS" + "(" + @@ -67,14 +73,7 @@ public ResultSet executeLookup(Connection connection, JsonObject body) throws SQ " FROM results_cte" + " WHERE rownum > ?" + " AND rownum < ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - //stmt.setString(1, lookupValue); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), body); - } - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - return stmt.executeQuery(); + return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -82,11 +81,12 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep String sql = "DELETE" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), body); + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, 1, entry.getKey(), body); + } + return stmt.executeUpdate(); } - return stmt.executeUpdate(); } public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { @@ -94,11 +94,7 @@ public boolean executeRecordExists(Connection connection, JsonObject body) throw String sql = "SELECT COUNT(*)" + " FROM " + tableName + " WHERE " + lookupField + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - Utils.setStatementParam(stmt, 1, lookupField, body); - ResultSet rs = stmt.executeQuery(); - rs.next(); - return rs.getInt(1) > 0; + return Utils.isRecordExists(connection, body, sql, lookupField); } public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -119,13 +115,14 @@ public void executeInsert(Connection connection, String tableName, JsonObject bo String sql = "INSERT INTO " + tableName + " (" + keys.toString() + ")" + " VALUES (" + values.toString() + ")"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + stmt.execute(); } - stmt.execute(); } public void executeUpdate(Connection connection, String tableName, String idColumn, @@ -141,14 +138,15 @@ public void executeUpdate(Connection connection, String tableName, String idColu String sql = "UPDATE " + tableName + " SET " + setString.toString() + " WHERE " + idColumn + " = ?"; - PreparedStatement stmt = connection.prepareStatement(sql); - int i = 1; - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, i, entry.getKey(), body); - i++; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + int i = 1; + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + Utils.setStatementParam(stmt, i, idColumn, body); + stmt.execute(); } - Utils.setStatementParam(stmt, i, idColumn, body); - stmt.execute(); } public void executeUpsert(Connection connection, String idColumn, JsonObject body) diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java b/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java index c58daaf..84b8b84 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java @@ -2,10 +2,17 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Map.Entry; +import javax.json.Json; import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; public abstract class Query { @@ -15,6 +22,7 @@ public abstract class Query { protected String orderField = null; protected String pollingField = null; protected Timestamp pollingValue = null; + protected Timestamp maxPollingValue = null; protected String lookupField = null; protected String lookupValue = null; @@ -63,9 +71,17 @@ public Query selectPolling(String sqlQuery, Timestamp fieldValue) { return this; } - abstract public ResultSet executePolling(Connection connection) throws SQLException; + public Timestamp getMaxPollingValue() { + return maxPollingValue; + } + + public void setMaxPollingValue(Timestamp maxPollingValue) { + this.maxPollingValue = maxPollingValue; + } - abstract public ResultSet executeLookup(Connection connection, JsonObject body) + abstract public ArrayList executePolling(Connection connection) throws SQLException; + + abstract public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException; abstract public boolean executeRecordExists(Connection connection, JsonObject body) @@ -82,11 +98,49 @@ abstract public void executeUpdate(Connection connection, String tableName, Stri abstract public void executeUpsert(Connection connection, String idColumn, JsonObject body) throws SQLException; - abstract public ResultSet executeSelectQuery(Connection connection, String sqlQuery, - JsonObject body) throws SQLException; + public ArrayList executeSelectTrigger(Connection connection, String sqlQuery) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sqlQuery)) { + if (pollingValue != null) { + stmt.setTimestamp(1, pollingValue); + } + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + listResult.add(row.build()); + } + return listResult; + } + } + } - abstract public ResultSet executeSelectTrigger(Connection connection, String sqlQuery) - throws SQLException; + public ArrayList executeSelectQuery(Connection connection, String sqlQuery, JsonObject body) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sqlQuery)) { + int i = 1; + for (Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, i, entry.getKey(), body); + i++; + } + try (ResultSet rs = stmt.executeQuery()) { + JsonObjectBuilder row = Json.createObjectBuilder(); + ArrayList listResult = new ArrayList(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + listResult.add(row.build()); + } + return listResult; + } + } + } public void validateQuery() { if (tableName == null) { diff --git a/src/main/java/io/elastic/jdbc/Utils.java b/src/main/java/io/elastic/jdbc/Utils.java index 740715e..b9e313e 100644 --- a/src/main/java/io/elastic/jdbc/Utils.java +++ b/src/main/java/io/elastic/jdbc/Utils.java @@ -17,6 +17,7 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; @@ -300,4 +301,42 @@ public static JsonObjectBuilder getColumnDataByType(ResultSet rs, ResultSetMetaD return row; } + public static JsonObject getLookupRow(Connection connection, JsonObject body, String sql, + Integer secondParameter, Integer thirdParameter) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + JsonObjectBuilder row = Json.createObjectBuilder(); + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, 1, entry.getKey(), body); + } + stmt.setInt(2, secondParameter); + stmt.setInt(3, thirdParameter); + try (ResultSet rs = stmt.executeQuery()) { + ResultSetMetaData metaData = rs.getMetaData(); + int rowsCount = 0; + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + rowsCount++; + if (rowsCount > 1) { + LOGGER.error("Error: the number of matching rows is not exactly one"); + throw new RuntimeException("Error: the number of matching rows is not exactly one"); + } + } + return row.build(); + } + } + } + + public static boolean isRecordExists (Connection connection, JsonObject body, String sql, String lookupField) throws SQLException{ + try (PreparedStatement stmt = connection.prepareStatement(sql)){ + Utils.setStatementParam(stmt, 1, lookupField, body); + try (ResultSet rs = stmt.executeQuery()) { + rs.next(); + return rs.getInt(1) > 0; + } + } + } + } diff --git a/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java b/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java index db9853d..e29b0b0 100644 --- a/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java +++ b/src/main/java/io/elastic/jdbc/actions/DeleteRowByPrimaryKey.java @@ -8,12 +8,10 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.Map; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import javax.json.JsonValue; import org.slf4j.Logger; @@ -27,24 +25,18 @@ public class DeleteRowByPrimaryKey implements Module { private static final String PROPERTY_ID_COLUMN = "idColumn"; private static final String PROPERTY_LOOKUP_VALUE = "lookupValue"; private static final String PROPERTY_NULLABLE_RESULT = "nullableResult"; - public boolean isOracle = false; - private Connection connection = null; - private Map columnTypes = null; @Override public void execute(ExecutionParameters parameters) { final JsonObject body = parameters.getMessage().getBody(); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSet rs = null; StringBuilder primaryKey = new StringBuilder(); StringBuilder primaryValue = new StringBuilder(); Integer primaryKeysCount = 0; String tableName = ""; String dbEngine = ""; Boolean nullableResult = false; - Integer rowsCount = 0; if (configuration.containsKey(PROPERTY_TABLE_NAME) && Utils.getNonNullString(configuration, PROPERTY_TABLE_NAME).length() != 0) { @@ -70,8 +62,6 @@ public void execute(ExecutionParameters parameters) { nullableResult = true; } - boolean isOracle = dbEngine.equals(Engines.ORACLE.name().toLowerCase()); - for (Map.Entry entry : body.entrySet()) { LOGGER.info("{} = {}", entry.getKey(), entry.getValue()); primaryKey.append(entry.getKey()); @@ -80,73 +70,55 @@ public void execute(ExecutionParameters parameters) { } if (primaryKeysCount == 1) { - LOGGER.info("Executing delete row by primary key action"); - Connection connection = Utils.getConnection(configuration); - Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); - LOGGER.info("Detected column types: " + Utils.columnTypes); - try { - QueryFactory queryFactory = new QueryFactory(); - Query query = queryFactory.getQuery(dbEngine); - LOGGER.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); - query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); - checkConfig(configuration); - rs = query.executeLookup(connection, body); - while (rs.next()) { - rowsCount++; - if (rowsCount > 1) { - LOGGER.error("Error: the number of matching rows is not exactly one"); - throw new RuntimeException("Error: the number of matching rows is not exactly one"); + try (Connection connection = Utils.getConnection(configuration)) { + LOGGER.info("Executing delete row by primary key action"); + boolean isOracle = dbEngine.equals(Engines.ORACLE.name().toLowerCase()); + Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); + LOGGER.info("Detected column types: " + Utils.columnTypes); + try { + QueryFactory queryFactory = new QueryFactory(); + Query query = queryFactory.getQuery(dbEngine); + LOGGER.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); + query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); + checkConfig(configuration); + JsonObject row = query.executeLookup(connection, body); + + for (Map.Entry entry : configuration.entrySet()) { + LOGGER.info("Key = " + entry.getKey() + " Value = " + entry.getValue()); } - } - - for (Map.Entry entry : configuration.entrySet()) { - LOGGER.info("Key = " + entry.getKey() + " Value = " + entry.getValue()); - } - - if (rowsCount == 1) { - int result = query.executeDelete(connection, body); - if (result == 1) { - row.add("result", result); - LOGGER.info("Emitting data"); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else { - LOGGER.info("Unexpected result"); - throw new RuntimeException("Unexpected result"); + if (row.size() != 0) { + int result = query.executeDelete(connection, body); + if (result == 1) { + LOGGER.info("Emitting data {}", row); + parameters.getEventEmitter().emitData(new Message.Builder().body(row).build()); + } else { + LOGGER.info("Unexpected result"); + throw new RuntimeException("Unexpected result"); + } + } else if (row.size() == 0 && nullableResult) { + JsonObject emptyRow = Json.createObjectBuilder() + .add("empty dataset", "nothing to delete") + .build(); + LOGGER.info("Emitting data {}", emptyRow); + parameters.getEventEmitter().emitData(new Message.Builder().body(emptyRow).build()); + } else if (row.size() == 0 && !nullableResult) { + LOGGER.info("Empty response. Error message will be returned"); + throw new RuntimeException("Empty response"); } - } else if (rowsCount == 0 && nullableResult) { - row.add("empty dataset", "nothing to delete"); - LOGGER.info("Emitting data"); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else if (rowsCount == 0 && !nullableResult) { - LOGGER.info("Empty response. Error message will be returned"); - throw new RuntimeException("Empty response"); - } - snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) - .add(PROPERTY_ID_COLUMN, primaryKey.toString()) - .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) - .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); - LOGGER.info("Emitting new snapshot {}", snapshot.toString()); - parameters.getEventEmitter().emitSnapshot(snapshot); - } catch (SQLException e) { - LOGGER.error("Failed to make request", e.toString()); - throw new RuntimeException(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close result set", e.toString()); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close connection", e.toString()); - } + snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) + .add(PROPERTY_ID_COLUMN, primaryKey.toString()) + .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) + .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); + parameters.getEventEmitter().emitSnapshot(snapshot); + } catch (SQLException e) { + LOGGER.error("Failed to make request", e.toString()); + throw new RuntimeException(e); } + } catch (SQLException e) { + LOGGER.error("Failed to close connection", e.toString()); } } else { LOGGER.error("Error: Should be one Primary Key"); diff --git a/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java b/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java index 0d77ac2..de40b44 100644 --- a/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java +++ b/src/main/java/io/elastic/jdbc/actions/LookupRowByPrimaryKey.java @@ -8,13 +8,10 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.Map; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import javax.json.JsonValue; import org.slf4j.Logger; @@ -34,15 +31,12 @@ public void execute(ExecutionParameters parameters) { final JsonObject body = parameters.getMessage().getBody(); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSet rs = null; StringBuilder primaryKey = new StringBuilder(); StringBuilder primaryValue = new StringBuilder(); Integer primaryKeysCount = 0; String tableName = ""; String dbEngine = ""; Boolean nullableResult = false; - Integer rowsCount = 0; if (configuration.containsKey(PROPERTY_TABLE_NAME) && Utils.getNonNullString(configuration, PROPERTY_TABLE_NAME).length() != 0) { @@ -78,71 +72,46 @@ public void execute(ExecutionParameters parameters) { } if (primaryKeysCount == 1) { - LOGGER.info("Executing lookup row by primary key action"); - Connection connection = Utils.getConnection(configuration); - Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); - LOGGER.info("Detected column types: " + Utils.columnTypes); - try { - QueryFactory queryFactory = new QueryFactory(); - Query query = queryFactory.getQuery(dbEngine); - LOGGER.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); - query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); - checkConfig(configuration); - rs = query.executeLookup(connection, body); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); + + try (Connection connection = Utils.getConnection(configuration)) { + LOGGER.info("Executing lookup row by primary key action"); + Utils.columnTypes = Utils.getColumnTypes(connection, isOracle, tableName); + LOGGER.info("Detected column types: " + Utils.columnTypes); + try { + QueryFactory queryFactory = new QueryFactory(); + Query query = queryFactory.getQuery(dbEngine); + LOGGER.info("Lookup parameters: {} = {}", primaryKey.toString(), primaryValue.toString()); + query.from(tableName).lookup(primaryKey.toString(), primaryValue.toString()); + checkConfig(configuration); + + JsonObject row = query.executeLookup(connection, body); + if (row.size() != 0) { + LOGGER.info("Emitting data"); + LOGGER.info(row.toString()); + parameters.getEventEmitter().emitData(new Message.Builder().body(row).build()); } - rowsCount++; - if (rowsCount > 1) { - LOGGER.error("Error: the number of matching rows is not exactly one"); - throw new RuntimeException("Error: the number of matching rows is not exactly one"); - } else { + if (row.size() == 0 && nullableResult) { + row.put("empty dataset", null); LOGGER.info("Emitting data"); LOGGER.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + parameters.getEventEmitter().emitData(new Message.Builder().body(row).build()); + } else if (row.size() == 0 && !nullableResult) { + LOGGER.info("Empty response. Error message will be returned"); + throw new RuntimeException("Empty response"); } - } - - for (Map.Entry entry : configuration.entrySet()) { - LOGGER.info("Key = " + entry.getKey() + " Value = " + entry.getValue()); - } - if (rowsCount == 0 && nullableResult) { - row.add("empty dataset", "no data"); - LOGGER.info("Emitting data"); - LOGGER.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else if (rowsCount == 0 && !nullableResult) { - LOGGER.info("Empty response. Error message will be returned"); - throw new RuntimeException("Empty response"); + snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) + .add(PROPERTY_ID_COLUMN, primaryKey.toString()) + .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) + .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); + parameters.getEventEmitter().emitSnapshot(snapshot); + } catch (SQLException e) { + LOGGER.error("Failed to make request", e.toString()); + throw new RuntimeException(e); } - - snapshot = Json.createObjectBuilder().add(PROPERTY_TABLE_NAME, tableName) - .add(PROPERTY_ID_COLUMN, primaryKey.toString()) - .add(PROPERTY_LOOKUP_VALUE, primaryValue.toString()) - .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); - LOGGER.info("Emitting new snapshot {}", snapshot.toString()); - parameters.getEventEmitter().emitSnapshot(snapshot); } catch (SQLException e) { - LOGGER.error("Failed to make request", e.toString()); - throw new RuntimeException(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close result set", e.toString()); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close connection", e.toString()); - } - } + LOGGER.error("Failed to close connection", e.toString()); } } else { LOGGER.error("Error: Should be one Primary Key"); diff --git a/src/main/java/io/elastic/jdbc/actions/SelectAction.java b/src/main/java/io/elastic/jdbc/actions/SelectAction.java index d74fe9e..ca521bc 100644 --- a/src/main/java/io/elastic/jdbc/actions/SelectAction.java +++ b/src/main/java/io/elastic/jdbc/actions/SelectAction.java @@ -7,12 +7,10 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,14 +27,12 @@ public void execute(ExecutionParameters parameters) { final JsonObject body = parameters.getMessage().getBody(); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); checkConfig(configuration); Connection connection = Utils.getConnection(configuration); String dbEngine = configuration.getString("dbEngine"); String sqlQuery = configuration.getString("sqlQuery"); Integer skipNumber = 0; Boolean nullableResult = false; - Integer rowsCount = 0; if (Utils.getNonNullString(configuration, PROPERTY_NULLABLE_RESULT).equals("true")) { nullableResult = true; @@ -50,36 +46,35 @@ public void execute(ExecutionParameters parameters) { Utils.columnTypes = Utils.getVariableTypes(sqlQuery); LOGGER.info("Detected column types: " + Utils.columnTypes); - ResultSet rs = null; - LOGGER.info("Executing select trigger"); + LOGGER.info("Executing select action"); try { QueryFactory queryFactory = new QueryFactory(); Query query = queryFactory.getQuery(dbEngine); sqlQuery = Query.preProcessSelect(sqlQuery); LOGGER.info("SQL Query: {}", sqlQuery); - rs = query.executeSelectQuery(connection, sqlQuery, body); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - LOGGER.info("columns count: {} from {}", rowsCount, metaData.getColumnCount()); - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - } - rowsCount++; - LOGGER.info("Emitting data"); - LOGGER.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + + ArrayList resultList = query.executeSelectQuery(connection, sqlQuery, body); + for (int i = 0; i < resultList.size(); i++) { + LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Emitting data {}", resultList.get(i).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(i)).build()); } - if (rowsCount == 0 && nullableResult) { - row.add("empty dataset", "no data"); - LOGGER.info("Emitting data"); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - } else if (rowsCount == 0 && !nullableResult) { + if (resultList.size() == 0 && nullableResult) { + resultList.add(Json.createObjectBuilder() + .add("empty dataset", "no data") + .build()); + LOGGER.info("Emitting data {}", resultList.get(0)); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(0)).build()); + } else if (resultList.size() == 0 && !nullableResult) { LOGGER.info("Empty response. Error message will be returned"); throw new RuntimeException("Empty response"); } - snapshot = Json.createObjectBuilder().add(PROPERTY_SKIP_NUMBER, skipNumber + rowsCount) + snapshot = Json.createObjectBuilder() + .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) .add(SQL_QUERY_VALUE, sqlQuery) .add(PROPERTY_NULLABLE_RESULT, nullableResult).build(); LOGGER.info("Emitting new snapshot {}", snapshot.toString()); diff --git a/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java b/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java index bfa93db..7004404 100644 --- a/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java +++ b/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java @@ -7,15 +7,13 @@ import io.elastic.jdbc.QueryFactory; import io.elastic.jdbc.Utils; import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,31 +28,27 @@ public class GetRowsPollingTrigger implements Module { private static final String PROPERTY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.sss"; private static final String PROPERTY_SKIP_NUMBER = "skipNumber"; private static final String DATETIME_REGEX = "(\\d{4})-(\\d{2})-(\\d{2}) (\\d{2}):(\\d{2}):(\\d{2})(\\.(\\d{1,3}))?"; - private static boolean isEmpty = true; @Override public final void execute(ExecutionParameters parameters) { LOGGER.info("About to execute select trigger"); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); checkConfig(configuration); - Connection connection = Utils.getConnection(configuration); + Connection connection = null; Integer skipNumber = 0; - Integer rowsCount = 0; String pollingField = ""; Calendar cDate = Calendar.getInstance(); cDate.set(cDate.get(Calendar.YEAR), cDate.get(Calendar.MONTH), cDate.get(Calendar.DATE), 0, 0, 0); - String dbEngine = configuration.getString(Utils.CFG_DB_ENGINE); String tableName = configuration.getString(PROPERTY_TABLE_NAME); + if (Utils.getNonNullString(configuration, PROPERTY_POLLING_FIELD).length() != 0) { pollingField = configuration.getString(PROPERTY_POLLING_FIELD); } Timestamp pollingValue; Timestamp cts = new java.sql.Timestamp(cDate.getTimeInMillis()); - Timestamp maxPollingValue = cts; String formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT).format(cts); if (configuration.containsKey(PROPERTY_POLLING_VALUE) && Utils @@ -80,44 +74,37 @@ public final void execute(ExecutionParameters parameters) { skipNumber = 0; } - ResultSet rs = null; LOGGER.info("Executing row polling trigger"); try { + connection = Utils.getConnection(configuration); QueryFactory queryFactory = new QueryFactory(); Query query = queryFactory.getQuery(dbEngine); query.from(tableName).skip(skipNumber).orderBy(pollingField) .rowsPolling(pollingField, pollingValue); - rs = query.executePolling(connection); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { - if (maxPollingValue.before(rs.getTimestamp(i))) { - if (rs.getString(metaData.getColumnName(i)).length() > 10) { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i))); - } else { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); - } - } - } - } - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); - rowsCount++; + query.setMaxPollingValue(cts); + ArrayList resultList = query.executePolling(connection); + for (int i = 0; i < resultList.size(); i++) { + LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Emitting data {}", resultList.get(i).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(i)).build()); } - if (rowsCount == 0) { - row.add("empty dataset", "no data"); + if (resultList.size() == 0) { + resultList.add(Json.createObjectBuilder() + .add("empty dataset", "no data") + .build()); LOGGER.info("Emitting empty data"); - maxPollingValue = new java.sql.Timestamp(System.currentTimeMillis()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + query.setMaxPollingValue(new java.sql.Timestamp(System.currentTimeMillis())); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(0)).build()); } - formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT).format(maxPollingValue); + formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT) + .format(query.getMaxPollingValue()); - snapshot = Json.createObjectBuilder().add(PROPERTY_SKIP_NUMBER, skipNumber + rowsCount) + snapshot = Json.createObjectBuilder() + .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) .add(PROPERTY_TABLE_NAME, tableName) .add(PROPERTY_POLLING_FIELD, pollingField) .add(PROPERTY_POLLING_VALUE, formattedDate).build(); @@ -127,13 +114,6 @@ public final void execute(ExecutionParameters parameters) { LOGGER.error("Failed to make request", e.toString()); throw new RuntimeException(e); } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close result set", e.toString()); - } - } if (connection != null) { try { connection.close(); diff --git a/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java b/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java index a94ae8e..1155fb1 100644 --- a/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java +++ b/src/main/java/io/elastic/jdbc/triggers/SelectTrigger.java @@ -8,14 +8,13 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +35,9 @@ public final void execute(ExecutionParameters parameters) { LOGGER.info("About to execute select trigger"); final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); - JsonObjectBuilder row = Json.createObjectBuilder(); checkConfig(configuration); Connection connection = Utils.getConnection(configuration); Integer skipNumber = 0; - Integer rowsCount = 0; Calendar cDate = Calendar.getInstance(); cDate.set(cDate.get(Calendar.YEAR), cDate.get(Calendar.MONTH), cDate.get(Calendar.DATE), 0, 0, @@ -79,26 +76,25 @@ public final void execute(ExecutionParameters parameters) { query.selectPolling(sqlQuery, pollingValue); } LOGGER.info("SQL Query: {}", sqlQuery); - rs = query.executeSelectTrigger(connection, sqlQuery); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - } - rowsCount++; - LOGGER.info("Emitting data"); - LOGGER.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + ArrayList resultList = query.executeSelectTrigger(connection, sqlQuery); + for (int i = 0; i < resultList.size(); i++) { + LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Emitting data {}", resultList.get(i).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(i)).build()); } - if (rowsCount == 0) { - row.add("empty dataset", "no data"); - LOGGER.info("Emitting data"); - LOGGER.info(row.toString()); - parameters.getEventEmitter().emitData(new Message.Builder().body(row.build()).build()); + if (resultList.size() == 0) { + resultList.add(Json.createObjectBuilder() + .add("empty dataset", "no data") + .build()); + LOGGER.info("Emitting data {}", resultList.get(0).toString()); + parameters.getEventEmitter() + .emitData(new Message.Builder().body(resultList.get(0)).build()); } - snapshot = Json.createObjectBuilder().add(PROPERTY_SKIP_NUMBER, skipNumber + rowsCount) + snapshot = Json.createObjectBuilder() + .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) .add(LAST_POLL_PLACEHOLDER, pollingValue.toString()) .add(SQL_QUERY_VALUE, sqlQuery).build(); LOGGER.info("Emitting new snapshot {}", snapshot.toString()); @@ -106,21 +102,6 @@ public final void execute(ExecutionParameters parameters) { } catch (SQLException e) { LOGGER.error("Failed to make request", e.toString()); throw new RuntimeException(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close result set", e.toString()); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close connection", e.toString()); - } - } } } diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMSSQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMSSQLSpec.groovy new file mode 100644 index 0000000..16c48c6 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMSSQLSpec.groovy @@ -0,0 +1,146 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +//@Ignore +class DeleteActionMSSQLSpec extends Specification { + + @Shared + def user = "john"//System.getenv("CONN_USER_MSSQL") + @Shared + def password = "elastic123"//System.getenv("CONN_PASSWORD_MSSQL") + @Shared + def databaseName = "Test2"//System.getenv("CONN_DBNAME_MSSQL") + @Shared + def host = "eio-mssql-fra.c79g081qpeyv.eu-central-1.rds.amazonaws.com" +//System.getenv("CONN_HOST_MSSQL") + @Shared + def port = "1433"//System.getenv("CONN_PORT_MSSQL") + @Shared + def connectionString = "jdbc:sqlserver://" + host + ":" + port + ";database=" + databaseName + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "mssql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + connection.createStatement().execute("IF OBJECT_ID('stars', 'U') IS NOT NULL\n" + + " DROP TABLE stars;"); + connection.createStatement().execute("CREATE TABLE stars (id int PRIMARY KEY, name varchar(255) NOT NULL, " + + "date datetime, radius int, destination int, visible bit, visibledate date)"); + connection.createStatement().execute("INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0'," + + " 123, 5, 0, '2015-02-19')") + connection.createStatement().execute("INSERT INTO stars values (2,'Eridanus', '2017-02-19 10:10:10.0'," + + " 852, 5, 0, '2015-07-19')") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "IF OBJECT_ID('persons', 'U') IS NOT NULL\n" + + " DROP TABLE persons;" + + connection.createStatement().execute(sql) + sql = "IF OBJECT_ID('stars', 'U') IS NOT NULL\n" + + " DROP TABLE stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } + + +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMySQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMySQLSpec.groovy new file mode 100644 index 0000000..dee6798 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionMySQLSpec.groovy @@ -0,0 +1,144 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +//@Ignore +class DeleteActionMySQLSpec extends Specification { + + @Shared + def user = "elasticio"//System.getenv("CONN_USER_MSSQL") + @Shared + def password = "lo4hDacS5L"//System.getenv("CONN_PASSWORD_MSSQL") + @Shared + def databaseName = "elasticio_testdb"//System.getenv("CONN_DBNAME_MSSQL") + @Shared + def host = "ec2-18-194-228-22.eu-central-1.compute.amazonaws.com" +//System.getenv("CONN_HOST_MSSQL") + @Shared + def port = "3306"//System.getenv("CONN_PORT_MSSQL") + @Shared + def dbEngine = "mysql"//System.getenv("CONN_DBENGINE_MSSQL") + @Shared + def connectionString = "jdbc:" + dbEngine + "://" + host + ":" + port + "/" + databaseName + "?useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC" + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "mysql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id int PRIMARY KEY, name varchar(255) NOT NULL, " + + "date datetime, radius int, destination int, visible bit, visibledate date)"); + connection.createStatement().execute("INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0'," + + " 123, 5, 0, '2015-02-19')") + connection.createStatement().execute("INSERT INTO stars values (2,'Eridanus', '2017-02-19 10:10:10.0'," + + " 852, 5, 0, '2015-07-19')") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "DROP TABLE IF EXISTS persons;" + + connection.createStatement().execute(sql) + sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionOracleSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionOracleSpec.groovy new file mode 100644 index 0000000..ff34392 --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionOracleSpec.groovy @@ -0,0 +1,154 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +//@Ignore +class DeleteActionOracleSpec extends Specification { + + @Shared + def user = "elasticio"//System.getenv("CONN_USER_ORACLE") + @Shared + def password = "PeU13cbKtH"//System.getenv("CONN_PASSWORD_ORACLE") + @Shared + def databaseName = "elasticio_testdb"//System.getenv("CONN_DBNAME_ORACLE") + @Shared + def host = "ec2-18-194-228-22.eu-central-1.compute.amazonaws.com" +//System.getenv("CONN_HOST_ORACLE") + @Shared + def port = "1521"//System.getenv("CONN_PORT_ORACLE") + @Shared + def dbEngine = "oracle"//System.getenv("CONN_DBENGINE_ORACLE") + @Shared + def connectionString = "jdbc:oracle:thin:@//" + host + ":" + port + "/XE" + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "STARS") + .add("user", user) + .add("password", password) + .add("dbEngine", "oracle") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE stars';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id number, name varchar(255) NOT NULL, " + + "radius number, destination float,visible number(1), " + + "CONSTRAINT pk_stars PRIMARY KEY (id))"); + connection.createStatement().execute("INSERT INTO stars (ID,NAME,RADIUS,DESTINATION, VISIBLE) VALUES (1,'Taurus',321,44.4,1)") + connection.createStatement().execute("INSERT INTO stars (ID,NAME,RADIUS,DESTINATION, VISIBLE) VALUES (2,'Boston',581,94.4,0)") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "BEGIN" + + " EXECUTE IMMEDIATE 'DROP TABLE stars';" + + "EXCEPTION" + + " WHEN OTHERS THEN" + + " IF SQLCODE != -942 THEN" + + " RAISE;" + + " END IF;" + + "END;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("ID", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("ID", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionPostrgeSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionPostrgeSpec.groovy new file mode 100644 index 0000000..b9c2c3f --- /dev/null +++ b/src/test/groovy/io/elastic/jdbc/actions/delete/DeleteActionPostrgeSpec.groovy @@ -0,0 +1,144 @@ +package io.elastic.jdbc.actions.delete + +import io.elastic.api.EventEmitter +import io.elastic.api.ExecutionParameters +import io.elastic.api.Message +import io.elastic.jdbc.actions.DeleteRowByPrimaryKey +import spock.lang.Shared +import spock.lang.Specification + +import javax.json.Json +import javax.json.JsonObject +import java.sql.Connection +import java.sql.DriverManager +import java.sql.ResultSet + +//@Ignore +class DeleteActionPostrgeSpec extends Specification { + + @Shared + def user = "elasticio"//System.getenv("CONN_USER_ORACLE") + @Shared + def password = "2uDyG4hHxR"//System.getenv("CONN_PASSWORD_ORACLE") + @Shared + def databaseName = "elasticio_testdb"//System.getenv("CONN_DBNAME_ORACLE") + @Shared + def host = "ec2-18-194-228-22.eu-central-1.compute.amazonaws.com" +//System.getenv("CONN_HOST_ORACLE") + @Shared + def port = "5432"//System.getenv("CONN_PORT_ORACLE") + @Shared + def dbEngine = "postgresql" + @Shared + def connectionString = "jdbc:postgresql://" + host + ":" + port + "/" + databaseName + @Shared + Connection connection + + @Shared + EventEmitter.Callback errorCallback + @Shared + EventEmitter.Callback snapshotCallback + @Shared + EventEmitter.Callback dataCallback + @Shared + EventEmitter.Callback reboundCallback + @Shared + EventEmitter.Callback httpReplyCallback + @Shared + EventEmitter emitter + @Shared + DeleteRowByPrimaryKey action + + def setupSpec() { + connection = DriverManager.getConnection(connectionString, user, password) + } + + def setup() { + createAction() + } + + def createAction() { + errorCallback = Mock(EventEmitter.Callback) + snapshotCallback = Mock(EventEmitter.Callback) + dataCallback = Mock(EventEmitter.Callback) + reboundCallback = Mock(EventEmitter.Callback) + httpReplyCallback = Mock(EventEmitter.Callback) + emitter = new EventEmitter.Builder().onData(dataCallback).onSnapshot(snapshotCallback).onError(errorCallback) + .onRebound(reboundCallback).onHttpReplyCallback(httpReplyCallback).build() + action = new DeleteRowByPrimaryKey() + } + + def runAction(JsonObject config, JsonObject body, JsonObject snapshot) { + Message msg = new Message.Builder().body(body).build() + ExecutionParameters params = new ExecutionParameters(msg, emitter, config, snapshot) + action.execute(params); + } + + def getStarsConfig() { + JsonObject config = Json.createObjectBuilder() + .add("tableName", "stars") + .add("user", user) + .add("password", password) + .add("dbEngine", "postgresql") + .add("host", host) + .add("port", port) + .add("databaseName", databaseName) + .add("nullableResult", "true") + .build(); + return config; + } + + def prepareStarsTable() { + String sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql); + connection.createStatement().execute("CREATE TABLE stars (id int, name varchar(255) NOT NULL, " + + "date timestamp, radius int, destination int, visible boolean, visibledate date, PRIMARY KEY(id))"); + connection.createStatement().execute("INSERT INTO stars values (1,'Taurus', '2015-02-19 10:10:10.0'," + + " 123, 5, 'true', '2015-02-19')") + connection.createStatement().execute("INSERT INTO stars values (2,'Eridanus', '2017-02-19 10:10:10.0'," + + " 852, 5, 'false', '2015-07-19')") + } + + def getRecords(tableName) { + ArrayList records = new ArrayList(); + String sql = "SELECT * FROM " + tableName; + ResultSet rs = connection.createStatement().executeQuery(sql); + while (rs.next()) { + records.add(rs.toRowResult().toString()); + } + rs.close(); + return records; + } + + def cleanupSpec() { + String sql = "DROP TABLE IF EXISTS persons;" + + connection.createStatement().execute(sql) + sql = "DROP TABLE IF EXISTS stars;" + connection.createStatement().execute(sql) + connection.close() + } + + def "one delete"() { + + prepareStarsTable(); + + JsonObject snapshot = Json.createObjectBuilder().build() + + JsonObject body = Json.createObjectBuilder() + .add("id", 1) + .build(); + + runAction(getStarsConfig(), body, snapshot) + int first = getRecords("stars").size() + JsonObject body2 = Json.createObjectBuilder() + .add("id", 2) + .build() + runAction(getStarsConfig(), body2, snapshot) + int second = getRecords("stars").size() + + expect: + first == 1 + second == 0 + } +} diff --git a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMSSQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMSSQLSpec.groovy similarity index 94% rename from src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMSSQLSpec.groovy rename to src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMSSQLSpec.groovy index bdc1ad0..6967de5 100644 --- a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMSSQLSpec.groovy +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMSSQLSpec.groovy @@ -1,8 +1,9 @@ -package io.elastic.jdbc.actions +package io.elastic.jdbc.actions.upsert import io.elastic.api.EventEmitter import io.elastic.api.ExecutionParameters import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey import spock.lang.* import javax.json.Json @@ -11,19 +12,19 @@ import java.sql.Connection import java.sql.DriverManager import java.sql.ResultSet -@Ignore +//@Ignore class UpsertRowByPrimaryKeyMSSQLSpec extends Specification { @Shared - def user = System.getenv("CONN_USER_MSSQL") + def user = "john"//System.getenv("CONN_USER_MSSQL") @Shared - def password = System.getenv("CONN_PASSWORD_MSSQL") + def password = "elastic123"//System.getenv("CONN_PASSWORD_MSSQL") @Shared - def databaseName = System.getenv("CONN_DBNAME_MSSQL") + def databaseName = "Test2"//System.getenv("CONN_DBNAME_MSSQL") @Shared - def host = System.getenv("CONN_HOST_MSSQL") + def host = "eio-mssql-fra.c79g081qpeyv.eu-central-1.rds.amazonaws.com"//System.getenv("CONN_HOST_MSSQL") @Shared - def port = System.getenv("CONN_PORT_MSSQL") + def port = "1433"//System.getenv("CONN_PORT_MSSQL") @Shared def connectionString ="jdbc:sqlserver://" + host + ":" + port + ";database=" + databaseName @Shared diff --git a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMySQLSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMySQLSpec.groovy similarity index 98% rename from src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMySQLSpec.groovy rename to src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMySQLSpec.groovy index 9c570b5..115b3f0 100644 --- a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyMySQLSpec.groovy +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyMySQLSpec.groovy @@ -1,8 +1,9 @@ -package io.elastic.jdbc.actions +package io.elastic.jdbc.actions.upsert import io.elastic.api.EventEmitter import io.elastic.api.ExecutionParameters import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey import spock.lang.Ignore import spock.lang.Shared import spock.lang.Specification diff --git a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyOracleSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyOracleSpec.groovy similarity index 98% rename from src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyOracleSpec.groovy rename to src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyOracleSpec.groovy index 19b0156..182113a 100644 --- a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyOracleSpec.groovy +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyOracleSpec.groovy @@ -1,8 +1,9 @@ -package io.elastic.jdbc.actions +package io.elastic.jdbc.actions.upsert import io.elastic.api.EventEmitter import io.elastic.api.ExecutionParameters import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey import spock.lang.Ignore import spock.lang.Shared import spock.lang.Specification diff --git a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyPostgreSpec.groovy b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyPostgreSpec.groovy similarity index 98% rename from src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyPostgreSpec.groovy rename to src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyPostgreSpec.groovy index 486fa04..829be62 100644 --- a/src/test/groovy/io/elastic/jdbc/actions/UpsertRowByPrimaryKeyPostgreSpec.groovy +++ b/src/test/groovy/io/elastic/jdbc/actions/upsert/UpsertRowByPrimaryKeyPostgreSpec.groovy @@ -1,8 +1,9 @@ -package io.elastic.jdbc.actions +package io.elastic.jdbc.actions.upsert import io.elastic.api.EventEmitter import io.elastic.api.ExecutionParameters import io.elastic.api.Message +import io.elastic.jdbc.actions.UpsertRowByPrimaryKey import spock.lang.Ignore import spock.lang.Shared import spock.lang.Specification