Skip to content

Commit

Permalink
#ignite-51: IgniteTxEntry implements Message.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilinets committed Mar 3, 2015
1 parent 99b215a commit 4ba6e20
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 131 deletions.
Expand Up @@ -532,6 +532,14 @@ public GridIoMessageFactory(MessageFactory[] ext) {
case 96:
msg = new CacheContinuousQueryEntry();

break;
case 97:
msg = new IgniteTxEntry();

break;
case 98:
msg = new IgniteTxEntry.TxEntryValueHolder();

break;

default:
Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.transactions.*;
Expand Down Expand Up @@ -70,19 +69,11 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@GridDirectTransient
private Collection<IgniteTxEntry> reads;

/** */
@GridDirectCollection(byte[].class)
private Collection<byte[]> readsBytes;

/** Transaction write entries. */
@GridToStringInclude
@GridDirectTransient
private Collection<IgniteTxEntry> writes;

/** */
@GridDirectCollection(byte[].class)
private Collection<byte[]> writesBytes;

/** DHT versions to verify. */
@GridToStringInclude
@GridDirectTransient
Expand Down Expand Up @@ -301,24 +292,12 @@ public boolean onePhaseCommit() {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);

if (writes != null) {
if (writes != null)
marshalTx(writes, ctx);

writesBytes = new ArrayList<>(writes.size());

for (IgniteTxEntry e : writes)
writesBytes.add(ctx.marshaller().marshal(e));
}

if (reads != null) {
if (reads != null)
marshalTx(reads, ctx);

readsBytes = new ArrayList<>(reads.size());

for (IgniteTxEntry e : reads)
readsBytes.add(ctx.marshaller().marshal(e));
}

if (grpLockKey != null && grpLockKeyBytes == null)
grpLockKeyBytes = ctx.marshaller().marshal(grpLockKey);

Expand All @@ -341,23 +320,9 @@ public boolean onePhaseCommit() {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);

if (writesBytes != null) {
writes = new ArrayList<>(writesBytes.size());

for (byte[] arr : writesBytes)
writes.add(ctx.marshaller().<IgniteTxEntry>unmarshal(arr, ldr));

unmarshalTx(writes, false, ctx, ldr);
}

if (readsBytes != null) {
reads = new ArrayList<>(readsBytes.size());
unmarshalTx(writes, false, ctx, ldr);

for (byte[] arr : readsBytes)
reads.add(ctx.marshaller().<IgniteTxEntry>unmarshal(arr, ldr));

unmarshalTx(reads, false, ctx, ldr);
}
unmarshalTx(reads, false, ctx, ldr);

if (grpLockKeyBytes != null && grpLockKey == null)
grpLockKey = ctx.marshaller().unmarshal(grpLockKeyBytes, ldr);
Expand All @@ -384,64 +349,6 @@ public boolean onePhaseCommit() {
txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
}

/**
*
* @param out Output.
* @param col Set to write.
* @throws IOException If write failed.
*/
private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry> col) throws IOException {
boolean empty = F.isEmpty(col);

if (!empty) {
out.writeInt(col.size());

for (IgniteTxEntry e : col) {
CacheObject val = e.value();
boolean hasWriteVal = e.hasWriteValue();
boolean hasReadVal = e.hasReadValue();

try {
// Don't serialize value if invalidate is set to true.
if (invalidate)
e.value(null, false, false);

out.writeObject(e);
}
finally {
// Set original value back.
e.value(val, hasWriteVal, hasReadVal);
}
}
}
else
out.writeInt(-1);
}

/**
* @param in Input.
* @return Deserialized set.
* @throws IOException If deserialization failed.
* @throws ClassNotFoundException If deserialized class could not be found.
*/
@SuppressWarnings({"unchecked"})
@Nullable private Collection<IgniteTxEntry> readCollection(ObjectInput in) throws IOException,
ClassNotFoundException {
List<IgniteTxEntry> col = null;

int size = in.readInt();

// Check null flag.
if (size != -1) {
col = new ArrayList<>(size);

for (int i = 0; i < size; i++)
col.add((IgniteTxEntry)in.readObject());
}

return col == null ? Collections.<IgniteTxEntry>emptyList() : col;
}

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
Expand Down Expand Up @@ -506,7 +413,7 @@ private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry> col) th
writer.incrementState();

case 16:
if (!writer.writeCollection("readsBytes", readsBytes, MessageCollectionItemType.BYTE_ARR))
if (!writer.writeCollection("reads", reads, MessageCollectionItemType.MSG))
return false;

writer.incrementState();
Expand Down Expand Up @@ -548,7 +455,7 @@ private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry> col) th
writer.incrementState();

case 23:
if (!writer.writeCollection("writesBytes", writesBytes, MessageCollectionItemType.BYTE_ARR))
if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG))
return false;

writer.incrementState();
Expand Down Expand Up @@ -642,7 +549,7 @@ private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry> col) th
reader.incrementState();

case 16:
readsBytes = reader.readCollection("readsBytes", MessageCollectionItemType.BYTE_ARR);
reads = reader.readCollection("reads", MessageCollectionItemType.MSG);

if (!reader.isLastRead())
return false;
Expand Down Expand Up @@ -698,7 +605,7 @@ private void writeCollection(ObjectOutput out, Collection<IgniteTxEntry> col) th
reader.incrementState();

case 23:
writesBytes = reader.readCollection("writesBytes", MessageCollectionItemType.BYTE_ARR);
writes = reader.readCollection("writes", MessageCollectionItemType.MSG);

if (!reader.isLastRead())
return false;
Expand Down

0 comments on commit 4ba6e20

Please sign in to comment.