Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Mar 4, 2015
1 parent bf2b5ac commit 4c7f456
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 250 deletions.
Expand Up @@ -57,6 +57,7 @@ protected boolean needCopy(GridCacheContext ctx) {
assert valBytes != null;

out.writeBoolean(byteArray());

U.writeByteArray(out, valBytes);
}

Expand Down
Expand Up @@ -59,15 +59,15 @@ public CacheObjectImpl(Object val, byte[] valBytes) {
if (byteArray())
return (T)Arrays.copyOf(bytes, bytes.length);
else
return ctx.marshaller().unmarshal(valBytes, U.gridClassLoader());
return (T)ctx.portable().unmarshal(ctx.cacheObjectContext(), valBytes, U.gridClassLoader());
}

if (val != null)
return (T)val;

assert valBytes != null;

val = ctx.marshaller().unmarshal(valBytes, U.gridClassLoader());
val = ctx.portable().unmarshal(ctx.cacheObjectContext(), valBytes, U.gridClassLoader());
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to unmarshal object.", e);
Expand All @@ -87,23 +87,23 @@ public CacheObjectImpl(Object val, byte[] valBytes) {
return (byte[])val;

if (valBytes == null)
valBytes = CU.marshal(ctx.shared(), val);
valBytes = ctx.portable().marshal(ctx.cacheObjectContext(), val);

return valBytes;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException {
if (valBytes == null && !byteArray())
valBytes = CU.marshal(ctx.kernalContext().cache().context(), val);
valBytes = ctx.kernalContext().portable().marshal(ctx, val);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
assert val != null || valBytes != null;

if (val == null && ctx.isUnmarshalValues())
val = ctx.marshaller().unmarshal(valBytes, ldr);
val = ctx.portable().unmarshal(ctx.cacheObjectContext(), valBytes, ldr);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -1780,6 +1780,7 @@ public CacheObjectContext cacheObjectContext() {

/**
* @param obj Object.
* @param bytes Optional value bytes.
* @return Cache object.
*/
@Nullable public CacheObject toCacheObject(@Nullable Object obj, byte[] bytes) {
Expand All @@ -1791,7 +1792,7 @@ public CacheObjectContext cacheObjectContext() {
* @return Cache object.
*/
public KeyCacheObject toCacheKeyObject(Object obj) {
return portable().toCacheKeyObject(cacheObjCtx, obj);
return portable().toCacheKeyObject(cacheObjCtx, obj, null);
}

/**
Expand All @@ -1803,17 +1804,16 @@ public KeyCacheObject toCacheKeyObject(Object obj) {
*/
public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes, boolean transferOnly)
throws IgniteCheckedException {
// TODO IGNITE-51 move to processor.
assert obj != null || bytes != null;

if (obj == null) {
if (transferOnly)
return new KeyCacheObjectTransferImpl(bytes);

obj = marshaller().unmarshal(bytes, deploy().globalLoader());
obj = ctx.portable().unmarshal(cacheObjCtx, bytes, deploy().globalLoader());
}

return new KeyCacheObjectImpl(obj, bytes);
return ctx.portable().toCacheKeyObject(cacheObjCtx, obj, bytes);
}

/**
Expand All @@ -1826,7 +1826,6 @@ public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes, boolean transfe
@Nullable public CacheObject unswapCacheObject(byte[] bytes, boolean valIsByteArr, @Nullable IgniteUuid clsLdrId)
throws IgniteCheckedException
{
// TODO IGNITE-51 move to processor.
if (valIsByteArr)
return new CacheObjectImpl(bytes, null);

Expand All @@ -1835,12 +1834,9 @@ public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes, boolean transfe
if (ldr == null)
return null;

return new CacheObjectImpl(marshaller().unmarshal(bytes, ldr), bytes);
return new CacheObjectImpl(portable().unmarshal(cacheObjCtx, bytes, ldr), bytes);
}

/** */
private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe();

/**
* @param valPtr Value pointer.
* @param tmp If {@code true} can return temporary instance which is valid while entry lock is held.
Expand All @@ -1849,33 +1845,9 @@ public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes, boolean transfe
*/
public CacheObject fromOffheap(long valPtr, boolean tmp) throws IgniteCheckedException {
assert config().getMemoryMode() == OFFHEAP_TIERED || config().getMemoryMode() == OFFHEAP_VALUES;
assert valPtr != 0;

// TODO IGNITE-51.
if (portableEnabled())
return (CacheObject)portable().unmarshal(valPtr, !tmp);

long ptr = valPtr;

int size = UNSAFE.getInt(ptr);

ptr += 4;

boolean plainByteArr = UNSAFE.getByte(ptr++) == 1;

byte[] bytes = U.copyMemory(ptr, size);

if (plainByteArr)
return new CacheObjectImpl(bytes, null);

if (offheapTiered()) {
IgniteUuid valClsLdrId = U.readGridUuid(ptr + size);

ClassLoader ldr = valClsLdrId != null ? deploy().getClassLoader(valClsLdrId) : deploy().localLoader();

return new CacheObjectImpl(marshaller().unmarshal(bytes, ldr), bytes);
}
else
return new CacheObjectImpl(marshaller().unmarshal(bytes, U.gridClassLoader()), bytes);
return ctx.portable().toCacheObject(this, valPtr, tmp);
}

/**
Expand All @@ -1885,6 +1857,7 @@ public CacheObject fromOffheap(long valPtr, boolean tmp) throws IgniteCheckedExc
* @param skipVals Skip values flag.
* @param keepCacheObjects Keep cache objects flag.
* @param deserializePortable Deserialize portable flag.
* @param cpy Copy flag.
*/
@SuppressWarnings("unchecked")
public <K1, V1> void addResult(Map<K1, V1> map,
Expand Down
Expand Up @@ -149,7 +149,7 @@ protected GridCacheMapEntry(GridCacheContext<?, ?> cctx, KeyCacheObject key, int
GridCacheMapEntry next, long ttl, int hdrId) {
log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class);

key = (KeyCacheObject)cctx.kernalContext().portable().detachPortable(key, cctx);
key = (KeyCacheObject)cctx.kernalContext().portable().prepareForCache(key, cctx);

assert key != null;

Expand All @@ -159,7 +159,7 @@ protected GridCacheMapEntry(GridCacheContext<?, ?> cctx, KeyCacheObject key, int

ttlAndExpireTimeExtras(ttl, CU.toExpireTime(ttl));

val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

synchronized (this) {
value(val);
Expand Down Expand Up @@ -499,7 +499,7 @@ public boolean isStartVersion() {
if (delta >= 0) {
CacheObject val = e.value();

val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

// Set unswapped value.
update(val, e.expireTime(), e.ttl(), e.version());
Expand Down Expand Up @@ -846,7 +846,7 @@ else if (tx.dht()) {
if (startVer.equals(ver)) {
if (ret != null) {
// Detach value before index update.
ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret, cctx);
ret = (CacheObject)cctx.kernalContext().portable().prepareForCache(ret, cctx);

GridCacheVersion nextVer = nextVersion();

Expand Down Expand Up @@ -929,7 +929,7 @@ else if (tx.dht()) {
long expTime = CU.toExpireTime(ttl);

// Detach value before index update.
ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret, cctx);
ret = (CacheObject)cctx.kernalContext().portable().prepareForCache(ret, cctx);

// Update indexes.
if (ret != null) {
Expand Down Expand Up @@ -1066,7 +1066,7 @@ else if (interceptorVal != val0)
assert expireTime >= 0 : expireTime;

// Detach value before index update.
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
Expand Down Expand Up @@ -1378,7 +1378,7 @@ else if (ttl == CU.TTL_NOT_CHANGED)
}

// Detach value before index update.
old = (CacheObject)cctx.kernalContext().portable().detachPortable(old, cctx);
old = (CacheObject)cctx.kernalContext().portable().prepareForCache(old, cctx);

if (old != null)
updateIndex(old, expireTime, ver, null);
Expand Down Expand Up @@ -1514,7 +1514,7 @@ else if (ttl != CU.TTL_ZERO)
// Try write-through.
if (op == GridCacheOperation.UPDATE) {
// Detach value before index update.
updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated, cctx);
updated = (CacheObject)cctx.kernalContext().portable().prepareForCache(updated, cctx);

if (writeThrough)
// Must persist inside synchronization in non-tx mode.
Expand Down Expand Up @@ -1803,7 +1803,7 @@ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer) <= 0 :
readThrough = true;

// Detach value before index update.
oldVal = (CacheObject)cctx.kernalContext().portable().detachPortable(oldVal, cctx);
oldVal = (CacheObject)cctx.kernalContext().portable().prepareForCache(oldVal, cctx);

// Calculate initial TTL and expire time.
long initTtl;
Expand Down Expand Up @@ -2036,7 +2036,7 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this
// Do not change size.
}

updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated, cctx);
updated = (CacheObject)cctx.kernalContext().portable().prepareForCache(updated, cctx);

// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
Expand Down Expand Up @@ -2957,7 +2957,7 @@ void next(int segId, @Nullable GridCacheMapEntry next) {
// in load methods without actually holding entry lock.
long expireTime = expireTimeExtras();

val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

updateIndex(val, expireTime, nextVer, old);

Expand Down Expand Up @@ -3237,7 +3237,7 @@ protected boolean hasValueUnlocked() {
if (isNew() || (!preload && deletedUnlocked())) {
long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;

val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

if (val != null)
updateIndex(val, expTime, ver, null);
Expand Down Expand Up @@ -3290,7 +3290,7 @@ else if (deletedUnlocked())
CacheObject val = unswapped.value();

if (cctx.portableEnabled()) {
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

if (cctx.offheapTiered() && !unswapped.valueIsByteArray())
unswapped.valueBytes(cctx.convertPortableBytes(unswapped.valueBytes()));
Expand Down Expand Up @@ -3337,7 +3337,7 @@ else if (deletedUnlocked())
long expTime = CU.toExpireTime(ttl);

// Detach value before index update.
val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx);
val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx);

if (val != null) {
updateIndex(val, expTime, newVer, old);
Expand Down Expand Up @@ -4303,61 +4303,6 @@ private int extrasSize() {
return extras != null ? extras.size() : 0;
}

/**
* @return Value bytes read from offheap.
* @throws IgniteCheckedException If failed.
*/
private GridCacheValueBytes offheapValueBytes() throws IgniteCheckedException {
assert cctx.offheapTiered() && valPtr != 0;

long ptr = valPtr;

boolean plainByteArr = UNSAFE.getByte(ptr++) != 0;

if (plainByteArr || !cctx.portableEnabled()) {
int size = UNSAFE.getInt(ptr);

byte[] bytes = U.copyMemory(ptr + 4, size);

return plainByteArr ? GridCacheValueBytes.plain(bytes) : GridCacheValueBytes.marshaled(bytes);
}

assert cctx.portableEnabled();

return GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), cctx.portable().unmarshal(valPtr, true)));
}

/**
* @param tmp If {@code true} can return temporary object.
* @return Unmarshalled value.
* @throws IgniteCheckedException If unmarshalling failed.
*/
private CacheObject unmarshalOffheap(boolean tmp) throws IgniteCheckedException {
assert cctx.offheapTiered() && valPtr != 0;

if (cctx.portableEnabled())
return (CacheObject)cctx.portable().unmarshal(valPtr, !tmp);

long ptr = valPtr;

boolean plainByteArr = UNSAFE.getByte(ptr++) != 0;

int size = UNSAFE.getInt(ptr);

byte[] res = U.copyMemory(ptr + 4, size);

// TODO IGNITE-51.
// if (plainByteArr)
// return (V)res;

IgniteUuid valClsLdrId = U.readGridUuid(ptr + 4 + size);

ClassLoader ldr = valClsLdrId != null ? cctx.deploy().getClassLoader(valClsLdrId) :
cctx.deploy().localLoader();

return cctx.marshaller().unmarshal(res, ldr);
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
// Identity comparison left on purpose.
Expand Down
Expand Up @@ -85,7 +85,9 @@ public KeyCacheObjectImpl(Object val, byte[] valBytes) {

if (cpy) {
try {
return (T)ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader());
return (T)ctx.portable().unmarshal(ctx.cacheObjectContext(),
valBytes,
ctx.deploy().globalLoader());
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to unmarshal object.", e);
Expand Down
Expand Up @@ -51,7 +51,7 @@ public UserCacheObjectImpl() {
else {
try {
if (valBytes == null)
valBytes = CU.marshal(ctx.shared(), val);
valBytes = ctx.portable().marshal(ctx.cacheObjectContext(), val);

return new CacheObjectImpl(null, valBytes);
}
Expand Down

0 comments on commit 4c7f456

Please sign in to comment.