Skip to content

Commit

Permalink
#ignite-51: IgniteTxEntry implements Message.
Browse files Browse the repository at this point in the history
  • Loading branch information
ivasilinets committed Mar 3, 2015
1 parent 4ba6e20 commit ac04da2
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 156 deletions.
Expand Up @@ -540,6 +540,10 @@ public GridIoMessageFactory(MessageFactory[] ext) {
case 98: case 98:
msg = new IgniteTxEntry.TxEntryValueHolder(); msg = new IgniteTxEntry.TxEntryValueHolder();


break;
case 99:
msg = new GridNearTxPrepareResponse.OwnedValue();

break; break;


default: default:
Expand Down
Expand Up @@ -61,17 +61,16 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
@GridDirectTransient @GridDirectTransient
private Collection<IgniteTxEntry> nearWrites; private Collection<IgniteTxEntry> nearWrites;


/** Serialized near writes. */
@GridDirectCollection(byte[].class)
private Collection<byte[]> nearWritesBytes;

/** Owned versions by key. */ /** Owned versions by key. */
@GridToStringInclude @GridToStringInclude
@GridDirectTransient @GridDirectTransient
private Map<IgniteTxKey, GridCacheVersion> owned; private Map<IgniteTxKey, GridCacheVersion> owned;


/** Owned versions bytes. */ /** Owned keys. */
private byte[] ownedBytes; private Collection<IgniteTxKey> ownedKeys;

/** Owned values. */
private Collection<GridCacheVersion> ownedVals;


/** Near transaction ID. */ /** Near transaction ID. */
private GridCacheVersion nearXidVer; private GridCacheVersion nearXidVer;
Expand Down Expand Up @@ -272,40 +271,49 @@ public Map<IgniteTxKey, GridCacheVersion> owned() {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx); super.prepareMarshal(ctx);


if (ownedBytes == null && owned != null) { if (owned != null) {
ownedBytes = CU.marshal(ctx, owned); ownedKeys = owned.keySet();

ownedVals = owned.values();

for (IgniteTxKey key: ownedKeys)
key.prepareMarshal(ctx.cacheContext(key.cacheId()));


if (ctx.deploymentEnabled()) { if (ctx.deploymentEnabled()) {
for (IgniteTxKey k : owned.keySet()) for (IgniteTxKey k : owned.keySet())
prepareObject(k, ctx); prepareObject(k, ctx);
} }
} }


if (nearWrites != null) { if (nearWrites != null)
marshalTx(nearWrites, ctx); marshalTx(nearWrites, ctx);

nearWritesBytes = new ArrayList<>(nearWrites.size());

for (IgniteTxEntry e : nearWrites)
nearWritesBytes.add(ctx.marshaller().marshal(e));
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr); super.finishUnmarshal(ctx, ldr);


if (ownedBytes != null && owned == null) if (ownedKeys != null && owned == null) {
owned = ctx.marshaller().unmarshal(ownedBytes, ldr); owned = new HashMap<>();

assert ownedKeys.size() == ownedVals.size();

Iterator<IgniteTxKey> keyIter = ownedKeys.iterator();


if (nearWritesBytes != null) { Iterator<GridCacheVersion> valIter = ownedVals.iterator();
nearWrites = new ArrayList<>(nearWritesBytes.size());


for (byte[] arr : nearWritesBytes) while (keyIter.hasNext()) {
nearWrites.add(ctx.marshaller().<IgniteTxEntry>unmarshal(arr, ldr)); IgniteTxKey key = keyIter.next();

key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr);

owned.put(key, valIter.next());
}


unmarshalTx(nearWrites, true, ctx, ldr);
} }

