diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java index 4f1b1cb7c0..8cb572e844 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/SymmetricAdmin.java @@ -329,9 +329,10 @@ private void runPurge(CommandLine line, List args) { private void exportBatch(CommandLine line, List args) throws Exception { IDataExtractorService dataExtractorService = getSymmetricEngine().getDataExtractorService(); + String nodeId = popArg(args, "Node ID"); String batchId = popArg(args, "Batch ID"); OutputStreamWriter writer = getWriter(args); - dataExtractorService.extractBatchRange(writer, Long.valueOf(batchId), Long.valueOf(batchId)); + dataExtractorService.extractBatchRange(writer, nodeId, Long.valueOf(batchId), Long.valueOf(batchId)); writer.close(); } diff --git a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java index 11bb440594..24d495806f 100644 --- a/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java +++ b/symmetric/symmetric-client/src/main/java/org/jumpmind/symmetric/service/jmx/NodeManagementService.java @@ -334,13 +334,14 @@ public String reloadTable(String nodeId, String catalogName, String schemaName, @ManagedOperation(description = "Write a range of batches to a file in SymmetricDS Data Format.") @ManagedOperationParameters({ + @ManagedOperationParameter(name = "nodeId", description = "The node id for the batches which will be written"), @ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"), @ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"), @ManagedOperationParameter(name = "fileName", description = "File name to write batches") }) - public void writeBatchRangeToFile(String startBatchId, String endBatchId, String fileName) + public void writeBatchRangeToFile(String nodeId, String startBatchId, String endBatchId, String fileName) throws Exception { Writer writer = new FileWriter(new File(fileName)); - engine.getDataExtractorService().extractBatchRange(writer, Long.valueOf(startBatchId), + engine.getDataExtractorService().extractBatchRange(writer, nodeId, Long.valueOf(startBatchId), Long.valueOf(endBatchId)); IOUtils.closeQuietly(writer); } diff --git a/symmetric/symmetric-client/src/main/resources/symmetric-messages.properties b/symmetric/symmetric-client/src/main/resources/symmetric-messages.properties index 4df6127ca8..7c86d7f637 100644 --- a/symmetric/symmetric-client/src/main/resources/symmetric-messages.properties +++ b/symmetric/symmetric-client/src/main/resources/symmetric-messages.properties @@ -43,7 +43,7 @@ SymAdmin.Cmd.send-schema=Send schema change to node SymAdmin.Cmd.send-script=Send script to node SymAdmin.Usage.reload-node= SymAdmin.Usage.reload-table= -SymAdmin.Usage.export-batch= [] +SymAdmin.Usage.export-batch= [] SymAdmin.Usage.import-batch=[] SymAdmin.Usage.run-purge=[all | outgoing | incoming | data-gaps] SymAdmin.Usage.encrypt-text= diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java index 1363209abb..f3d7d1701b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java @@ -428,5 +428,10 @@ public void setCreateTime(Date createTime) { public long totalEventCount() { return insertEventCount + updateEventCount + deleteEventCount + otherEventCount; } + + @Override + public String toString() { + return getNodeBatchId(); + } } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index 9a30c9053e..ff803379ad 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -42,6 +42,6 @@ public interface IDataExtractorService { */ public List extract(Node node, IOutgoingTransport transport); - public boolean extractBatchRange(Writer writer, long startBatchId, long endBatchId); + public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId); } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 52ab1ec1a0..7accfcbc60 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -123,9 +123,9 @@ public interface IDataService { public Data mapData(Row row); - public List listDataIds(long batchId); + public List listDataIds(long batchId, String nodeId); - public List listData(long batchId, long startDataId, String channelId, int maxRowsToRetrieve); + public List listData(long batchId, String nodeId, long startDataId, String channelId, int maxRowsToRetrieve); public void insertDataGap(DataGap gap); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 1051e662b6..f1c2def407 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -38,7 +38,7 @@ public interface IOutgoingBatchService { public void updateAbandonedRoutingBatches(); - public OutgoingBatch findOutgoingBatch(long batchId); + public OutgoingBatch findOutgoingBatch(long batchId, String nodeId); public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChannels); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index eccb117d74..194456cb58 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -76,7 +76,7 @@ public void ack(final BatchInfo batch) { } } else { OutgoingBatch outgoingBatch = outgoingBatchService - .findOutgoingBatch(batch.getBatchId()); + .findOutgoingBatch(batch.getBatchId(), batch.getNodeId()); Status status = batch.isOk() ? Status.OK : Status.ER; if (outgoingBatch != null) { // Allow an outside system/user to indicate that a batch diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 58878a9dca..81b8c73d12 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -340,7 +340,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, // that the status has not changed if (System.currentTimeMillis() - batchesSelectedAtMs > MS_PASSED_BEFORE_BATCH_REQUERIED) { currentBatch = outgoingBatchService - .findOutgoingBatch(currentBatch.getBatchId()); + .findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId()); } if (currentBatch.getStatus() != Status.OK) { @@ -374,7 +374,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, // check that the status has not changed if (System.currentTimeMillis() - currentBatch.getLastUpdatedTime().getTime() > MS_PASSED_BEFORE_BATCH_REQUERIED) { currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch - .getBatchId()); + .getBatchId(), currentBatch.getNodeId()); } if (extractTimeInMs > 0) { @@ -418,7 +418,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, if (System.currentTimeMillis() - currentBatch.getLastUpdatedTime().getTime() > MS_PASSED_BEFORE_BATCH_REQUERIED) { currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch - .getBatchId()); + .getBatchId(), currentBatch.getNodeId()); } if (currentBatch.getStatus() != Status.OK) { @@ -469,10 +469,10 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, } - public boolean extractBatchRange(Writer writer, long startBatchId, long endBatchId) { + public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId) { boolean foundBatch = false; for (long batchId = startBatchId; batchId <= endBatchId; batchId++) { - OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId); + OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId); Node targetNode = nodeService.findNode(batch.getNodeId()); IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(), new SelectFromSymDataSource(batch, targetNode)); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 2f3ea7b6ec..894ec3178b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -905,15 +905,15 @@ public boolean removeHeartbeatListener(IHeartbeatListener listener) { } } - public List listDataIds(long batchId) { + public List listDataIds(long batchId, String nodeId) { return sqlTemplate.query(getSql("selectEventDataIdsSql", " order by d.data_id asc"), - new NumberMapper(), batchId); + new NumberMapper(), batchId, nodeId); } - public List listData(long batchId, long startDataId, String channelId, + public List listData(long batchId, String nodeId, long startDataId, String channelId, final int maxRowsToRetrieve) { return sqlTemplate.query(getDataSelectSql(batchId, startDataId, channelId), - maxRowsToRetrieve, this.dataMapper, batchId, startDataId); + maxRowsToRetrieve, this.dataMapper, batchId, nodeId, startDataId); } public Data mapData(Row row) { @@ -923,7 +923,7 @@ public Data mapData(Row row) { public ISqlReadCursor selectDataFor(Batch batch) { return sqlTemplate.queryForCursor( getDataSelectSql(batch.getBatchId(), -1l, batch.getChannelId()), dataMapper, - new Object[] { batch.getBatchId() }, new int[] { Types.NUMERIC }); + new Object[] { batch.getBatchId(), batch.getNodeId() }, new int[] { Types.NUMERIC }); } protected String getDataSelectSql(long batchId, long startDataId, String channelId) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index d714b4de69..ef6f528726 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -14,13 +14,13 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data, " + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, e.router_id from $(data) d inner join " + " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id " - + " where o.batch_id = ? "); + + " where o.batch_id = ? and o.node_id = ? "); putSql("selectEventDataIdsSql", "" + "select d.data_id from $(data) d inner join " + " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id " - + " where o.batch_id = ? "); + + " where o.batch_id = ? and o.node_id = ? "); putSql("selectMaxDataEventDataIdSql", "" + "select max(data_id) from $(data_event) "); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index ca0d7a3069..45f866282a 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -154,10 +154,10 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo outgoingBatch.setBatchId(batchId); } - public OutgoingBatch findOutgoingBatch(long batchId) { + public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) { List list = (List) sqlTemplate.query( getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"), - new OutgoingBatchMapper(true, false), new Object[] { batchId }, + new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId }, new int[] { Types.NUMERIC }); if (list != null && list.size() > 0) { return list.get(0); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 4c2c7d18c6..f7f2c45632 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -38,7 +38,7 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map - - - diff --git a/symmetric/symmetric-server/src/main/java/org/jumpmind/symmetric/web/BatchUriHandler.java b/symmetric/symmetric-server/src/main/java/org/jumpmind/symmetric/web/BatchUriHandler.java index e145a3648f..094d6b8672 100644 --- a/symmetric/symmetric-server/src/main/java/org/jumpmind/symmetric/web/BatchUriHandler.java +++ b/symmetric/symmetric-server/src/main/java/org/jumpmind/symmetric/web/BatchUriHandler.java @@ -48,19 +48,26 @@ public void handleWithCompression(HttpServletRequest req, HttpServletResponse re String path = req.getPathInfo(); if (!StringUtils.isBlank(path)) { int batchIdStartIndex = path.lastIndexOf("/") + 1; - String batchId = path.substring(batchIdStartIndex); - if (!write(batchId, res.getOutputStream())) { - ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND); + String nodeIdBatchId = path.substring(batchIdStartIndex); + if (nodeIdBatchId.contains("-")) { + String[] array = nodeIdBatchId.split("-"); + String nodeId = array[0]; + String batchId = array[1]; + if (!write(batchId, nodeId, res.getOutputStream())) { + ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND); + } else { + res.flushBuffer(); + } } else { - res.flushBuffer(); + ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND); } } else { res.sendError(HttpServletResponse.SC_NOT_FOUND); } } - public boolean write(String batchId, OutputStream os) throws IOException { - return dataExtractorService.extractBatchRange(new OutputStreamWriter(os, "UTF-8"), + public boolean write(String batchId, String nodeId, OutputStream os) throws IOException { + return dataExtractorService.extractBatchRange(new OutputStreamWriter(os, "UTF-8"), nodeId, Long.valueOf(batchId), Long.valueOf(batchId)); }