diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricLauncher.java b/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricLauncher.java index cff4368e46..6101f01467 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricLauncher.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/SymmetricLauncher.java @@ -48,6 +48,7 @@ import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataLoaderService; import org.jumpmind.symmetric.service.IDataService; +import org.jumpmind.symmetric.service.IPurgeService; import org.jumpmind.symmetric.service.IRegistrationService; import org.jumpmind.symmetric.transport.IOutgoingTransport; import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport; @@ -63,7 +64,7 @@ public class SymmetricLauncher { private static final String OPTION_DUMP_BATCH = "dump-batch"; private static final String OPTION_OPEN_REGISTRATION = "open-registration"; - + private static final String OPTION_RELOAD_NODE = "reload-node"; private static final String OPTION_AUTO_CREATE = "auto-create"; @@ -72,6 +73,8 @@ public class SymmetricLauncher { private static final String OPTION_DDL_GEN = "generate-config-dll"; + private static final String OPTION_PURGE = "purge"; + private static final String OPTION_RUN_DDL_XML = "run-ddl"; private static final String OPTION_RUN_SQL = "run-sql"; @@ -81,11 +84,11 @@ public class SymmetricLauncher { private static final String OPTION_PROPERTIES_FILE = "properties"; private static final String OPTION_START_SERVER = "server"; - + private static final String OPTION_LOAD_BATCH = "load-batch"; public static void main(String[] args) throws Exception { - + CommandLineParser parser = new PosixParser(); Options options = buildOptions(); try { @@ -110,7 +113,7 @@ public static void main(String[] args) throws Exception { throw new ParseException("Could not find the properties file specified: " + line.getOptionValue(OPTION_PROPERTIES_FILE)); } - + } if (line.hasOption(OPTION_DDL_GEN)) { @@ -118,6 +121,12 @@ public static void main(String[] args) throws Exception { return; } + if (line.hasOption(OPTION_PURGE)) { + ((IPurgeService) new SymmetricEngine().getApplicationContext().getBean(Constants.PURGE_SERVICE)) + .purge(); + return; + } + if (line.hasOption(OPTION_OPEN_REGISTRATION)) { String arg = line.getOptionValue(OPTION_OPEN_REGISTRATION); openRegistration(new SymmetricEngine(), arg); @@ -152,12 +161,12 @@ public static void main(String[] args) throws Exception { runSql(new SymmetricEngine(), line.getOptionValue(OPTION_RUN_SQL)); return; } - + if (line.hasOption(OPTION_LOAD_BATCH)) { loadBatch(new SymmetricEngine(), line.getOptionValue(OPTION_LOAD_BATCH)); } - if (line.hasOption(OPTION_START_SERVER)) { + if (line.hasOption(OPTION_START_SERVER)) { testConnection(); new SymmetricWebServer().start(serverPort); return; @@ -169,12 +178,15 @@ public static void main(String[] args) throws Exception { System.err.println(exp.getMessage()); printHelp(options); } catch (Exception ex) { - System.err.println("-----------------------------------------------------------------------------------------------"); - System.err.println(" An exception occurred. Please see the following for details: "); - System.err.println("-----------------------------------------------------------------------------------------------"); - + System.err + .println("-----------------------------------------------------------------------------------------------"); + System.err.println(" An exception occurred. Please see the following for details: "); + System.err + .println("-----------------------------------------------------------------------------------------------"); + ExceptionUtils.printRootCauseStackTrace(ex, System.err); - System.err.println("-----------------------------------------------------------------------------------------------"); + System.err + .println("-----------------------------------------------------------------------------------------------"); printHelp(options); } } @@ -182,11 +194,11 @@ public static void main(String[] args) throws Exception { private static void printHelp(Options options) { new HelpFormatter().printHelp("sym", options); } - - + private static void testConnection() throws Exception { - ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] {"classpath:/symmetric-properties.xml","classpath:/symmetric-database.xml"}); - BasicDataSource ds = (BasicDataSource)ctx.getBean(Constants.DATA_SOURCE); + ApplicationContext ctx = new ClassPathXmlApplicationContext(new String[] { + "classpath:/symmetric-properties.xml", "classpath:/symmetric-database.xml" }); + BasicDataSource ds = (BasicDataSource) ctx.getBean(Constants.DATA_SOURCE); Connection c = ds.getConnection(); c.close(); ds.close(); @@ -206,6 +218,7 @@ private static Options buildOptions() { OPTION_PROPERTIES_FILE, true, "Takes an argument with the path to the properties file that will drive symmetric. If this is not provided, symmetric will use defaults, then override with the first symmetric.properties in your classpath, then override with symmetric.properties values in your user.home directory."); + options.addOption("X", OPTION_PURGE, false, "Will simply run the purge process against the currently configured database."); options .addOption("g", OPTION_PROPERTIES_GEN, true, "Takes an argument with the path to a file which all the default overrideable properties will be written."); @@ -221,13 +234,11 @@ private static Options buildOptions() { options .addOption("R", OPTION_OPEN_REGISTRATION, true, "Open registration for the passed in node group and external id. Takes an argument of {groupId},{externalId}."); - options - .addOption("l", OPTION_RELOAD_NODE, true, - "Send an initial load of data to reload the passed in node id."); + options.addOption("l", OPTION_RELOAD_NODE, true, + "Send an initial load of data to reload the passed in node id."); options.addOption("d", OPTION_DUMP_BATCH, true, "Print the contents of a batch out to the console. Takes the batch id as an argument."); - options.addOption("b", OPTION_LOAD_BATCH, true, - "Load the CSV contents of the specfied file."); + options.addOption("b", OPTION_LOAD_BATCH, true, "Load the CSV contents of the specfied file."); return options; } @@ -239,9 +250,10 @@ private static void dumpBatch(SymmetricEngine engine, String batchId) throws Exc dataExtractorService.extractBatchRange(transport, batchId, batchId); transport.close(); } - + private static void loadBatch(SymmetricEngine engine, String fileName) throws Exception { - IDataLoaderService service = (IDataLoaderService)engine.getApplicationContext().getBean(Constants.DATALOADER_SERVICE); + IDataLoaderService service = (IDataLoaderService) engine.getApplicationContext().getBean( + Constants.DATALOADER_SERVICE); File file = new File(fileName); if (file.exists() && file.isFile()) { FileInputStream in = new FileInputStream(file); @@ -269,8 +281,7 @@ private static void openRegistration(SymmetricEngine engine, String argument) { } private static String reloadNode(SymmetricEngine engine, String argument) { - IDataService dataService = (IDataService) engine.getApplicationContext().getBean( - Constants.DATA_SERVICE); + IDataService dataService = (IDataService) engine.getApplicationContext().getBean(Constants.DATA_SERVICE); return dataService.reloadNode(argument); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 932fe5dbdd..18cca7282e 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -38,6 +38,7 @@ import org.jumpmind.symmetric.service.LockAction; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.support.JdbcUtils; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallback; @@ -63,9 +64,7 @@ public class PurgeService extends AbstractService implements IPurgeService { private String deleteFromOutgoingBatchSql; - private String selectEventDataIdToPurgeSql; - - private String deleteDataEventSql; + private String deleteFromEventDataIdSql; private String selectDataIdToPurgeSql; @@ -172,36 +171,33 @@ public Object doInTransaction(final TransactionStatus s) { @SuppressWarnings("unchecked") private void purgeBatchesOlderThan(final Calendar time) { // Iterate over batch ids and data events to access by primary key so we prevent lock escalation - final List batchIds = jdbcTemplate.queryForList(selectOutgoingBatchIdsToPurgeSql, new Object[] { time - .getTime() }, Integer.class); + final List batchIds = jdbcTemplate.query(selectOutgoingBatchIdsToPurgeSql, new Object[] { time + .getTime() }, new RowMapper() { + public Object mapRow(ResultSet rs, int row) throws SQLException { + return new BatchForNode(rs.getInt(1), rs.getString(2)); + } + }); int eventRowCount = 0; int dataIdCount = 0; int batchesPurged = 0; long ts = System.currentTimeMillis(); - for (final Integer batchId : batchIds) { + for (final BatchForNode batchNode : batchIds) { do { dataIdCount = (Integer) transactionTemplate.execute(new TransactionCallback() { public Object doInTransaction(final TransactionStatus s) { - jdbcTemplate.update(deleteFromOutgoingBatchHistSql, new Object[] { batchId }); + jdbcTemplate.update(deleteFromOutgoingBatchHistSql, new Object[] { batchNode.batchId }); - int eventCount = 0; - List dataIds = null; - dataIds = getNextDataIds(selectEventDataIdToPurgeSql, new Object[] { batchId }, - maxNumOfDataIdsToPurgeInTx); + int eventCount = jdbcTemplate.update(deleteFromEventDataIdSql, new Object[] { batchNode.batchId, batchNode.nodeId }); - for (final Integer dataId : dataIds) { - eventCount += jdbcTemplate.update(deleteDataEventSql, new Object[] { dataId, batchId }); - } - - jdbcTemplate.update(deleteFromOutgoingBatchSql, new Object[] { batchId }); + jdbcTemplate.update(deleteFromOutgoingBatchSql, new Object[] { batchNode.batchId, batchNode.nodeId }); return eventCount; } }); eventRowCount += dataIdCount; if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5) { - logger.info("Purged " + batchesPurged + " out of " + batchIds.size() + " total and " + eventRowCount - + " data_events."); + logger.info("Purged " + batchesPurged + " out of " + batchIds.size() + " total and " + + eventRowCount + " data_events."); ts = System.currentTimeMillis(); } } while (dataIdCount > 0); @@ -275,12 +271,8 @@ public void setDeleteFromOutgoingBatchSql(String deleteFromOutgoingBatchSql) { this.deleteFromOutgoingBatchSql = deleteFromOutgoingBatchSql; } - public void setSelectEventDataIdToPurgeSql(String selectDataIdToPurgeSql) { - this.selectEventDataIdToPurgeSql = selectDataIdToPurgeSql; - } - - public void setDeleteDataEventSql(String deleteDataEventSql) { - this.deleteDataEventSql = deleteDataEventSql; + public void setDeleteFromEventDataIdSql(String selectDataIdToPurgeSql) { + this.deleteFromEventDataIdSql = selectDataIdToPurgeSql; } public void setDeleteDataSql(String deleteDataSql) { @@ -307,4 +299,17 @@ public void setDeleteIncomingBatchesByNodeIdSql(String[] deleteIncomingBatchesBy this.deleteIncomingBatchesByNodeIdSql = deleteIncomingBatchesByNodeIdSql; } + class BatchForNode { + + int batchId; + + String nodeId; + + public BatchForNode(int batchId, String nodeId) { + super(); + this.batchId = batchId; + this.nodeId = nodeId; + } + } + } diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 5a227de048..85c76487a8 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -731,7 +731,7 @@ value="${symmetric.runtime.job.purge.max.num.data.events.to.delete.in.tx}" /> - select batch_id from ${sync.table.prefix}_outgoing_batch where status='OK' and + select batch_id, node_id from ${sync.table.prefix}_outgoing_batch where status='OK' and create_time < ? @@ -739,15 +739,10 @@ delete from ${sync.table.prefix}_outgoing_batch_hist where batch_id=? - delete from ${sync.table.prefix}_outgoing_batch where batch_id=? + delete from ${sync.table.prefix}_outgoing_batch where batch_id=? and node_id=? - - select data_id from ${sync.table.prefix}_data_event where batch_id=? - - - - delete from ${sync.table.prefix}_data_event where data_id=? and batch_id=? - + + delete from ${sync.table.prefix}_data_event where batch_id=? and node_id=?