unmarshalTx(nearWrites, true, ctx, ldr);

} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -359,7 +367,7 @@ public Map<IgniteTxKey, GridCacheVersion> owned() {
writer.incrementState(); writer.incrementState();


case 29: case 29:
if (!writer.writeCollection("nearWritesBytes", nearWritesBytes, MessageCollectionItemType.BYTE_ARR)) if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false; return false;


writer.incrementState(); writer.incrementState();
Expand All @@ -371,30 +379,36 @@ public Map<IgniteTxKey, GridCacheVersion> owned() {
writer.incrementState(); writer.incrementState();


case 31: case 31:
if (!writer.writeByteArray("ownedBytes", ownedBytes)) if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false; return false;


writer.incrementState(); writer.incrementState();


case 32: case 32:
if (!writer.writeBitSet("preloadKeys", preloadKeys)) if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false; return false;


writer.incrementState(); writer.incrementState();


case 33: case 33:
if (!writer.writeUuid("subjId", subjId)) if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false; return false;


writer.incrementState(); writer.incrementState();


case 34: case 34:
if (!writer.writeInt("taskNameHash", taskNameHash)) if (!writer.writeUuid("subjId", subjId))
return false; return false;


writer.incrementState(); writer.incrementState();


case 35: case 35:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;

writer.incrementState();

case 36:
if (!writer.writeLong("topVer", topVer)) if (!writer.writeLong("topVer", topVer))
return false; return false;


Expand Down Expand Up @@ -457,7 +471,7 @@ public Map<IgniteTxKey, GridCacheVersion> owned() {
reader.incrementState(); reader.incrementState();


case 29: case 29:
nearWritesBytes = reader.readCollection("nearWritesBytes", MessageCollectionItemType.BYTE_ARR); nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);


if (!reader.isLastRead()) if (!reader.isLastRead())
return false; return false;
Expand All @@ -473,38 +487,46 @@ public Map<IgniteTxKey, GridCacheVersion> owned() {
reader.incrementState(); reader.incrementState();


case 31: case 31:
ownedBytes = reader.readByteArray("ownedBytes"); ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);


if (!reader.isLastRead()) if (!reader.isLastRead())
return false; return false;


reader.incrementState(); reader.incrementState();


case 32: case 32:
preloadKeys = reader.readBitSet("preloadKeys"); ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);


if (!reader.isLastRead()) if (!reader.isLastRead())
return false; return false;


reader.incrementState(); reader.incrementState();


case 33: case 33:
subjId = reader.readUuid("subjId"); preloadKeys = reader.readBitSet("preloadKeys");


if (!reader.isLastRead()) if (!reader.isLastRead())
return false; return false;


reader.incrementState(); reader.incrementState();


case 34: case 34:
taskNameHash = reader.readInt("taskNameHash"); subjId = reader.readUuid("subjId");


if (!reader.isLastRead()) if (!reader.isLastRead())
return false; return false;


reader.incrementState(); reader.incrementState();


case 35: case 35:
taskNameHash = reader.readInt("taskNameHash");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 36:
topVer = reader.readLong("topVer"); topVer = reader.readLong("topVer");


if (!reader.isLastRead()) if (!reader.isLastRead())
Expand All @@ -524,6 +546,6 @@ public Map<IgniteTxKey, GridCacheVersion> owned() {


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public byte fieldsCount() { @Override public byte fieldsCount() {
return 36; return 37;
} }
} }
Expand Up @@ -33,7 +33,6 @@
import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.transactions.*; import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.typedef.internal.*;
Expand Down Expand Up @@ -935,7 +934,7 @@ void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
else { else {
assert F.isEmpty(res.invalidPartitions()); assert F.isEmpty(res.invalidPartitions());


for (Map.Entry<IgniteTxKey, IgniteBiTuple<GridCacheVersion, CacheObject>> entry : res.ownedValues().entrySet()) { for (Map.Entry<IgniteTxKey, GridNearTxPrepareResponse.OwnedValue> entry : res.ownedValues().entrySet()) {
IgniteTxEntry txEntry = tx.entry(entry.getKey()); IgniteTxEntry txEntry = tx.entry(entry.getKey());


assert txEntry != null; assert txEntry != null;
Expand All @@ -947,17 +946,17 @@ void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (cacheCtx.isNear()) { if (cacheCtx.isNear()) {
GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached(); GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached();


IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue(); GridNearTxPrepareResponse.OwnedValue tup = entry.getValue();


nearEntry.resetFromPrimary(tup.get2(), tx.xidVersion(), nearEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion(),
tup.get1(), m.node().id()); tup.version(), m.node().id());
} }
else if (txEntry.cached().detached()) { else if (txEntry.cached().detached()) {
GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();


IgniteBiTuple<GridCacheVersion, CacheObject> tup = entry.getValue(); GridNearTxPrepareResponse.OwnedValue tup = entry.getValue();


detachedEntry.resetFromPrimary(tup.get2(), tx.xidVersion()); detachedEntry.resetFromPrimary(tup.cacheObject(), tx.xidVersion());
} }


break; break;
Expand Down

0 comments on commit ac04da2

Please sign in to comment.