From de46f639bf358535c7b8e5379e88722d824d395e Mon Sep 17 00:00:00 2001 From: mmichalek Date: Fri, 3 Jun 2016 10:50:51 -0400 Subject: [PATCH] 0002565: Add VoltDB Dialect - fix purge. --- .../db/voltdb/VoltDbSymmetricDialect.java | 2 + .../db/AbstractSymmetricDialect.java | 18 +++++ .../symmetric/db/ISymmetricDialect.java | 10 +++ .../symmetric/service/impl/PurgeService.java | 69 +++++++++++++++++-- .../service/impl/PurgeServiceSqlMap.java | 10 ++- .../db/platform/voltdb/VoltDbDdlBuilder.java | 4 +- .../voltdb/VoltDbDatabasePlatform.java | 4 +- 7 files changed, 107 insertions(+), 10 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/voltdb/VoltDbSymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/voltdb/VoltDbSymmetricDialect.java index 309da0a428..edd56a67d0 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/voltdb/VoltDbSymmetricDialect.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/voltdb/VoltDbSymmetricDialect.java @@ -34,6 +34,8 @@ public class VoltDbSymmetricDialect extends AbstractSymmetricDialect { public VoltDbSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) { super(parameterService, platform); this.triggerTemplate = new VoltDbTriggerTemplate(this); + this.supportsSubselectsInDelete = false; + this.supportsSubselectsInUpdate = false; } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java index 0c1f0e135e..bd650ef62d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java @@ -99,6 +99,10 @@ abstract public class AbstractSymmetricDialect implements ISymmetricDialect { protected boolean supportsTransactionViews = false; + protected boolean supportsSubselectsInDelete = true; + + protected boolean supportsSubselectsInUpdate = true; + protected Map sqlReplacementTokens = new HashMap(); public AbstractSymmetricDialect() { @@ -555,6 +559,20 @@ public String getProductVersion() { public boolean supportsTransactionViews() { return supportsTransactionViews; } + + /* + * Indicates if this dialect supports subselects in delete statements. + */ + public boolean supportsSubselectsInDelete() { + return supportsSubselectsInDelete; + } + + /* + * Indicates if this dialect supports subselects in update statements. + */ + public boolean supportsSubselectsInUpdate() { + return supportsSubselectsInUpdate; + } public long insertWithGeneratedKey(String sql, SequenceIdentifier sequenceId) { return insertWithGeneratedKey(sql, sequenceId, null, null); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java index ceb973938c..1b0692bb77 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java @@ -114,6 +114,16 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc * a way to check on pending database transactions. */ public boolean supportsTransactionViews(); + + /* + * Indicates if this dialect supports subselects in delete statements. + */ + public boolean supportsSubselectsInDelete(); + + /* + * Indicates if this dialect supports subselects in update statements. + */ + public boolean supportsSubselectsInUpdate(); /* * Implement this if the database has some type of cleanup functionality diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 39ce047172..7d1284d5a3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -23,12 +23,14 @@ import java.sql.Timestamp; import java.sql.Types; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.List; import org.apache.commons.lang.time.DateUtils; +import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; import org.jumpmind.symmetric.common.ParameterConstants; @@ -91,10 +93,16 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) { log.info("The outgoing purge process is about to run for data older than {}", SimpleDateFormat.getDateTimeInstance() .format(retentionCutoff.getTime())); - rowsPurged += purgeStrandedBatches(); - rowsPurged += purgeDataRows(retentionCutoff); - rowsPurged += purgeOutgoingBatch(retentionCutoff); - rowsPurged += purgeExtractRequests(); + // VoltDB doesn't support capture, or subselects. So we'll just be purging heartbeats + // by date here. + if (getSymmetricDialect().getName().equalsIgnoreCase(DatabaseNamesConstants.VOLTDB)) { + rowsPurged += purgeOutgoingByRetentionCutoff(retentionCutoff); + } else { + rowsPurged += purgeStrandedBatches(); + rowsPurged += purgeDataRows(retentionCutoff); + rowsPurged += purgeOutgoingBatch(retentionCutoff); + rowsPurged += purgeExtractRequests(); + } } finally { if (!force) { clusterService.unlock(ClusterConstants.PURGE_OUTGOING); @@ -110,6 +118,24 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) { return rowsPurged; } + protected long purgeOutgoingByRetentionCutoff(Calendar retentionCutoff) { + int totalCount = 0; + totalCount += executePurgeDelete(getSql("deleteOutgoingBatchByCreateTimeSql"), retentionCutoff.getTime()); + totalCount += executePurgeDelete(getSql("deleteDataEventByCreateTimeSql"), retentionCutoff.getTime()); + totalCount += executePurgeDelete(getSql("deleteDataByCreateTimeSql"), retentionCutoff.getTime()); + totalCount += executePurgeDelete(getSql("deleteExtractRequestByCreateTimeSql"), retentionCutoff.getTime()); + + log.info("Done purging {} rows", totalCount); + return totalCount; + } + + protected int executePurgeDelete(String deleteSql, Object argument) { + log.debug("Running the following statement: {} with the following arguments: {}", deleteSql, argument); + int count = sqlTemplate.update(deleteSql, argument); + log.debug("Deleted {} rows", count); + return count; + } + private long purgeOutgoingBatch(final Calendar time) { log.info("Getting range for outgoing batch"); long[] minMax = queryForMinMax(getSql("selectOutgoingBatchRangeSql"), @@ -287,10 +313,17 @@ public long purgeIncoming(Calendar retentionCutoff, boolean force) { private long purgeIncomingError() { log.info("Purging incoming error rows"); - long rowCount = sqlTemplate.update(getSql("deleteIncomingErrorsSql")); + long rowCount = 0; + + if (getSymmetricDialect().supportsSubselectsInDelete()) { + rowCount = sqlTemplate.update(getSql("deleteIncomingErrorsSql")); + } else { + rowCount = selectIdsAndDelete(getSql("selectIncomingErrorsBatchIdsSql"), + "batch_id", getSql("deleteIncomingErrorsBatchIdsSql")); + } + log.info("Purged {} incoming error rows", rowCount); return rowCount; - } private long purgeIncomingBatch(final Calendar time) { @@ -397,6 +430,30 @@ public void purgeAllIncomingEventsForNode(String nodeId) { log.info("Purged all {} incoming batch for node {}", count, nodeId); } + protected int selectIdsAndDelete(String selectSql, String fieldName, String deleteSql) { + List results = sqlTemplate.query(selectSql); + int rowCount = 0; + if (! results.isEmpty()) { + List ids = new ArrayList(results.size()); + for (Row row : results) { + ids.add(row.getInt(fieldName)); + } + + results = null; + + StringBuilder placeHolders = new StringBuilder(ids.size()*2); + for (int i = 0; i < ids.size(); i++) { + placeHolders.append("?,"); + } + placeHolders.setLength(placeHolders.length()-1); + + String deleteStatement = deleteSql.replace("?", placeHolders); + + rowCount = sqlTemplate.update(deleteStatement, ids.toArray()); + } + return rowCount; + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index fc6e6bc909..f436f51138 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -100,7 +100,15 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac putSql("purgeNodeHostStatsSql", "delete from $(node_host_stats) where start_time < ?"); putSql("purgeNodeHostJobStatsSql", "delete from $(node_host_job_stats) where start_time < ?"); - + + putSql("selectIncomingErrorsBatchIdsSql", "select distinct e.batch_id as batch_id from sym_incoming_error e LEFT OUTER JOIN sym_incoming_batch i ON e.batch_id = i.batch_id where i.batch_id IS NULL"); + + putSql("deleteIncomingErrorsBatchIdsSql", "delete from sym_incoming_error where batch_id IN (?)"); + + putSql("deleteOutgoingBatchByCreateTimeSql", "delete from sym_outgoing_batch where create_time < ?"); + putSql("deleteDataEventByCreateTimeSql", "delete from sym_data_event where create_time < ?"); + putSql("deleteDataByCreateTimeSql", "delete from sym_data where create_time < ?"); + putSql("deleteExtractRequestByCreateTimeSql", "delete from sym_extract_request where create_time < ?"); } } \ No newline at end of file diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDdlBuilder.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDdlBuilder.java index 9ade43efe0..19d7fc98a6 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDdlBuilder.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDdlBuilder.java @@ -47,8 +47,8 @@ public VoltDbDdlBuilder() { databaseInfo.addNativeTypeMapping(Types.BIT, "TINYINT", Types.TINYINT); databaseInfo.addNativeTypeMapping(Types.DOUBLE, "DECIMAL", Types.DECIMAL); - databaseInfo.addNativeTypeMapping(Types.CLOB, "VARCHAR", Types.VARCHAR); - databaseInfo.addNativeTypeMapping(Types.LONGVARCHAR, "VARCHAR", Types.VARCHAR); + databaseInfo.addNativeTypeMapping(Types.CLOB, "VARCHAR(100000)", Types.VARCHAR); + databaseInfo.addNativeTypeMapping(Types.LONGVARCHAR, "VARCHAR(100000)", Types.VARCHAR); databaseInfo.addNativeTypeMapping(Types.CHAR, "VARCHAR", Types.VARCHAR); databaseInfo.addNativeTypeMapping(Types.BINARY, "VARCHAR", Types.VARCHAR); diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDatabasePlatform.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDatabasePlatform.java index 24de1e0136..babe87cd23 100644 --- a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDatabasePlatform.java +++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/voltdb/VoltDbDatabasePlatform.java @@ -46,6 +46,8 @@ public VoltDbDatabasePlatform(DataSource dataSource, SqlTemplateSettings setting getDatabaseInfo().setDelimitedIdentifiersSupported(false); getDatabaseInfo().setTriggersSupported(false); getDatabaseInfo().setForeignKeysSupported(false); + getDatabaseInfo().setHasPrecisionAndScale(Types.DECIMAL, false); + getDatabaseInfo().setHasPrecisionAndScale(Types.FLOAT, false); } public static final String JDBC_DRIVER = "org.voltdb.jdbc.Driver"; @@ -111,7 +113,7 @@ protected VoltDbDdlReader createDdlReader() { return new VoltDbDdlReader(this); } - @Override + @Override protected VoltDbJdbcSqlTemplate createSqlTemplate() { // TODO return new VoltDbJdbcSqlTemplate(dataSource, settings, new SymmetricLobHandler(), getDatabaseInfo());