Skip to content

Commit

Permalink
ignore batch functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 26, 2012
1 parent 3b223f1 commit c4a1f9a
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 63 deletions.
Expand Up @@ -147,19 +147,20 @@ public void extractConfigurationStandalone(Node node, OutputStream out) {
* load for some reason on the client the batch status will NOT reflect the
* failure.
*/
public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) {
public void extractConfigurationStandalone(Node targetNode, Writer writer,
String... tablesToExclude) {
Node sourceNode = nodeService.findIdentity();

Batch batch = new Batch(BatchAck.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG,
symmetricDialect.getBinaryEncoding(), targetNode.getNodeId(), false);

NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(),
targetNode.getNodeGroupId());

List<TriggerRouter> triggerRouters = triggerRouterService
.buildTriggerRoutersForSymmetricTables(StringUtils.isBlank(targetNode
.getSymmetricVersion()) ? Version.version() : targetNode.getSymmetricVersion(),
nodeGroupLink, tablesToExclude);
.buildTriggerRoutersForSymmetricTables(
StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version.version()
: targetNode.getSymmetricVersion(), nodeGroupLink, tablesToExclude);

List<SelectFromTableEvent> initialLoadEvents = new ArrayList<SelectFromTableEvent>(
triggerRouters.size() * 2);
Expand Down Expand Up @@ -202,8 +203,8 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer, Strin

if (!triggerRouter.getTrigger().getSourceTableName()
.endsWith(TableConstants.SYM_NODE_IDENTITY)) {
initialLoadEvents
.add(new SelectFromTableEvent(targetNode, triggerRouter, triggerHistory));
initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter,
triggerHistory));
} else {
Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT,
triggerHistory.getSourceTableName(), null, triggerHistory, triggerRouter
Expand Down Expand Up @@ -439,6 +440,12 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode,
currentBatch.isCommonFlag());
batch.setIgnored(true);
try {
IStagedResource resource = stagingManager.find(
Constants.STAGING_CATEGORY_OUTGOING, batch.getStagedLocation(),
batch.getBatchId());
if (resource != null) {
resource.delete();
}
DataContext ctx = new DataContext(batch);
ctx.put("targetNode", targetNode);
ctx.put("sourceNode", sourceNode);
Expand Down Expand Up @@ -532,7 +539,8 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
}
}

protected OutgoingBatch sendOutgoingBatch(Node targetNode, OutgoingBatch currentBatch, IDataWriter dataWriter) {
protected OutgoingBatch sendOutgoingBatch(Node targetNode, OutgoingBatch currentBatch,
IDataWriter dataWriter) {
if (currentBatch.getStatus() != Status.OK) {

currentBatch.setSentCount(currentBatch.getSentCount() + 1);
Expand Down Expand Up @@ -669,7 +677,7 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
class SelectFromSymDataSource implements IExtractDataReaderSource {

private Batch batch;

private OutgoingBatch outgoingBatch;

private Table currentTable;
Expand Down Expand Up @@ -777,7 +785,7 @@ public void close() {
}

class SelectFromTableSource implements IExtractDataReaderSource {

private OutgoingBatch outgoingBatch;

private Batch batch;
Expand All @@ -796,7 +804,8 @@ class SelectFromTableSource implements IExtractDataReaderSource {

private TriggerRouter triggerRouter;

public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch, SelectFromTableEvent event) {
public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch,
SelectFromTableEvent event) {
this.outgoingBatch = outgoingBatch;
List<SelectFromTableEvent> initialLoadEvents = new ArrayList<DataExtractorService.SelectFromTableEvent>(
1);
Expand Down Expand Up @@ -867,7 +876,7 @@ public CsvData next() {
data = next();
}
}

if (data != null && outgoingBatch != null) {
outgoingBatch.incrementDataEventCount();
outgoingBatch.incrementEventCount(data.getDataEventType());
Expand Down
Expand Up @@ -100,34 +100,29 @@ public void updateOutgoingBatches(List<OutgoingBatch> outgoingBatches) {
public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
outgoingBatch.setLastUpdatedTime(new Date());
outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId());
sqlTemplate
.update(getSql("updateOutgoingBatchSql"),
new Object[] { outgoingBatch.getStatus().name(),
outgoingBatch.isLoadFlag() ? 1 : 0,
outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(),
outgoingBatch.getExtractCount(), outgoingBatch.getSentCount(),
outgoingBatch.getLoadCount(), outgoingBatch.getDataEventCount(),
outgoingBatch.getReloadEventCount(),
outgoingBatch.getInsertEventCount(),
outgoingBatch.getUpdateEventCount(),
outgoingBatch.getDeleteEventCount(),
outgoingBatch.getOtherEventCount(),
outgoingBatch.getIgnoreCount(),
outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(),
outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(),
outgoingBatch.getExtractMillis(), outgoingBatch.getSqlState(),
outgoingBatch.getSqlCode(),
StringUtils.abbreviate(outgoingBatch.getSqlMessage(), 1000),
outgoingBatch.getFailedDataId(),
outgoingBatch.getLastUpdatedHostName(),
outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() },
new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.NUMERIC,
Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP,
Types.NUMERIC, Types.VARCHAR });
sqlTemplate.update(
getSql("updateOutgoingBatchSql"),
new Object[] { outgoingBatch.getStatus().name(),
outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0,
outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(),
outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(),
outgoingBatch.getDataEventCount(), outgoingBatch.getReloadEventCount(),
outgoingBatch.getInsertEventCount(), outgoingBatch.getUpdateEventCount(),
outgoingBatch.getDeleteEventCount(), outgoingBatch.getOtherEventCount(),
outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(),
outgoingBatch.getNetworkMillis(), outgoingBatch.getFilterMillis(),
outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(),
outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(),
StringUtils.abbreviate(outgoingBatch.getSqlMessage(), 1000),
outgoingBatch.getFailedDataId(), outgoingBatch.getLastUpdatedHostName(),
outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId(),
outgoingBatch.getNodeId() },
new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.VARCHAR,
Types.NUMERIC, Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP,
Types.NUMERIC, Types.VARCHAR });
}

public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
Expand All @@ -150,16 +145,17 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
}
transaction.prepareAndExecute(getSql("insertOutgoingBatchSql"), batchId, outgoingBatch
.getNodeId(), outgoingBatch.getChannelId(), outgoingBatch.getStatus().name(),
outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0, outgoingBatch.getReloadEventCount(),
outgoingBatch.getOtherEventCount(), outgoingBatch.getLastUpdatedHostName());
outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0,
outgoingBatch.getReloadEventCount(), outgoingBatch.getOtherEventCount(),
outgoingBatch.getLastUpdatedHostName());
outgoingBatch.setBatchId(batchId);
}

