Skip to content

Commit

Permalink
ignite-4705 Atomic cache protocol change: notify client node from bac…
Browse files Browse the repository at this point in the history
…kups
  • Loading branch information
sboikov committed Mar 13, 2017
1 parent f59f46d commit cbc472f
Show file tree
Hide file tree
Showing 98 changed files with 5,462 additions and 3,597 deletions.
Expand Up @@ -19,11 +19,13 @@


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.nio.GridNioFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


/** /**
Expand Down Expand Up @@ -112,7 +114,8 @@ public MockNioSession(InetSocketAddress locAddr, InetSocketAddress rmtAddr) {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void sendNoFuture(Object msg) throws IgniteCheckedException { @Override public void sendNoFuture(Object msg, @Nullable IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
// No-op. // No-op.
} }


Expand Down
Expand Up @@ -1703,8 +1703,9 @@ private void start0(GridStartContext startCtx) throws IgniteCheckedException {


sysExecSvc.allowCoreThreadTimeOut(true); sysExecSvc.allowCoreThreadTimeOut(true);


if (cfg.getStripedPoolSize() > 0) validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool");
stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);
stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log);


// Note that since we use 'LinkedBlockingQueue', number of // Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect. // maximum threads has no effect.
Expand Down
Expand Up @@ -17,6 +17,17 @@


package org.apache.ignite.internal.binary; package org.apache.ignite.internal.binary;


import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryObjectException;
Expand All @@ -33,19 +44,7 @@
import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


import java.io.Externalizable; import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import java.util.UUID;

import static java.nio.charset.StandardCharsets.*;


/** /**
* Binary object implementation. * Binary object implementation.
Expand Down Expand Up @@ -74,7 +73,6 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
private boolean detachAllowed; private boolean detachAllowed;


/** */ /** */
@GridDirectTransient
private int part = -1; private int part = -1;


/** /**
Expand Down Expand Up @@ -561,7 +559,6 @@ else if (fieldOffLen == BinaryUtils.OFFSET_2)


start = in.readInt(); start = in.readInt();
} }

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf); writer.setBuffer(buf);
Expand All @@ -584,6 +581,12 @@ else if (fieldOffLen == BinaryUtils.OFFSET_2)
writer.incrementState(); writer.incrementState();


case 1: case 1:
if (!writer.writeInt("part", part))
return false;

writer.incrementState();

case 2:
if (!writer.writeInt("start", detachAllowed ? 0 : start)) if (!writer.writeInt("start", detachAllowed ? 0 : start))
return false; return false;


Expand Down Expand Up @@ -611,6 +614,14 @@ else if (fieldOffLen == BinaryUtils.OFFSET_2)
reader.incrementState(); reader.incrementState();


case 1: case 1:
part = reader.readInt("part");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 2:
start = reader.readInt("start"); start = reader.readInt("start");


if (!reader.isLastRead()) if (!reader.isLastRead())
Expand All @@ -620,7 +631,7 @@ else if (fieldOffLen == BinaryUtils.OFFSET_2)


} }


return true; return reader.afterMessageRead(BinaryObjectImpl.class);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down
Expand Up @@ -229,7 +229,7 @@ public class GridClientNioTcpConnection extends GridClientConnection {
GridNioFuture<?> sslHandshakeFut = null; GridNioFuture<?> sslHandshakeFut = null;


if (sslCtx != null) { if (sslCtx != null) {
sslHandshakeFut = new GridNioFutureImpl<>(); sslHandshakeFut = new GridNioFutureImpl<>(null);


meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
} }
Expand Down
Expand Up @@ -818,10 +818,7 @@ private void processRegularMessage(
return; return;
} }


if (ctx.config().getStripedPoolSize() > 0 && if (plc == GridIoPolicy.SYSTEM_POOL && msg.partition() != Integer.MIN_VALUE) {
plc == GridIoPolicy.SYSTEM_POOL &&
msg.partition() != Integer.MIN_VALUE
) {
ctx.getStripedExecutorService().execute(msg.partition(), c); ctx.getStripedExecutorService().execute(msg.partition(), c);


return; return;
Expand Down
Expand Up @@ -67,14 +67,17 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicCheckUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
Expand Down Expand Up @@ -118,11 +121,11 @@
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse; import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage; import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage; import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey; import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage; import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
Expand Down Expand Up @@ -173,6 +176,21 @@ public GridIoMessageFactory(MessageFactory[] ext) {
Message msg = null; Message msg = null;


switch (type) { switch (type) {
case -47:
msg = new GridNearAtomicCheckUpdateRequest();

break;

case -46:
msg = new UpdateErrors();

break;

case -45:
msg = new GridDhtAtomicNearResponse();

break;

case -44: case -44:
msg = new TcpCommunicationSpi.HandshakeMessage2(); msg = new TcpCommunicationSpi.HandshakeMessage2();


Expand Down
Expand Up @@ -33,7 +33,8 @@
/** /**
* *
*/ */
@SuppressWarnings("TypeMayBeWeakened") public class CacheObjectContext { @SuppressWarnings("TypeMayBeWeakened")
public class CacheObjectContext {
/** */ /** */
private GridKernalContext kernalCtx; private GridKernalContext kernalCtx;


Expand Down
Expand Up @@ -19,16 +19,15 @@


import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;


/** /**
* Update future for atomic cache. * Update future for atomic cache.
*/ */
public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/** /**
* @return Future version. * @return Future ID.
*/ */
public GridCacheVersion version(); public Long id();


/** /**
* Gets future that will be completed when it is safe when update is finished on the given version of topology. * Gets future that will be completed when it is safe when update is finished on the given version of topology.
Expand Down

0 comments on commit cbc472f

Please sign in to comment.