Skip to content

Commit

Permalink
ignite-1758 Fixed issues with client reconnect handling
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Oct 30, 2015
1 parent 04964b9 commit 6ea3b56
Show file tree
Hide file tree
Showing 18 changed files with 1,021 additions and 253 deletions.
Expand Up @@ -355,6 +355,9 @@ public final class IgniteSystemProperties {
/** Maximum size for affinity assignment history. */
public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";

/** Maximum size for discovery messages history. */
public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE";

/** Number of cache operation retries in case of topology exceptions. */
public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";

Expand Down
Expand Up @@ -165,6 +165,7 @@
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.IgniteSpi;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
Expand Down Expand Up @@ -3158,10 +3159,17 @@ private boolean skipDaemon(GridComponent comp) {

/** {@inheritDoc} */
public void dumpDebugInfo() {
U.warn(log, "Dumping debug info for node [id=" + ctx.localNodeId() +
boolean client = ctx.clientNode();

ClusterNode locNode = ctx.discovery().localNode();

UUID routerId = locNode instanceof TcpDiscoveryNode ? ((TcpDiscoveryNode)locNode).clientRouterNodeId() : null;

U.warn(log, "Dumping debug info for node [id=" + locNode.id() +
", name=" + ctx.gridName() +
", order=" + ctx.discovery().localNode().order() +
", client=" + ctx.clientNode() + ']');
", order=" + locNode.order() +
", client=" + client +
(client && routerId != null ? ", routerId=" + routerId : "") + ']');

ctx.cache().context().exchange().dumpDebugInfo();
}
Expand Down
Expand Up @@ -1803,61 +1803,80 @@ private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,

/** {@inheritDoc} */
@Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;

// Collect dynamically started caches to a single object.
Collection<DynamicCacheChangeRequest> reqs =
new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
Collection<DynamicCacheChangeRequest> reqs;

boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null;
Map<String, Map<UUID, Boolean>> clientNodesMap;

Map<String, DynamicCacheDescriptor> descs = reconnect ? cachesOnDisconnect : registeredCaches;
if (reconnect) {
reqs = new ArrayList<>(caches.size());

for (DynamicCacheDescriptor desc : descs.values()) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
clientNodesMap = U.newHashMap(caches.size());

req.startCacheConfiguration(desc.cacheConfiguration());
for (GridCacheAdapter<?, ?> cache : caches.values()) {
DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));

req.cacheType(desc.cacheType());
if (desc == null)
continue;

req.deploymentId(desc.deploymentId());
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);

reqs.add(req);
}
req.startCacheConfiguration(desc.cacheConfiguration());

for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
req.cacheType(desc.cacheType());

req.startCacheConfiguration(desc.cacheConfiguration());
req.deploymentId(desc.deploymentId());

req.template(true);
reqs.add(req);

req.deploymentId(desc.deploymentId());
Boolean nearEnabled = cache.isNear();

Map<UUID, Boolean> map = U.newHashMap(1);

map.put(nodeId, nearEnabled);

reqs.add(req);
clientNodesMap.put(cache.name(), map);
}
}
else {
reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());

DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs);
for (DynamicCacheDescriptor desc : registeredCaches.values()) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);

Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap();
req.startCacheConfiguration(desc.cacheConfiguration());

if (reconnect) {
clientNodesMap = U.newHashMap(caches.size());
req.cacheType(desc.cacheType());

for (GridCacheAdapter<?, ?> cache : caches.values()) {
Boolean nearEnabled = cache.isNear();
req.deploymentId(desc.deploymentId());

Map<UUID, Boolean> map = U.newHashMap(1);
reqs.add(req);
}

map.put(nodeId, nearEnabled);
for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);

clientNodesMap.put(cache.name(), map);
req.startCacheConfiguration(desc.cacheConfiguration());

req.template(true);

req.deploymentId(desc.deploymentId());

reqs.add(req);
}

clientNodesMap = ctx.discovery().clientNodesMap();
}

req.clientNodes(clientNodesMap);
DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);

batch.clientNodes(clientNodesMap);

req.clientReconnect(reconnect);
batch.clientReconnect(reconnect);

