diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java index f849824..55e1e7c 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/MSSQL.java @@ -3,14 +3,10 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Map; -import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class MSSQL extends Query { @@ -28,36 +24,8 @@ public ArrayList executePolling(Connection connection) throws SQLException { " )" + " SELECT *" + " FROM Results_CTE" + - " WHERE RowNum > ?" + - " AND RowNum < ?"; - try (PreparedStatement stmt = connection.prepareStatement(sql)) { - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - try (ResultSet rs = stmt.executeQuery()) { - ArrayList listResult = new ArrayList(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { - if (maxPollingValue.before(rs.getTimestamp(i))) { - if (rs.getString(metaData.getColumnName(i)).length() > 10) { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i))); - } else { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); - } - } - } - } - listResult.add(row.build()); - } - return listResult; - } - } + " WHERE RowNum <= ?"; + return getRowsExecutePolling(connection, sql); } public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { @@ -74,7 +42,7 @@ public JsonObject executeLookup(Connection connection, JsonObject body) throws S " FROM Results_CTE" + " WHERE RowNum > ?" + " AND RowNum < ?"; - return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); + return getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -88,14 +56,6 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep } } - public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { - validateQuery(); - String sql = "SELECT COUNT(*)" + - " FROM " + tableName + - " WHERE " + lookupField + " = ?"; - return Utils.isRecordExists(connection, body, sql, lookupField); - } - public void executeInsert(Connection connection, String tableName, JsonObject body) throws SQLException { validateQuery(); diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java index c8e4441..8a6fa6e 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/MySQL.java @@ -3,14 +3,10 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Map; -import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class MySQL extends Query { @@ -23,38 +19,11 @@ public ArrayList executePolling(Connection connection) throws SQLException { sql.append(pollingField); sql.append(" > ?"); if (orderField != null) { - sql.append(" ORDER BY ").append(orderField); + sql.append(" ORDER BY ").append(orderField).append(" ASC"); } - sql.append(" ASC LIMIT ? OFFSET ?"); + sql.append(" LIMIT ?"); - try (PreparedStatement stmt = connection.prepareStatement(sql.toString())) { - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, countNumber); - stmt.setInt(3, skipNumber); - try (ResultSet rs = stmt.executeQuery()) { - ArrayList listResult = new ArrayList(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { - if (maxPollingValue.before(rs.getTimestamp(i))) { - if (rs.getString(metaData.getColumnName(i)).length() > 10) { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i))); - } else { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); - } - } - } - } - listResult.add(row.build()); - } - return listResult; - } - } + return getRowsExecutePolling(connection, sql.toString()); } public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { @@ -66,7 +35,7 @@ public JsonObject executeLookup(Connection connection, JsonObject body) throws S sql.append(" = ?"); sql.append(" ORDER BY ").append(lookupField); sql.append(" ASC LIMIT ? OFFSET ?"); - return Utils.getLookupRow(connection, body, sql.toString(), countNumber, skipNumber); + return getLookupRow(connection, body, sql.toString(), countNumber, skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -79,14 +48,6 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep } } - public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { - validateQuery(); - String sql = "SELECT COUNT(*)" + - " FROM " + tableName + - " WHERE " + lookupField + " = ?"; - return Utils.isRecordExists(connection, body, sql, lookupField); - } - public void executeInsert(Connection connection, String tableName, JsonObject body) throws SQLException { validateQuery(); diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java b/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java index 0027561..36fd751 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/Oracle.java @@ -3,54 +3,24 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Map; -import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class Oracle extends Query { public ArrayList executePolling(Connection connection) throws SQLException { validateQuery(); - String sql = "SELECT * FROM " + - " (SELECT b.*, rank() over (order by " + pollingField + ") as rnk FROM " + - tableName + " b) WHERE " + pollingField + " > ?" + - " AND rnk BETWEEN ? AND ?" + - " ORDER BY " + pollingField; - try (PreparedStatement stmt = connection.prepareStatement(sql)) { - /* data types mapping https://docs.oracle.com/cd/B19306_01/java.102/b14188/datamap.htm */ - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber); - try (ResultSet rs = stmt.executeQuery()) { - ArrayList listResult = new ArrayList(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { - if (maxPollingValue.before(rs.getTimestamp(i))) { - if (rs.getString(metaData.getColumnName(i)).length() > 10) { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i))); - } else { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); - } - } - } - } - listResult.add(row.build()); - } - return listResult; - } - } + String sql = String.format("SELECT * FROM (" + + "SELECT ROW_NUMBER() OVER( ORDER BY %s) as rn, o.* from %s o WHERE %s > ?) " + + "WHERE rn<=? ORDER BY %s", + pollingField, + tableName, + pollingField, + pollingField); + return getRowsExecutePolling(connection, sql); } public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { @@ -60,7 +30,7 @@ public JsonObject executeLookup(Connection connection, JsonObject body) throws S tableName + " b) WHERE " + lookupField + " = ? " + "AND rnk BETWEEN ? AND ? " + "ORDER BY " + lookupField; - return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber); + return getLookupRow(connection, body, sql, skipNumber, countNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -73,14 +43,6 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep } } - public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { - validateQuery(); - String sql = "SELECT COUNT(*)" + - " FROM " + tableName + - " WHERE " + lookupField + " = ?"; - return Utils.isRecordExists(connection, body, sql, lookupField); - } - public void executeInsert(Connection connection, String tableName, JsonObject body) throws SQLException { validateQuery(); diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java b/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java index c666d95..393b0e9 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/PostgreSQL.java @@ -3,14 +3,10 @@ import io.elastic.jdbc.Utils; import java.sql.Connection; import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Map; -import javax.json.Json; import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.json.JsonValue; public class PostgreSQL extends Query { @@ -27,36 +23,8 @@ public ArrayList executePolling(Connection connection) throws SQLException { " )" + " SELECT *" + " FROM results_cte" + - " WHERE rownum > ?" + - " AND rownum < ?"; - try (PreparedStatement stmt = connection.prepareStatement(sql)) { - stmt.setTimestamp(1, pollingValue); - stmt.setInt(2, skipNumber); - stmt.setInt(3, countNumber + skipNumber); - try (ResultSet rs = stmt.executeQuery()) { - ArrayList listResult = new ArrayList(); - JsonObjectBuilder row = Json.createObjectBuilder(); - ResultSetMetaData metaData = rs.getMetaData(); - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { - if (maxPollingValue.before(rs.getTimestamp(i))) { - if (rs.getString(metaData.getColumnName(i)).length() > 10) { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i))); - } else { - maxPollingValue = java.sql.Timestamp - .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); - } - } - } - } - listResult.add(row.build()); - } - return listResult; - } - } + " WHERE rownum <= ?"; + return getRowsExecutePolling(connection, sql); } public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException { @@ -73,7 +41,7 @@ public JsonObject executeLookup(Connection connection, JsonObject body) throws S " FROM results_cte" + " WHERE rownum > ?" + " AND rownum < ?"; - return Utils.getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); + return getLookupRow(connection, body, sql, skipNumber, countNumber + skipNumber); } public int executeDelete(Connection connection, JsonObject body) throws SQLException { @@ -89,14 +57,6 @@ public int executeDelete(Connection connection, JsonObject body) throws SQLExcep } } - public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { - validateQuery(); - String sql = "SELECT COUNT(*)" + - " FROM " + tableName + - " WHERE " + lookupField + " = ?"; - return Utils.isRecordExists(connection, body, sql, lookupField); - } - public void executeInsert(Connection connection, String tableName, JsonObject body) throws SQLException { validateQuery(); diff --git a/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java b/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java index b634631..88513d9 100644 --- a/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java +++ b/src/main/java/io/elastic/jdbc/QueryBuilders/Query.java @@ -85,9 +85,6 @@ public void setMaxPollingValue(Timestamp maxPollingValue) { abstract public JsonObject executeLookup(Connection connection, JsonObject body) throws SQLException; - abstract public boolean executeRecordExists(Connection connection, JsonObject body) - throws SQLException; - abstract public int executeDelete(Connection connection, JsonObject body) throws SQLException; abstract public void executeInsert(Connection connection, String tableName, JsonObject body) @@ -96,6 +93,20 @@ abstract public void executeInsert(Connection connection, String tableName, Json abstract public void executeUpdate(Connection connection, String tableName, String idColumn, String idValue, JsonObject body) throws SQLException; + public boolean executeRecordExists(Connection connection, JsonObject body) throws SQLException { + validateQuery(); + String sql = "SELECT COUNT(*)" + + " FROM " + tableName + + " WHERE " + lookupField + " = ?"; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + Utils.setStatementParam(stmt, 1, lookupField, body); + try (ResultSet rs = stmt.executeQuery()) { + rs.next(); + return rs.getInt(1) > 0; + } + } + } + public ArrayList executeSelectTrigger(Connection connection, String sqlQuery) throws SQLException { try (PreparedStatement stmt = connection.prepareStatement(sqlQuery)) { @@ -259,4 +270,61 @@ public void validateQuery() { } } + public ArrayList getRowsExecutePolling(Connection connection, String sql) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setTimestamp(1, pollingValue); + stmt.setInt(2, countNumber); + try (ResultSet rs = stmt.executeQuery()) { + ArrayList listResult = new ArrayList(); + JsonObjectBuilder row = Json.createObjectBuilder(); + ResultSetMetaData metaData = rs.getMetaData(); + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + if (metaData.getColumnName(i).toUpperCase().equals(pollingField.toUpperCase())) { + if (maxPollingValue.before(rs.getTimestamp(i))) { + if (rs.getString(metaData.getColumnName(i)).length() > 10) { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i))); + } else { + maxPollingValue = java.sql.Timestamp + .valueOf(rs.getString(metaData.getColumnName(i)) + " 00:00:00"); + } + } + } + } + listResult.add(row.build()); + } + return listResult; + } + } + } + + public static JsonObject getLookupRow(Connection connection, JsonObject body, String sql, + Integer secondParameter, Integer thirdParameter) + throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + JsonObjectBuilder row = Json.createObjectBuilder(); + for (Map.Entry entry : body.entrySet()) { + Utils.setStatementParam(stmt, 1, entry.getKey(), body); + } + stmt.setInt(2, secondParameter); + stmt.setInt(3, thirdParameter); + try (ResultSet rs = stmt.executeQuery()) { + ResultSetMetaData metaData = rs.getMetaData(); + int rowsCount = 0; + while (rs.next()) { + for (int i = 1; i <= metaData.getColumnCount(); i++) { + row = Utils.getColumnDataByType(rs, metaData, i, row); + } + rowsCount++; + if (rowsCount > 1) { + throw new RuntimeException("Error: the number of matching rows is not exactly one"); + } + } + return row.build(); + } + } + } + } diff --git a/src/main/java/io/elastic/jdbc/Utils.java b/src/main/java/io/elastic/jdbc/Utils.java index 4f26689..21b61f6 100644 --- a/src/main/java/io/elastic/jdbc/Utils.java +++ b/src/main/java/io/elastic/jdbc/Utils.java @@ -17,7 +17,6 @@ import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.json.Json; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; import javax.json.JsonValue; @@ -300,43 +299,4 @@ public static JsonObjectBuilder getColumnDataByType(ResultSet rs, ResultSetMetaD return row; } - public static JsonObject getLookupRow(Connection connection, JsonObject body, String sql, - Integer secondParameter, Integer thirdParameter) - throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement(sql)) { - JsonObjectBuilder row = Json.createObjectBuilder(); - for (Map.Entry entry : body.entrySet()) { - Utils.setStatementParam(stmt, 1, entry.getKey(), body); - } - stmt.setInt(2, secondParameter); - stmt.setInt(3, thirdParameter); - try (ResultSet rs = stmt.executeQuery()) { - ResultSetMetaData metaData = rs.getMetaData(); - int rowsCount = 0; - while (rs.next()) { - for (int i = 1; i <= metaData.getColumnCount(); i++) { - row = Utils.getColumnDataByType(rs, metaData, i, row); - } - rowsCount++; - if (rowsCount > 1) { - LOGGER.error("Error: the number of matching rows is not exactly one"); - throw new RuntimeException("Error: the number of matching rows is not exactly one"); - } - } - return row.build(); - } - } - } - - public static boolean isRecordExists(Connection connection, JsonObject body, String sql, - String lookupField) throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement(sql)) { - Utils.setStatementParam(stmt, 1, lookupField, body); - try (ResultSet rs = stmt.executeQuery()) { - rs.next(); - return rs.getInt(1) > 0; - } - } - } - } diff --git a/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java b/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java index 63f02a7..b9ce253 100644 --- a/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java +++ b/src/main/java/io/elastic/jdbc/triggers/GetRowsPollingTrigger.java @@ -25,8 +25,7 @@ public class GetRowsPollingTrigger implements Module { private static final String PROPERTY_TABLE_NAME = "tableName"; private static final String PROPERTY_POLLING_FIELD = "pollingField"; private static final String PROPERTY_POLLING_VALUE = "pollingValue"; - private static final String PROPERTY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.sss"; - private static final String PROPERTY_SKIP_NUMBER = "skipNumber"; + private static final String PROPERTY_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; private static final String DATETIME_REGEX = "(\\d{4})-(\\d{2})-(\\d{2}) (\\d{2}):(\\d{2}):(\\d{2})(\\.(\\d{1,3}))?"; @Override @@ -35,8 +34,6 @@ public final void execute(ExecutionParameters parameters) { final JsonObject configuration = parameters.getConfiguration(); JsonObject snapshot = parameters.getSnapshot(); checkConfig(configuration); - Connection connection = null; - Integer skipNumber = 0; String pollingField = ""; Calendar cDate = Calendar.getInstance(); cDate.set(cDate.get(Calendar.YEAR), cDate.get(Calendar.MONTH), cDate.get(Calendar.DATE), 0, 0, @@ -64,54 +61,33 @@ public final void execute(ExecutionParameters parameters) { pollingValue = cts; } - if (snapshot.containsKey(PROPERTY_SKIP_NUMBER)) { - skipNumber = snapshot.getInt(PROPERTY_SKIP_NUMBER); - } - - if (snapshot.containsKey(PROPERTY_TABLE_NAME) && snapshot.get(PROPERTY_TABLE_NAME) != null - && !snapshot.getString(PROPERTY_TABLE_NAME) - .equals(tableName)) { - skipNumber = 0; - } - LOGGER.info("Executing row polling trigger"); - try { - connection = Utils.getConnection(configuration); + try (Connection connection = Utils.getConnection(configuration)) { QueryFactory queryFactory = new QueryFactory(); Query query = queryFactory.getQuery(dbEngine); - query.from(tableName).skip(skipNumber).orderBy(pollingField) + query.from(tableName).orderBy(pollingField) .rowsPolling(pollingField, pollingValue); query.setMaxPollingValue(cts); ArrayList resultList = query.executePolling(connection); for (int i = 0; i < resultList.size(); i++) { - LOGGER.info("Columns count: {} from {}", i + 1, resultList.size()); + LOGGER.info("Row number: {} from {}", i + 1, resultList.size()); LOGGER.info("Emitting data {}", resultList.get(i).toString()); parameters.getEventEmitter() .emitData(new Message.Builder().body(resultList.get(i)).build()); } - - formattedDate = new SimpleDateFormat(PROPERTY_DATETIME_FORMAT) - .format(query.getMaxPollingValue()); - - snapshot = Json.createObjectBuilder() - .add(PROPERTY_SKIP_NUMBER, skipNumber + resultList.size()) - .add(PROPERTY_TABLE_NAME, tableName) - .add(PROPERTY_POLLING_FIELD, pollingField) - .add(PROPERTY_POLLING_VALUE, formattedDate).build(); - LOGGER.info("Emitting new snapshot {}", snapshot.toString()); - parameters.getEventEmitter().emitSnapshot(snapshot); + if (resultList.size() > 0) { + formattedDate = query.getMaxPollingValue().toString(); + snapshot = Json.createObjectBuilder() + .add(PROPERTY_TABLE_NAME, tableName) + .add(PROPERTY_POLLING_FIELD, pollingField) + .add(PROPERTY_POLLING_VALUE, formattedDate).build(); + LOGGER.info("Emitting new snapshot {}", snapshot.toString()); + parameters.getEventEmitter().emitSnapshot(snapshot); + } } catch (SQLException e) { LOGGER.error("Failed to make request", e.toString()); throw new RuntimeException(e); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - LOGGER.error("Failed to close connection", e.toString()); - } - } } }