Skip to content

Commit

Permalink
IGNITE-9084 Fixed error handling for historical rebalance - Fixes #4437.
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
  • Loading branch information
Jokser authored and agoncharuk committed Sep 6, 2018
1 parent d5432c0 commit a232b88
Show file tree
Hide file tree
Showing 8 changed files with 427 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
Expand Down Expand Up @@ -889,6 +891,11 @@ public GridIoMessageFactory(MessageFactory[] ext) {

break;

case 120:
msg = new GridDhtPartitionSupplyMessageV2();

break;

case 124:
msg = new GridMessageCollection<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,8 @@ private void processFailedMessage(UUID nodeId,

break;

case 114: {
case 114:
case 120: {
processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,10 +688,20 @@ public void handleSupplyMessage(
if (log.isDebugEnabled())
log.debug("Received supply message [grp=" + grp.cacheOrGroupName() + ", msg=" + supply + ']');

// Check whether there were class loading errors on unmarshal
// Check whether there were error during supply message unmarshalling process.
if (supply.classError() != null) {
U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId +
"]. Class got undeployed during preloading: " + supply.classError());
"]. Supply message couldn't be unmarshalled: " + supply.classError());

fut.cancel(nodeId);

return;
}

// Check whether there were error during supplying process.
if (supply.error() != null) {
U.warn(log, "Rebalancing from node cancelled [grp=" + grp.cacheOrGroupName() + ", node=" + nodeId +
"]. Supplier has failed with error: " + supply.error());

fut.cancel(nodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,11 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
if (node == null)
return;

try {
SupplyContext sctx;
IgniteRebalanceIterator iter = null;

SupplyContext sctx = null;

try {
synchronized (scMap) {
sctx = scMap.remove(contextId);

Expand All @@ -229,7 +231,7 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
scMap.put(contextId, sctx);

if (log.isDebugEnabled())
log.debug("Stale demand message [grp=" + grp.cacheOrGroupName()
log.debug("Stale demand message [cache=" + grp.cacheOrGroupName()
+ ", actualContext=" + sctx
+ ", from=" + nodeId
+ ", demandMsg=" + d + "]");
Expand All @@ -241,7 +243,7 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
// Demand request should not contain empty partitions if no supply context is associated with it.
if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) {
if (log.isDebugEnabled())
log.debug("Empty demand message [grp=" + grp.cacheOrGroupName()
log.debug("Empty demand message [cache=" + grp.cacheOrGroupName()
+ ", from=" + nodeId
+ ", topicId=" + topicId
+ ", demandMsg=" + d + "]");
Expand Down Expand Up @@ -272,8 +274,6 @@ public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemand
d.topologyVersion(),
grp.deploymentEnabled());

IgniteRebalanceIterator iter;

Set<Integer> remainingParts;

if (sctx == null || sctx.iterator == null) {
Expand Down Expand Up @@ -452,13 +452,56 @@ else if (iter.isPartitionMissing(p)) {
", topology=" + demTop + ", rebalanceId=" + d.rebalanceId() +
", topicId=" + topicId + "]");
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send partition supply message to node: " + nodeId, e);
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() +
", msg=" + e.getMessage() + ']');
catch (Throwable t) {
if (grp.shared().kernalContext().isStopping())
return;

// Sending supply messages with error requires new protocol.
boolean sendErrMsg = node.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0;

if (t instanceof IgniteSpiException) {
if (log.isDebugEnabled())
log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() +
", msg=" + t.getMessage() + ']');

sendErrMsg = false;
}
else
U.error(log, "Failed to continue supplying process for " +
"[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId
+ ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t);

try {
if (sctx != null)
clearContext(sctx, log);
else if (iter != null)
iter.close();
}
catch (Throwable t1) {
U.error(log, "Failed to cleanup supplying context " +
"[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId
+ ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t1);
}

if (!sendErrMsg)
return;

try {
GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2(
d.rebalanceId(),
grp.groupId(),
d.topologyVersion(),
grp.deploymentEnabled(),
t
);

reply(node, d, errMsg, contextId);
}
catch (Throwable t1) {
U.error(log, "Failed to send supply error message for " +
"[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId
+ ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t1);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
* @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
GridDhtPartitionSupplyMessage(long rebalanceId,
GridDhtPartitionSupplyMessage(
long rebalanceId,
int grpId,
AffinityTopologyVersion topVer,
boolean addDepInfo) {
boolean addDepInfo
) {
this.grpId = grpId;
this.rebalanceId = rebalanceId;
this.topVer = topVer;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;

/**
* Supply message with supplier error transfer support.
*/
public class GridDhtPartitionSupplyMessageV2 extends GridDhtPartitionSupplyMessage {
/** */
private static final long serialVersionUID = 0L;

/** Available since. */
public static final IgniteProductVersion AVAILABLE_SINCE = IgniteProductVersion.fromString("2.7.0");

/** Supplying process error. */
@GridDirectTransient
private Throwable err;

/** Supplying process error bytes. */
private byte[] errBytes;

/**
* Default constructor.
*/
public GridDhtPartitionSupplyMessageV2() {
}

/**
* @param rebalanceId Rebalance id.
* @param grpId Group id.
* @param topVer Topology version.
* @param addDepInfo Add dep info.
* @param err Supply process error.
*/
public GridDhtPartitionSupplyMessageV2(
long rebalanceId,
int grpId,
AffinityTopologyVersion topVer,
boolean addDepInfo,
Throwable err
) {
super(rebalanceId, grpId, topVer, addDepInfo);

this.err = err;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);

if (err != null && errBytes == null)
errBytes = U.marshal(ctx, err);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);

if (errBytes != null && err == null)
err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);

if (!super.writeTo(buf, writer))
return false;

if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;

writer.onHeaderWritten();
}

switch (writer.state()) {
case 12:
if (!writer.writeByteArray("errBytes", errBytes))
return false;

writer.incrementState();

}

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);

if (!reader.beforeMessageRead())
return false;

if (!super.readFrom(buf, reader))
return false;

switch (reader.state()) {
case 12:
errBytes = reader.readByteArray("errBytes");

if (!reader.isLastRead())
return false;

reader.incrementState();

}

return reader.afterMessageRead(GridDhtPartitionSupplyMessageV2.class);
}

/** {@inheritDoc} */
@Nullable @Override public Throwable error() {
return err;
}

/** {@inheritDoc} */
@Override public short directType() {
return 120;
}

/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 13;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,10 @@ private static class WALHistoricalIterator implements IgniteHistoricalIterator {
/** Flag indicates that partition belongs to current {@link #next} is finished and no longer needs to rebalance. */
private boolean reachedPartitionEnd;

/** Flag indicates that update counters for requested partitions have been reached and done.
* It means that no further iteration is needed. */
private boolean doneAllPartitions;

/**
* @param grp Cache context.
* @param walIt WAL iterator.
Expand Down Expand Up @@ -953,6 +957,9 @@ private WALHistoricalIterator(CacheGroupContext grp, CachePartitionPartialCounte
doneParts.add(next.partitionId());

reachedPartitionEnd = false;

if (doneParts.size() == partMap.size())
doneAllPartitions = true;
}

advance();
Expand Down Expand Up @@ -1011,6 +1018,9 @@ private void releasePartitions() {
private void advance() {
next = null;

if (doneAllPartitions)
return;

while (true) {
if (entryIt != null) {
while (entryIt.hasNext()) {
Expand Down
Loading

0 comments on commit a232b88

Please sign in to comment.