Skip to content

Commit

Permalink
more references by batchId
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 9, 2012
1 parent ce79841 commit a5a24af
Show file tree
Hide file tree
Showing 15 changed files with 44 additions and 33 deletions.
Expand Up @@ -329,9 +329,10 @@ private void runPurge(CommandLine line, List<String> args) {

private void exportBatch(CommandLine line, List<String> args) throws Exception {
IDataExtractorService dataExtractorService = getSymmetricEngine().getDataExtractorService();
String nodeId = popArg(args, "Node ID");
String batchId = popArg(args, "Batch ID");
OutputStreamWriter writer = getWriter(args);
dataExtractorService.extractBatchRange(writer, Long.valueOf(batchId), Long.valueOf(batchId));
dataExtractorService.extractBatchRange(writer, nodeId, Long.valueOf(batchId), Long.valueOf(batchId));
writer.close();
}

Expand Down
Expand Up @@ -334,13 +334,14 @@ public String reloadTable(String nodeId, String catalogName, String schemaName,

@ManagedOperation(description = "Write a range of batches to a file in SymmetricDS Data Format.")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "nodeId", description = "The node id for the batches which will be written"),
@ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"),
@ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"),
@ManagedOperationParameter(name = "fileName", description = "File name to write batches") })
public void writeBatchRangeToFile(String startBatchId, String endBatchId, String fileName)
public void writeBatchRangeToFile(String nodeId, String startBatchId, String endBatchId, String fileName)
throws Exception {
Writer writer = new FileWriter(new File(fileName));
engine.getDataExtractorService().extractBatchRange(writer, Long.valueOf(startBatchId),
engine.getDataExtractorService().extractBatchRange(writer, nodeId, Long.valueOf(startBatchId),
Long.valueOf(endBatchId));
IOUtils.closeQuietly(writer);
}
Expand Down
Expand Up @@ -43,7 +43,7 @@ SymAdmin.Cmd.send-schema=Send schema change to node
SymAdmin.Cmd.send-script=Send script to node
SymAdmin.Usage.reload-node=<node-id>
SymAdmin.Usage.reload-table=<table>
SymAdmin.Usage.export-batch=<batch number> [<filename>]
SymAdmin.Usage.export-batch=<node-id> <batch number> [<filename>]
SymAdmin.Usage.import-batch=[<filename>]
SymAdmin.Usage.run-purge=[all | outgoing | incoming | data-gaps]
SymAdmin.Usage.encrypt-text=<text>
Expand Down
Expand Up @@ -428,5 +428,10 @@ public void setCreateTime(Date createTime) {
public long totalEventCount() {
return insertEventCount + updateEventCount + deleteEventCount + otherEventCount;
}

@Override
public String toString() {
return getNodeBatchId();
}

}
Expand Up @@ -42,6 +42,6 @@ public interface IDataExtractorService {
*/
public List<OutgoingBatch> extract(Node node, IOutgoingTransport transport);

public boolean extractBatchRange(Writer writer, long startBatchId, long endBatchId);
public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId);

}
Expand Down
Expand Up @@ -123,9 +123,9 @@ public interface IDataService {

public Data mapData(Row row);

public List<Number> listDataIds(long batchId);
public List<Number> listDataIds(long batchId, String nodeId);

public List<Data> listData(long batchId, long startDataId, String channelId, int maxRowsToRetrieve);
public List<Data> listData(long batchId, String nodeId, long startDataId, String channelId, int maxRowsToRetrieve);

public void insertDataGap(DataGap gap);

Expand Down
Expand Up @@ -38,7 +38,7 @@ public interface IOutgoingBatchService {

public void updateAbandonedRoutingBatches();

public OutgoingBatch findOutgoingBatch(long batchId);
public OutgoingBatch findOutgoingBatch(long batchId, String nodeId);

public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChannels);

Expand Down
Expand Up @@ -76,7 +76,7 @@ public void ack(final BatchInfo batch) {
}
} else {
OutgoingBatch outgoingBatch = outgoingBatchService
.findOutgoingBatch(batch.getBatchId());
.findOutgoingBatch(batch.getBatchId(), batch.getNodeId());
Status status = batch.isOk() ? Status.OK : Status.ER;
if (outgoingBatch != null) {
// Allow an outside system/user to indicate that a batch
Expand Down
Expand Up @@ -340,7 +340,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
// that the status has not changed
if (System.currentTimeMillis() - batchesSelectedAtMs > MS_PASSED_BEFORE_BATCH_REQUERIED) {
currentBatch = outgoingBatchService
.findOutgoingBatch(currentBatch.getBatchId());
.findOutgoingBatch(currentBatch.getBatchId(), currentBatch.getNodeId());
}

if (currentBatch.getStatus() != Status.OK) {
Expand Down Expand Up @@ -374,7 +374,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
// check that the status has not changed
if (System.currentTimeMillis() - currentBatch.getLastUpdatedTime().getTime() > MS_PASSED_BEFORE_BATCH_REQUERIED) {
currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch
.getBatchId());
.getBatchId(), currentBatch.getNodeId());
}

if (extractTimeInMs > 0) {
Expand Down Expand Up @@ -418,7 +418,7 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
if (System.currentTimeMillis()
- currentBatch.getLastUpdatedTime().getTime() > MS_PASSED_BEFORE_BATCH_REQUERIED) {
currentBatch = outgoingBatchService.findOutgoingBatch(currentBatch
.getBatchId());
.getBatchId(), currentBatch.getNodeId());
}

if (currentBatch.getStatus() != Status.OK) {
Expand Down Expand Up @@ -469,10 +469,10 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,

}

public boolean extractBatchRange(Writer writer, long startBatchId, long endBatchId) {
public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId) {
boolean foundBatch = false;
for (long batchId = startBatchId; batchId <= endBatchId; batchId++) {
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId);
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId);
Node targetNode = nodeService.findNode(batch.getNodeId());
IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(batch, targetNode));
Expand Down
Expand Up @@ -905,15 +905,15 @@ public boolean removeHeartbeatListener(IHeartbeatListener listener) {
}
}