public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
List<OutgoingBatch> list = (List<OutgoingBatch>) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"),
new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId },
new int[] { Types.NUMERIC, Types.VARCHAR });
new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId }, new int[] {
Types.NUMERIC, Types.VARCHAR });
if (list != null && list.size() > 0) {
return list.get(0);
} else {
Expand All @@ -178,11 +174,18 @@ public int countOutgoingBatches(List<String> nodeIds, List<String> channels,
params.put("NODES", nodeIds);
params.put("CHANNELS", channels);
params.put("STATUSES", toStringList(statuses));
return sqlTemplate
.queryForInt(
getSql("selectCountBatchesPrefixSql",
containsOnlyErrorStatus(statuses) ? "selectOutgoingBatchByChannelWithErrorSql"
: "selectOutgoingBatchByChannelAndStatusSql"), params);
String sql = null;
if (containsOnlyStatus(Status.ER, statuses)) {
sql = getSql("selectCountBatchesPrefixSql",
"selectOutgoingBatchByChannelWithErrorSql");
} else if (containsOnlyStatus(Status.IG, statuses)) {
sql = getSql("selectCountBatchesPrefixSql",
"selectOutgoingBatchByChannelWithIgnoreSql");
} else {
sql = getSql("selectCountBatchesPrefixSql",
"selectOutgoingBatchByChannelAndStatusSql");
}
return sqlTemplate.queryForInt(sql, params);
} else {
return 0;
}
Expand All @@ -206,10 +209,20 @@ public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String
}
}

String sql = getSql("selectOutgoingBatchPrefixSql",
containsOnlyErrorStatus(statuses) ? "selectOutgoingBatchByChannelWithErrorSql"
: "selectOutgoingBatchByChannelAndStatusSql", startAtBatchIdSql,
ascending ? "order by batch_id asc" : " order by batch_id desc");
String sql = null;
if (containsOnlyStatus(Status.ER, statuses)) {
sql = getSql("selectOutgoingBatchPrefixSql",
"selectOutgoingBatchByChannelWithErrorSql", startAtBatchIdSql,
ascending ? "order by batch_id asc" : " order by batch_id desc");
} else if (containsOnlyStatus(Status.IG, statuses)) {
sql = getSql("selectOutgoingBatchPrefixSql",
"selectOutgoingBatchByChannelWithIgnoreSql", startAtBatchIdSql,
ascending ? "order by batch_id asc" : " order by batch_id desc");
} else {
sql = getSql("selectOutgoingBatchPrefixSql",
"selectOutgoingBatchByChannelAndStatusSql", startAtBatchIdSql,
ascending ? "order by batch_id asc" : " order by batch_id desc");
}

return sqlTemplate.query(sql, maxRowsToRetrieve, new OutgoingBatchMapper(true, false),
params);
Expand All @@ -227,8 +240,9 @@ protected List<String> toStringList(List<OutgoingBatch.Status> statuses) {

}

