Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
*
* @deprecated Use {@link DurableBackgroundCleanupIndexTreeTaskV2}.
*/
@Deprecated
public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundTask {
/** */
private static final long serialVersionUID = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ protected AbstractTransmission(
int chunkSize
) {
A.notNull(meta, "Initial file meta cannot be null");
A.notNullOrEmpty(meta.name(), "Trasmisson name cannot be empty or null");
A.notNullOrEmpty(meta.name(), "Transmission name cannot be empty or null");
A.ensure(meta.offset() >= 0, "File start position cannot be negative");
A.ensure(meta.count() > 0, "Total number of bytes to transfer must be greater than zero");
A.ensure(meta.count() >= 0, "Total number of bytes to transfer can't be less than zero");
A.notNull(stopChecker, "Process stop checker cannot be null");
A.ensure(chunkSize > 0, "Size of chunks to transfer data must be positive");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public FileReceiver(
fileIo.position(meta.offset());
}
catch (IOException e) {
throw new IgniteException("Unable to open destination file. Receiver will will be stopped", e);
throw new IgniteException("Unable to open destination file. Receiver will be stopped: " +
file.getAbsolutePath(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ private static void format(StringBuilder b, Collection<IgnitePair<Long>> pairs)
if (nodeId.equals(e.getValue().rmtNodeId)) {
it.remove();

interruptRecevier(e.getValue(),
interruptReceiver(e.getValue(),
new ClusterTopologyCheckedException("Remote node left the grid. " +
"Receiver has been stopped : " + nodeId));
}
Expand Down Expand Up @@ -1179,7 +1179,7 @@ private TcpCommunicationSpi getTcpCommunicationSpi() {
}

for (ReceiverContext rctx : rcvs) {
interruptRecevier(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
interruptReceiver(rctx, new NodeStoppingException("Local node io manager requested to be stopped: "
+ ctx.localNodeId()));
}
}
Expand Down Expand Up @@ -1965,7 +1965,7 @@ public void removeTransmissionHandler(Object topic) {
rcvCtx0 = rcvCtxs.remove(topic);
}

interruptRecevier(rcvCtx0,
interruptReceiver(rcvCtx0,
new IgniteCheckedException("Receiver has been closed due to removing corresponding transmission handler " +
"on local node [nodeId=" + ctx.localNodeId() + ']'));
}
Expand Down Expand Up @@ -2787,7 +2787,7 @@ public int getOutboundMessagesQueueSize() {
* @param rctx Receiver context to use.
* @param ex Exception to close receiver with.
*/
private void interruptRecevier(ReceiverContext rctx, Exception ex) {
private void interruptReceiver(ReceiverContext rctx, Exception ex) {
if (rctx == null)
return;

Expand All @@ -2800,9 +2800,14 @@ private void interruptRecevier(ReceiverContext rctx, Exception ex) {
rctx.lastState = rctx.lastState == null ?
new TransmissionMeta(ex) : rctx.lastState.error(ex);

rctx.hnd.onException(rctx.rmtNodeId, ex);
if (X.hasCause(ex, TransmissionCancelledException.class)) {
if (log.isInfoEnabled())
log.info("Transmission receiver has been cancelled [rctx=" + rctx + ']');
}
else
U.error(log, "Receiver has been interrupted due to an exception occurred [rctx=" + rctx + ']', ex);

U.error(log, "Receiver has been interrupted due to an exception occurred [ctx=" + rctx + ']', ex);
rctx.hnd.onException(rctx.rmtNodeId, ex);
}
}

Expand Down Expand Up @@ -2856,7 +2861,8 @@ private void processOpenedChannel(Object topic, UUID rmtNodeId, SessionChannelMe
"It's not allowed to process different sessions over the same topic simultaneously. " +
"Channel will be closed [initMsg=" + initMsg + ", channel=" + ch + ", nodeId=" + rmtNodeId + ']');

U.error(log, err);
U.error(log, "Error has been sent back to remote node. Receiver holds the local topic " +
"[topic=" + topic + ", rmtNodeId=" + rmtNodeId + ", ctx=" + rcvCtx + ']', err);

out.writeObject(new TransmissionMeta(err));

Expand All @@ -2881,17 +2887,15 @@ private void processOpenedChannel(Object topic, UUID rmtNodeId, SessionChannelMe
if (rcvCtx.lastState == null || rcvCtx.lastState.error() == null)
receiveFromChannel(topic, rcvCtx, in, out, ch);
else
interruptRecevier(rcvCtxs.remove(topic), rcvCtx.lastState.error());
interruptReceiver(rcvCtxs.remove(topic), rcvCtx.lastState.error());
}
finally {
rcvCtx.lock.unlock();
}
}
catch (Throwable t) {
U.error(log, "Download session cannot be finished due to an unexpected error [ctx=" + rcvCtx + ']', t);

// Do not remove receiver context here, since sender will recconect to get this error.
interruptRecevier(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
interruptReceiver(rcvCtx, new IgniteCheckedException("Channel processing error [nodeId=" + rmtNodeId + ']', t));
}
finally {
U.closeQuiet(in);
Expand Down Expand Up @@ -2991,7 +2995,7 @@ private void receiveFromChannel(
}

@Override public void onTimeout() {
interruptRecevier(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
interruptReceiver(rcvCtxs.remove(topic), new IgniteCheckedException("Receiver is closed due to " +
"waiting for the reconnect has been timeouted"));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesFailureMessage;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotFilesRequestMessage;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
Expand Down Expand Up @@ -371,6 +373,8 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new);
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
factory.register((short)177, TcpInverseConnectionResponseMessage::new);
factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new);
factory.register(SnapshotFilesFailureMessage.TYPE_CODE, SnapshotFilesFailureMessage::new);

// [-3..119] [124..129] [-23..-28] [-36..-55] - this
// [120..123] - DR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ public interface TransmissionHandler {
*
* @param nodeId Remote node id from which request has been received.
* @param initMeta Initial handler meta info.
* @return Intance of read handler to process incoming data like the {@link FileChannel} manner.
* @return Instance of read handler to process incoming data like the {@link FileChannel} manner.
*/
public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta);

/**
* The {@link TransmissionCancelledException} will be received by exception handler if the local transmission
* ends by the user interruption request.
*
* @param nodeId Remote node id on which the error occurred.
* @param err The err of fail handling process.
* @param err The error of fail handling process.
*/
public void onException(UUID nodeId, Throwable err);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3602,12 +3602,7 @@ private void fillAffinityNodeCaches(
if (grpAff.persistentCacheGrp && bltNodes != null && !bltNodes.contains(node.id())) // Filter out.
continue;

List<ClusterNode> nodes = cacheGrpAffNodes.get(grpId);

if (nodes == null)
cacheGrpAffNodes.put(grpId, nodes = new ArrayList<>());

nodes.add(node);
cacheGrpAffNodes.computeIfAbsent(grpId, id -> new ArrayList<>()).add(node);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
Expand All @@ -59,6 +60,7 @@
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PART_DISTRIBUTION_WARN_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.getFloat;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;

Expand Down Expand Up @@ -163,7 +165,7 @@ public class GridAffinityAssignmentCache {
* @param backups Number of backups.
* @param locCache Local cache flag.
*/
public GridAffinityAssignmentCache(GridKernalContext ctx,
private GridAffinityAssignmentCache(GridKernalContext ctx,
String cacheOrGrpName,
int grpId,
AffinityFunction aff,
Expand Down Expand Up @@ -195,6 +197,22 @@ public GridAffinityAssignmentCache(GridKernalContext ctx,
assert similarAffKey != null;
}

/**
* @param ctx Kernal context.
* @param aff Initialized affinity function.
* @param ccfg Cache configuration.
* @return Affinity assignment cache instance.
*/
public static GridAffinityAssignmentCache create(GridKernalContext ctx, AffinityFunction aff, CacheConfiguration<?, ?> ccfg) {
return new GridAffinityAssignmentCache(ctx,
CU.cacheOrGroupName(ccfg),
CU.cacheGroupId(ccfg),
aff,
ccfg.getNodeFilter(),
ccfg.getBackups(),
ccfg.getCacheMode() == LOCAL);
}

/**
* @return Key to find caches with similar affinity.
*/
Expand Down Expand Up @@ -338,7 +356,7 @@ public IdealAffinityAssignment calculate(
if (!locCache) {
sorted = new ArrayList<>(discoCache.cacheGroupAffinityNodes(groupId()));

Collections.sort(sorted, NodeOrderComparator.getInstance());
sorted.sort(NodeOrderComparator.getInstance());
}
else
sorted = Collections.singletonList(ctx.discovery().localNode());
Expand Down
Loading