Skip to content

Commit

Permalink
0004218: Prevent hash collision when routing common batch groups
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 31, 2019
1 parent 8429d37 commit 90d69d2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
@@ -0,0 +1,33 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.route;

public class CommonBatchCollisionException extends RuntimeException {

private static final long serialVersionUID = 1L;

public CommonBatchCollisionException() {
}

public CommonBatchCollisionException(String message) {
super(message);
}
}
Expand Up @@ -72,6 +72,7 @@
import org.jumpmind.symmetric.route.CSVRouter;
import org.jumpmind.symmetric.route.ChannelRouterContext;
import org.jumpmind.symmetric.route.ColumnMatchDataRouter;
import org.jumpmind.symmetric.route.CommonBatchCollisionException;
import org.jumpmind.symmetric.route.ConfigurationChangedDataRouter;
import org.jumpmind.symmetric.route.ConvertToReloadRouter;
import org.jumpmind.symmetric.route.DBFRouter;
Expand Down Expand Up @@ -552,6 +553,10 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no
long dataCountWithBigLob = routeDataForChannel(processInfo, nodeChannel, sourceNode, true, batchesByNodes, batchesByGroups);
return context.getCommittedDataEventCount() + dataCountWithBigLob;
}
} catch (CommonBatchCollisionException e) {
gapDetector.setIsAllDataRead(false);
dataCount = 1; // we prevented writing the collision, so commit what we have
throw e;
} catch (Throwable ex) {
log.error(
String.format("Failed to route and batch data on '%s' channel",
Expand Down Expand Up @@ -924,6 +929,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
long loadId = -1;
boolean dataEventAdded = false;
boolean useCommonMode = context.isProduceCommonBatches() || context.isProduceGroupBatches();
boolean firstTimeForGroup = false;
int numberOfDataEventsInserted = 0;

if (nodeIds == null || nodeIds.size() == 0) {
Expand All @@ -938,6 +944,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
if (batches == null) {
batches = new HashMap<String, OutgoingBatch>();
batchesByGroups.put(groupKey, batches);
firstTimeForGroup = true;
}
useCommonMode = nodeIds.size() > 1;
} else {
Expand Down Expand Up @@ -977,6 +984,10 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
log.debug("About to insert a new batch for node {} on the '{}' channel. Batches in progress are: {}.",
nodeId, batch.getChannelId(), batches.values());
}

if (useCommonMode && !firstTimeForGroup) {
throw new CommonBatchCollisionException("Collision detected for common batch group");
}

engine.getOutgoingBatchService().insertOutgoingBatch(batch);
processInfo.incrementBatchCount();
Expand Down

0 comments on commit 90d69d2

Please sign in to comment.