protected boolean containsOnlyErrorStatus(List<OutgoingBatch.Status> statuses) {
return statuses.size() == 1 && statuses.get(0) == OutgoingBatch.Status.ER;
protected boolean containsOnlyStatus(OutgoingBatch.Status status,
List<OutgoingBatch.Status> statuses) {
return statuses.size() == 1 && statuses.get(0) == status;
}

/**
Expand All @@ -246,7 +260,8 @@ public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChan
maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, true),
new Object[] { node.getNodeId(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(), OutgoingBatch.Status.IG.name() }, null);
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(),
OutgoingBatch.Status.IG.name() }, null);

OutgoingBatches batches = new OutgoingBatches(list);

Expand Down
Expand Up @@ -36,6 +36,9 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String

putSql("selectOutgoingBatchByChannelWithErrorSql" ,"" +
"where node_id in (:NODES) and channel_id in (:CHANNELS) and error_flag=1 " );

putSql("selectOutgoingBatchByChannelWithIgnoreSql" ,"" +
"where node_id in (:NODES) and channel_id in (:CHANNELS) and ignore_count > 0 " );

putSql("findOutgoingBatchSql" ,"" +
"where batch_id=? and node_id=? " );
Expand Down
Expand Up @@ -102,7 +102,11 @@ protected String getAcknowledgementData(String nodeId, List<IncomingBatch> list)
append(builder, WebConstants.ACK_NETWORK_MILLIS + batchId, batch.getNetworkMillis());
append(builder, WebConstants.ACK_FILTER_MILLIS + batchId, batch.getFilterMillis());
append(builder, WebConstants.ACK_DATABASE_MILLIS + batchId, batch.getDatabaseMillis());
append(builder, WebConstants.ACK_BYTE_COUNT + batchId, batch.getByteCount());
append(builder, WebConstants.ACK_BYTE_COUNT + batchId, batch.getByteCount());

if (batch.getIgnoreCount() > 0) {
append(builder, WebConstants.ACK_IGNORE_COUNT + batchId, batch.getIgnoreCount());
}

if (batch.getStatus() == Status.ER) {
append(builder, WebConstants.ACK_SQL_STATE + batchId, batch.getSqlState());
Expand Down Expand Up @@ -151,7 +155,8 @@ private static BatchAck getBatchInfo(Map<String, ? extends Object> parameters, l
batchInfo.setNetworkMillis(getParamAsNum(parameters, WebConstants.ACK_NETWORK_MILLIS + batchId));
batchInfo.setFilterMillis(getParamAsNum(parameters, WebConstants.ACK_FILTER_MILLIS + batchId));
batchInfo.setDatabaseMillis(getParamAsNum(parameters, WebConstants.ACK_DATABASE_MILLIS + batchId));
batchInfo.setByteCount(getParamAsNum(parameters, WebConstants.ACK_BYTE_COUNT + batchId));
batchInfo.setByteCount(getParamAsNum(parameters, WebConstants.ACK_BYTE_COUNT + batchId));
batchInfo.setIgnored(getParamAsBoolean(parameters, WebConstants.ACK_IGNORE_COUNT + batchId));
String status = getParam(parameters, WebConstants.ACK_BATCH_NAME + batchId, "").trim();
batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK));

Expand All @@ -178,7 +183,11 @@ protected static Map<String, Object> getParametersFromQueryUrl(String parameterS

private static long getParamAsNum(Map<String, ? extends Object> parameters, String parameterName) {
return NumberUtils.toLong(getParam(parameters, parameterName));
}
}

private static boolean getParamAsBoolean(Map<String, ? extends Object> parameters, String parameterName) {
return getParamAsNum(parameters, parameterName) > 0;
}

private static String getParam(Map<String, ? extends Object> parameters, String parameterName, String defaultValue) {
String value = getParam(parameters, parameterName);
Expand Down
Expand Up @@ -58,7 +58,9 @@ public class WebConstants {

public static final String ACK_DATABASE_MILLIS = "database-";

public static final String ACK_BYTE_COUNT = "byteCount-";
public static final String ACK_BYTE_COUNT = "byteCount-";

public static final String ACK_IGNORE_COUNT = "ignoreCount-";

public static final String ACK_SQL_STATE = "sqlState-";

Expand Down
Expand Up @@ -74,9 +74,6 @@ public void start(Batch batch) {
println(CsvConstants.CHANNEL, batch.getChannelId());
}
println(CsvConstants.BATCH, Long.toString(batch.getBatchId()));
if (batch.isIgnored()) {
println(CsvConstants.IGNORE);
}
}

public boolean start(Table table) {
Expand Down Expand Up @@ -143,6 +140,11 @@ public void end(Table table) {
}

final public void end(Batch batch, boolean inError) {

if (batch.isIgnored()) {
println(CsvConstants.IGNORE);
}

if (!inError) {
println(CsvConstants.COMMIT, Long.toString(batch.getBatchId()));
endBatch(batch);
Expand Down
Expand Up @@ -738,6 +738,9 @@ public void end(Table table) {
public void end(Batch batch, boolean inError) {
this.lastData = null;
this.currentDmlStatement = null;
if (batch.isIgnored()) {
getStatistics().get(batch).increment(DataWriterStatisticConstants.IGNORECOUNT);
}
if (!inError) {
notifyFiltersBatchComplete();
commit();
Expand Down

0 comments on commit c4a1f9a

Please sign in to comment.