From 4ba6e20e7530d892fce3f631355e087df1c593a8 Mon Sep 17 00:00:00 2001 From: ivasilinets Date: Tue, 3 Mar 2015 15:58:57 +0300 Subject: [PATCH] #ignite-51: IgniteTxEntry implements Message. --- .../communication/GridIoMessageFactory.java | 8 + .../GridDistributedTxPrepareRequest.java | 109 +------ .../cache/transactions/IgniteTxEntry.java | 308 ++++++++++++++++-- 3 files changed, 294 insertions(+), 131 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 57b5ac403ae83..814e380206442 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -532,6 +532,14 @@ public GridIoMessageFactory(MessageFactory[] ext) { case 96: msg = new CacheContinuousQueryEntry(); + break; + case 97: + msg = new IgniteTxEntry(); + + break; + case 98: + msg = new IgniteTxEntry.TxEntryValueHolder(); + break; default: diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 5648ad46d832a..7ab40ea18e99a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.transactions.*; @@ -70,19 +69,11 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectTransient private Collection reads; - /** */ - @GridDirectCollection(byte[].class) - private Collection readsBytes; - /** Transaction write entries. */ @GridToStringInclude @GridDirectTransient private Collection writes; - /** */ - @GridDirectCollection(byte[].class) - private Collection writesBytes; - /** DHT versions to verify. */ @GridToStringInclude @GridDirectTransient @@ -301,24 +292,12 @@ public boolean onePhaseCommit() { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (writes != null) { + if (writes != null) marshalTx(writes, ctx); - writesBytes = new ArrayList<>(writes.size()); - - for (IgniteTxEntry e : writes) - writesBytes.add(ctx.marshaller().marshal(e)); - } - - if (reads != null) { + if (reads != null) marshalTx(reads, ctx); - readsBytes = new ArrayList<>(reads.size()); - - for (IgniteTxEntry e : reads) - readsBytes.add(ctx.marshaller().marshal(e)); - } - if (grpLockKey != null && grpLockKeyBytes == null) grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey); @@ -341,23 +320,9 @@ public boolean onePhaseCommit() { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (writesBytes != null) { - writes = new ArrayList<>(writesBytes.size()); - - for (byte[] arr : writesBytes) - writes.add(ctx.marshaller().unmarshal(arr, ldr)); - - unmarshalTx(writes, false, ctx, ldr); - } - - if (readsBytes != null) { - reads = new ArrayList<>(readsBytes.size()); + unmarshalTx(writes, false, ctx, ldr); - for (byte[] arr : readsBytes) - reads.add(ctx.marshaller().unmarshal(arr, ldr)); - - unmarshalTx(reads, false, ctx, ldr); - } + unmarshalTx(reads, false, ctx, ldr); if (grpLockKeyBytes != null && grpLockKey == null) grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr); @@ -384,64 +349,6 @@ public boolean onePhaseCommit() { txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr); } - /** - * - * @param out Output. - * @param col Set to write. - * @throws IOException If write failed. - */ - private void writeCollection(ObjectOutput out, Collection col) throws IOException { - boolean empty = F.isEmpty(col); - - if (!empty) { - out.writeInt(col.size()); - - for (IgniteTxEntry e : col) { - CacheObject val = e.value(); - boolean hasWriteVal = e.hasWriteValue(); - boolean hasReadVal = e.hasReadValue(); - - try { - // Don't serialize value if invalidate is set to true. - if (invalidate) - e.value(null, false, false); - - out.writeObject(e); - } - finally { - // Set original value back. - e.value(val, hasWriteVal, hasReadVal); - } - } - } - else - out.writeInt(-1); - } - - /** - * @param in Input. - * @return Deserialized set. - * @throws IOException If deserialization failed. - * @throws ClassNotFoundException If deserialized class could not be found. - */ - @SuppressWarnings({"unchecked"}) - @Nullable private Collection readCollection(ObjectInput in) throws IOException, - ClassNotFoundException { - List col = null; - - int size = in.readInt(); - - // Check null flag. - if (size != -1) { - col = new ArrayList<>(size); - - for (int i = 0; i < size; i++) - col.add((IgniteTxEntry)in.readObject()); - } - - return col == null ? Collections.emptyList() : col; - } - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -506,7 +413,7 @@ private void writeCollection(ObjectOutput out, Collection col) th writer.incrementState(); case 16: - if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("reads", reads, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -548,7 +455,7 @@ private void writeCollection(ObjectOutput out, Collection col) th writer.incrementState(); case 23: - if (!writer.writeCollection("writesBytes", writesBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -642,7 +549,7 @@ private void writeCollection(ObjectOutput out, Collection col) th reader.incrementState(); case 16: - readsBytes = reader.readCollection("readsBytes", MessageCollectionItemType.BYTE_ARR); + reads = reader.readCollection("reads", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -698,7 +605,7 @@ private void writeCollection(ObjectOutput out, Collection col) th reader.incrementState(); case 23: - writesBytes = reader.readCollection("writesBytes", MessageCollectionItemType.BYTE_ARR); + writes = reader.readCollection("writes", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index db166fb8fe72d..367c5869ca5eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -21,19 +21,20 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.version.*; -import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; import javax.cache.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; +import java.nio.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -44,7 +45,7 @@ * {@link #equals(Object)} method, as transaction entries should use referential * equality. */ -public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, OptimizedMarshallable { +public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Message, OptimizedMarshallable { /** */ private static final long serialVersionUID = 0L; @@ -134,7 +135,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, Optim private boolean grpLock; /** Deployment enabled flag. */ - private boolean depEnabled; + private boolean depEnabled = true; /** Expiry policy. */ private ExpiryPolicy expiryPlc; @@ -755,6 +756,182 @@ public void expiry(@Nullable ExpiryPolicy expiryPlc) { return expiryPlc; } + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("key", key)) + return false; + + writer.incrementState(); + case 1: + if (!writer.writeInt("cacheId", cacheId)) + return false; + + writer.incrementState(); + case 2: + if (!writer.writeMessage("val", val)) + return false; + + writer.incrementState(); + case 3: + if (!writer.writeLong("ttl", ttl)) + return false; + + writer.incrementState(); + case 4: + if (!writer.writeMessage("conflictVer", conflictVer)) + return false; + + writer.incrementState(); + case 5: + if (!writer.writeBoolean("grpLock", grpLock)) + return false; + + writer.incrementState(); + case 6: + if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) + return false; + + writer.incrementState(); + case 7: + if (!writer.writeByteArray("filterBytes", filterBytes)) + return false; + + writer.incrementState(); + case 8: + if (!(writer.writeLong("conflictExpireTime", conflictExpireTime))) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + key = reader.readMessage("key"); + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 1: + cacheId = reader.readInt("cacheId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 2: + val = reader.readMessage("val"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 3: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 4: + conflictVer = reader.readMessage("conflictVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 5: + grpLock = reader.readBoolean("grpLock"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 6: + transformClosBytes = reader.readByteArray("transformClosBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 7: + filterBytes = reader.readByteArray("filterBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 8: + conflictExpireTime = reader.readLong("conflictExpireTime"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 97; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 9; + } + + /** {@inheritDoc} */ + @Override public Object ggClassId() { + return GG_CLASS_ID; + } + + /** {@inheritDoc} */ + @Override public Class deployClass() { + ClassLoader clsLdr = getClass().getClassLoader(); + + CacheObject val = value(); + + // First of all check classes that may be loaded by class loader other than application one. + return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ? + key.getClass() : val != null ? val.getClass() : getClass(); + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + return deployClass().getClassLoader(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion()); + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeBoolean(depEnabled); @@ -822,36 +999,10 @@ public void expiry(@Nullable ExpiryPolicy expiryPlc) { expiryPlc = (ExpiryPolicy)in.readObject(); } - /** {@inheritDoc} */ - @Override public Object ggClassId() { - return GG_CLASS_ID; - } - - /** {@inheritDoc} */ - @Override public Class deployClass() { - ClassLoader clsLdr = getClass().getClassLoader(); - - CacheObject val = value(); - - // First of all check classes that may be loaded by class loader other than application one. - return key != null && !clsLdr.equals(key.getClass().getClassLoader()) ? - key.getClass() : val != null ? val.getClass() : getClass(); - } - - /** {@inheritDoc} */ - @Override public ClassLoader classLoader() { - return deployClass().getClassLoader(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion()); - } - /** * Auxiliary class to hold value, value-has-been-set flag, value update operation, value bytes. */ - private static class TxEntryValueHolder { + public static class TxEntryValueHolder implements Message { /** */ @GridToStringInclude private CacheObject val; @@ -948,6 +1099,7 @@ public void marshal(GridCacheSharedContext sharedCtx, GridCacheContext sharedCtx, GridCacheContext ctx, ClassLoader ldr, boolean depEnabled) throws IgniteCheckedException { if (val != null) val.finishUnmarshal(ctx, ldr); + // TODO IGNITE-51. // if (valBytes != null && val == null && (ctx.isUnmarshalValues() || op == TRANSFORM || depEnabled)) // val = ctx.marshaller().unmarshal(valBytes, ldr); @@ -1039,5 +1192,100 @@ public void readFrom(ObjectInput in) throws IOException, ClassNotFoundException @Override public String toString() { return "[op=" + op +", val=" + val + ']'; } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeBoolean("hasWriteVal", hasWriteVal)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeBoolean("hasReadVal", hasReadVal)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("op", op.ordinal())) + return false; + + writer.incrementState(); + case 3: + if (!writer.writeMessage("cacheObject", val)) + return false; + + writer.incrementState(); + + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + hasWriteVal = reader.readBoolean("hasWriteVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + hasReadVal = reader.readBoolean("hasReadVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + case 2: + op = GridCacheOperation.fromOrdinal(reader.readInt("op")); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + val = reader.readMessage("cacheObject"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 98; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } } }