diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CommonBatchCollisionException.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CommonBatchCollisionException.java new file mode 100644 index 0000000000..2d4c83515b --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/CommonBatchCollisionException.java @@ -0,0 +1,33 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.route; + +public class CommonBatchCollisionException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public CommonBatchCollisionException() { + } + + public CommonBatchCollisionException(String message) { + super(message); + } +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index fa1f6897b8..e66a75d05f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -72,6 +72,7 @@ import org.jumpmind.symmetric.route.CSVRouter; import org.jumpmind.symmetric.route.ChannelRouterContext; import org.jumpmind.symmetric.route.ColumnMatchDataRouter; +import org.jumpmind.symmetric.route.CommonBatchCollisionException; import org.jumpmind.symmetric.route.ConfigurationChangedDataRouter; import org.jumpmind.symmetric.route.ConvertToReloadRouter; import org.jumpmind.symmetric.route.DBFRouter; @@ -552,6 +553,10 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no long dataCountWithBigLob = routeDataForChannel(processInfo, nodeChannel, sourceNode, true, batchesByNodes, batchesByGroups); return context.getCommittedDataEventCount() + dataCountWithBigLob; } + } catch (CommonBatchCollisionException e) { + gapDetector.setIsAllDataRead(false); + dataCount = 1; // we prevented writing the collision, so commit what we have + throw e; } catch (Throwable ex) { log.error( String.format("Failed to route and batch data on '%s' channel", @@ -924,6 +929,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con long loadId = -1; boolean dataEventAdded = false; boolean useCommonMode = context.isProduceCommonBatches() || context.isProduceGroupBatches(); + boolean firstTimeForGroup = false; int numberOfDataEventsInserted = 0; if (nodeIds == null || nodeIds.size() == 0) { @@ -938,6 +944,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con if (batches == null) { batches = new HashMap(); batchesByGroups.put(groupKey, batches); + firstTimeForGroup = true; } useCommonMode = nodeIds.size() > 1; } else { @@ -977,6 +984,10 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con log.debug("About to insert a new batch for node {} on the '{}' channel. Batches in progress are: {}.", nodeId, batch.getChannelId(), batches.values()); } + + if (useCommonMode && !firstTimeForGroup) { + throw new CommonBatchCollisionException("Collision detected for common batch group"); + } engine.getOutgoingBatchService().insertOutgoingBatch(batch); processInfo.incrementBatchCount();