public List<Number> listDataIds(long batchId) {
public List<Number> listDataIds(long batchId, String nodeId) {
return sqlTemplate.query(getSql("selectEventDataIdsSql", " order by d.data_id asc"),
new NumberMapper(), batchId);
new NumberMapper(), batchId, nodeId);
}

public List<Data> listData(long batchId, long startDataId, String channelId,
public List<Data> listData(long batchId, String nodeId, long startDataId, String channelId,
final int maxRowsToRetrieve) {
return sqlTemplate.query(getDataSelectSql(batchId, startDataId, channelId),
maxRowsToRetrieve, this.dataMapper, batchId, startDataId);
maxRowsToRetrieve, this.dataMapper, batchId, nodeId, startDataId);
}

public Data mapData(Row row) {
Expand All @@ -923,7 +923,7 @@ public Data mapData(Row row) {
public ISqlReadCursor<Data> selectDataFor(Batch batch) {
return sqlTemplate.queryForCursor(
getDataSelectSql(batch.getBatchId(), -1l, batch.getChannelId()), dataMapper,
new Object[] { batch.getBatchId() }, new int[] { Types.NUMERIC });
new Object[] { batch.getBatchId(), batch.getNodeId() }, new int[] { Types.NUMERIC });
}

protected String getDataSelectSql(long batchId, long startDataId, String channelId) {
Expand Down
Expand Up @@ -14,13 +14,13 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ "select d.data_id, d.table_name, d.event_type, d.row_data, d.pk_data, d.old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, e.router_id from $(data) d inner join "
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? ");
+ " where o.batch_id = ? and o.node_id = ? ");

putSql("selectEventDataIdsSql",
""
+ "select d.data_id from $(data) d inner join "
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? ");
+ " where o.batch_id = ? and o.node_id = ? ");

putSql("selectMaxDataEventDataIdSql", ""
+ "select max(data_id) from $(data_event) ");
Expand Down
Expand Up @@ -154,10 +154,10 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
outgoingBatch.setBatchId(batchId);
}

public OutgoingBatch findOutgoingBatch(long batchId) {
public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
List<OutgoingBatch> list = (List<OutgoingBatch>) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"),
new OutgoingBatchMapper(true, false), new Object[] { batchId },
new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId },
new int[] { Types.NUMERIC });
if (list != null && list.size() > 0) {
return list.get(0);
Expand Down
Expand Up @@ -38,7 +38,7 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String
"where node_id in (:NODES) and channel_id in (:CHANNELS) and error_flag=1 " );

putSql("findOutgoingBatchSql" ,"" +
"where batch_id=? " );
"where batch_id=? and node_id=? " );

putSql("selectOutgoingBatchSql" ,"" +
"where node_id = ? and status in (?, ?, ?, ?, ?) order by batch_id asc " );
Expand Down
Expand Up @@ -350,9 +350,6 @@
<column name="last_update_hostname" type="VARCHAR" size="255" description="The host name of the process that last did work on this batch." />
<column name="last_update_time" type="TIMESTAMP" description="Timestamp when a process last updated this entry." />
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<index name="idx_ob_batch_id">
<index-column name="batch_id" />
</index>
<index name="idx_ob_node_status">
<index-column name="node_id" />
<index-column name="status" />
Expand Down
Expand Up @@ -48,19 +48,26 @@ public void handleWithCompression(HttpServletRequest req, HttpServletResponse re
String path = req.getPathInfo();
if (!StringUtils.isBlank(path)) {
int batchIdStartIndex = path.lastIndexOf("/") + 1;
String batchId = path.substring(batchIdStartIndex);
if (!write(batchId, res.getOutputStream())) {
ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND);
String nodeIdBatchId = path.substring(batchIdStartIndex);
if (nodeIdBatchId.contains("-")) {
String[] array = nodeIdBatchId.split("-");
String nodeId = array[0];
String batchId = array[1];
if (!write(batchId, nodeId, res.getOutputStream())) {
ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND);
} else {
res.flushBuffer();
}
} else {
res.flushBuffer();
ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND);
}
} else {
res.sendError(HttpServletResponse.SC_NOT_FOUND);
}
}

public boolean write(String batchId, OutputStream os) throws IOException {
return dataExtractorService.extractBatchRange(new OutputStreamWriter(os, "UTF-8"),
public boolean write(String batchId, String nodeId, OutputStream os) throws IOException {
return dataExtractorService.extractBatchRange(new OutputStreamWriter(os, "UTF-8"), nodeId,
Long.valueOf(batchId), Long.valueOf(batchId));
}

Expand Down

0 comments on commit a5a24af

Please sign in to comment.