return req;
return batch;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -192,9 +192,7 @@ public GridDhtPreloader(GridCacheContext<?, ?> cctx) {

ClusterNode loc = cctx.localNode();

long startTime = loc.metrics().getStartTime();

assert startTime > 0;
assert loc.metrics().getStartTime() > 0;

final long startTopVer = loc.order();

Expand Down
Expand Up @@ -39,6 +39,8 @@
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.portable.api.IgnitePortables;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
Expand Down Expand Up @@ -75,6 +77,7 @@
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -371,6 +374,12 @@ public CacheObjectPortableProcessorImpl(GridKernalContext ctx) {
else
throw e;
}
catch (CacheException e) {
if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class))
continue;
else
throw e;
}

break;
}
Expand Down
Expand Up @@ -193,14 +193,19 @@ public void ackReceived(long rcvCnt) {

/**
* Node left callback.
*
* @return {@code False} if descriptor is reserved.
*/
public void onNodeLeft() {
public boolean onNodeLeft() {
GridNioFuture<?>[] futs = null;

synchronized (this) {
nodeLeft = true;

if (!reserved && !msgFuts.isEmpty()) {
if (reserved)
return false;

if (!msgFuts.isEmpty()) {
futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);

msgFuts.clear();
Expand All @@ -209,6 +214,8 @@ public void onNodeLeft() {

if (futs != null)
completeOnNodeLeft(futs);

return true;
}

/**
Expand Down
Expand Up @@ -61,6 +61,7 @@
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
Expand Down Expand Up @@ -1358,7 +1359,9 @@ public CommunicationListener getListener() {

/** {@inheritDoc} */
@Override public int getOutboundMessagesQueueSize() {
return nioSrvr.outboundMessagesQueueSize();
GridNioServer<Message> srv = nioSrvr;

return srv != null ? srv.outboundMessagesQueueSize() : 0;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1870,39 +1873,39 @@ private void checkAttributePresence(ClusterNode node, String attrName) {
*
* @param node Destination node.
* @param msg Message to send.
* @param ackClosure Ack closure.
* @param ackC Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
sendMessage0(node, msg, ackClosure);
sendMessage0(node, msg, ackC);
}

/**
* @param node Destination node.
* @param msg Message to send.
* @param ackClosure Ack closure.
* @param ackC Ack closure.
* @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message.
* Note that this is not guaranteed that failed communication will result
* in thrown exception as this is dependant on SPI implementation.
*/
private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
assert node != null;
assert msg != null;

if (log.isTraceEnabled())
log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']');

ClusterNode localNode = getLocalNode();
ClusterNode locNode = getLocalNode();

if (localNode == null)
if (locNode == null)
throw new IgniteSpiException("Local node has not been started or fully initialized " +
"[isStopping=" + getSpiContext().isStopping() + ']');

if (node.id().equals(localNode.id()))
if (node.id().equals(locNode.id()))
notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
Expand All @@ -1915,10 +1918,10 @@ private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteE

UUID nodeId = null;

if (!client.async() && !localNode.version().equals(node.version()))
if (!client.async() && !locNode.version().equals(node.version()))
nodeId = node.id();

retry = client.sendMessage(nodeId, msg, ackClosure);
retry = client.sendMessage(nodeId, msg, ackC);

client.release();

Expand Down Expand Up @@ -2292,6 +2295,15 @@ protected GridCommunicationClient createTcpClient(ClusterNode node) throws Ignit
return null;
}

if (getSpiContext().node(node.id()) == null) {
recoveryDesc.release();

U.closeQuiet(ch);

throw new ClusterTopologyCheckedException("Failed to send message, " +
"node left cluster: " + node);
}

long rcvCnt = -1;

SSLEngine sslEngine = null;
Expand Down Expand Up @@ -3100,10 +3112,10 @@ private void cleanupRecovery() {
assert !left.isEmpty();

for (ClientKey id : left) {
GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);

if (recoverySnd != null)
recoverySnd.onNodeLeft();
if (recoverySnd != null && recoverySnd.onNodeLeft())
recoveryDescs.remove(id);
}
}
}
Expand Down

0 comments on commit 6ea3b56

Please sign in to comment.