Skip to content

Commit

Permalink
Page memory integration WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharuk committed Feb 3, 2016
1 parent ee0982a commit ec348ed
Show file tree
Hide file tree
Showing 24 changed files with 307 additions and 77 deletions.
Expand Up @@ -112,7 +112,7 @@ public BinaryEnumObjectImpl(BinaryContext ctx, int typeId, @Nullable String clsN
@Override public <T> T deserialize() throws BinaryObjectException { @Override public <T> T deserialize() throws BinaryObjectException {
Class cls = BinaryUtils.resolveClass(ctx, typeId, clsName, ctx.configuration().getClassLoader(), true); Class cls = BinaryUtils.resolveClass(ctx, typeId, clsName, ctx.configuration().getClassLoader(), true);


return BinaryEnumCache.get(cls, ord); return (T)BinaryEnumCache.get(cls, ord);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -210,6 +210,11 @@ public BinaryEnumObjectImpl(BinaryContext ctx, int typeId, @Nullable String clsN
throw new UnsupportedOperationException("TODO implement."); throw new UnsupportedOperationException("TODO implement.");
} }


/** {@inheritDoc} */
@Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException {
throw new UnsupportedOperationException("TODO implement.");
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public byte cacheObjectType() { @Override public byte cacheObjectType() {
return TYPE_BINARY; return TYPE_BINARY;
Expand Down
Expand Up @@ -133,6 +133,7 @@ public BinaryObjectImpl(BinaryContext ctx, byte[] arr, int start) {
return arr0; return arr0;
} }


/** {@inheritDoc} */
@Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException {
int len = length(); int len = length();


Expand All @@ -146,6 +147,11 @@ public BinaryObjectImpl(BinaryContext ctx, byte[] arr, int start) {
return true; return true;
} }


/** {@inheritDoc} */
@Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException {
return length() + 5;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheObject prepareForCache(CacheObjectContext ctx) { @Override public CacheObject prepareForCache(CacheObjectContext ctx) {
if (detached()) if (detached())
Expand Down
Expand Up @@ -128,6 +128,11 @@ public BinaryObject heapCopy() {
throw new UnsupportedOperationException("TODO implement"); throw new UnsupportedOperationException("TODO implement");
} }


/** {@inheritDoc} */
@Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException {
throw new UnsupportedOperationException("TODO implement");
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public long offheapAddress() { @Override public long offheapAddress() {
return ptr; return ptr;
Expand Down
Expand Up @@ -49,6 +49,20 @@ public interface CacheObject extends Message {
*/ */
public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException; public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException;


/**
* @param ctx Cache object context.
* @return Size required to store this value object.
* @throws IgniteCheckedException If failed.
*/
public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException;

/**
* @param buf Buffer to write value to.
* @param ctx Cache object context.
* @return {@code True} if value was successfully written, {@code false} if there was not enough space in the
* buffer.
* @throws IgniteCheckedException If failed.
*/
public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException; public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException;


/** /**
Expand Down
Expand Up @@ -84,6 +84,14 @@ protected boolean needCopy(CacheObjectContext ctx) {
return true; return true;
} }


/** {@inheritDoc} */
@Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException {
if (valBytes == null)
valueBytes(ctx);

return valBytes.length + 5;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf); reader.setBuffer(buf);
Expand Down
Expand Up @@ -86,6 +86,10 @@ public CacheObjectByteArrayImpl(byte[] val) {
return true; return true;
} }


@Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException {
return val.length + 5;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public byte cacheObjectType() { @Override public byte cacheObjectType() {
return TYPE_BYTE_ARR; return TYPE_BYTE_ARR;
Expand Down
Expand Up @@ -1861,7 +1861,7 @@ public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable f


final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough();


final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled(); final boolean needEntry = storeEnabled || ctx.isSwapOrOffheapEnabled() || ctx.isDatabaseEnabled();


Map<KeyCacheObject, GridCacheVersion> misses = null; Map<KeyCacheObject, GridCacheVersion> misses = null;


Expand All @@ -1878,7 +1878,7 @@ public final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable f


try { try {
T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null, T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(null,
ctx.isSwapOrOffheapEnabled(), ctx.isSwapOrOffheapEnabled() || ctx.isDatabaseEnabled(),
/*unmarshal*/true, /*unmarshal*/true,
/*update-metrics*/!skipVals, /*update-metrics*/!skipVals,
/*event*/!skipVals, /*event*/!skipVals,
Expand Down
Expand Up @@ -1428,6 +1428,13 @@ public boolean isOffHeapEnabled() {
return swapMgr.offHeapEnabled(); return swapMgr.offHeapEnabled();
} }


/**
* @return If database is enabled.
*/
public boolean isDatabaseEnabled() {
return storeMgr.isDatabaseEnabled();
}

/** /**
* @return {@code True} if store read-through mode is enabled. * @return {@code True} if store read-through mode is enabled.
*/ */
Expand Down
Expand Up @@ -791,7 +791,7 @@ public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) {
U.error(log, "Failed to evict entry from cache: " + e, ex); U.error(log, "Failed to evict entry from cache: " + e, ex);
} }


if (!cctx.isNear() && memoryMode == OFFHEAP_TIERED) { if (!cctx.isNear() && (memoryMode == OFFHEAP_TIERED || cctx.isDatabaseEnabled())) {
try { try {
evict0(cctx.cache(), e, cctx.versions().next(), null, false); evict0(cctx.cache(), e, cctx.versions().next(), null, false);
} }
Expand Down
Expand Up @@ -475,62 +475,78 @@ public boolean isStartVersion() {
throws IgniteCheckedException, GridCacheEntryRemovedException { throws IgniteCheckedException, GridCacheEntryRemovedException {
boolean swapEnabled = cctx.swap().swapEnabled(); boolean swapEnabled = cctx.swap().swapEnabled();


if (!swapEnabled && !cctx.isOffHeapEnabled()) if (!swapEnabled && !cctx.isOffHeapEnabled() && !cctx.isDatabaseEnabled())
return null; return null;


synchronized (this) { synchronized (this) {
checkObsolete(); checkObsolete();


if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) { if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
GridCacheSwapEntry e; if (cctx.isDatabaseEnabled()) {
IgniteBiTuple<CacheObject, GridCacheVersion> read = cctx.queries().read(key);


if (cctx.offheapTiered()) { flags |= IS_UNSWAPPED_MASK;
e = cctx.swap().readOffheapPointer(this);


if (e != null) { if (read != null) {
if (e.offheapPointer() > 0) { CacheObject idxVal = read.get1();
offHeapPointer(e.offheapPointer());


flags |= IS_OFFHEAP_PTR_MASK; // Set unswapped value.
update(idxVal, 0, 0, read.get2());


if (needVal) { return idxVal;
CacheObject val = cctx.fromOffheap(offHeapPointer(), false); }
}
else {
GridCacheSwapEntry e;


e.value(val); if (cctx.offheapTiered()) {
e = cctx.swap().readOffheapPointer(this);

if (e != null) {
if (e.offheapPointer() > 0) {
offHeapPointer(e.offheapPointer());

flags |= IS_OFFHEAP_PTR_MASK;

if (needVal) {
CacheObject val = cctx.fromOffheap(offHeapPointer(), false);

e.value(val);
}
} }
else // Read from swap.
offHeapPointer(0);
} }
else // Read from swap.
offHeapPointer(0);
} }
} else
else e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']'); log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');


flags |= IS_UNSWAPPED_MASK; flags |= IS_UNSWAPPED_MASK;


// If there is a value. // If there is a value.
if (e != null) { if (e != null) {
long delta = e.expireTime() == 0 ? 0 : e.expireTime() - U.currentTimeMillis(); long delta = e.expireTime() == 0 ? 0 : e.expireTime() - U.currentTimeMillis();


if (delta >= 0) { if (delta >= 0) {
CacheObject val = e.value(); CacheObject val = e.value();


val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);


// Set unswapped value. // Set unswapped value.
update(val, e.expireTime(), e.ttl(), e.version()); update(val, e.expireTime(), e.ttl(), e.version());


// Must update valPtr again since update() will reset it. // Must update valPtr again since update() will reset it.
if (cctx.offheapTiered() && e.offheapPointer() > 0) if (cctx.offheapTiered() && e.offheapPointer() > 0)
offHeapPointer(e.offheapPointer()); offHeapPointer(e.offheapPointer());


return val; return val;
}
else
clearIndex(e.value(), e.version());
} }
else
clearIndex(e.value(), e.version());
} }
} }
} }
Expand Down
Expand Up @@ -815,6 +815,14 @@ public BinaryContext binaryContext() {
return super.toCacheObject(ctx, type, bytes); return super.toCacheObject(ctx, type, bytes);
} }


/** {@inheritDoc} */
@Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException {
if (type == BinaryObjectImpl.TYPE_BINARY)
return new BinaryObjectImpl(binaryContext(), bytes, 0);

return super.toKeyCacheObject(ctx, type, bytes);
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp) @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp)
throws IgniteCheckedException { throws IgniteCheckedException {
Expand Down
Expand Up @@ -342,7 +342,7 @@ void processQueryRequest(UUID sndId, GridCacheQueryRequest req) {
* @param key Key. * @param key Key.
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void onSwap(CacheObject key) throws IgniteCheckedException { public void onSwap(KeyCacheObject key) throws IgniteCheckedException {
if (!enterBusy()) if (!enterBusy())
return; // Ignore index update when node is stopping. return; // Ignore index update when node is stopping.


Expand All @@ -361,7 +361,7 @@ public void onSwap(CacheObject key) throws IgniteCheckedException {
* @param val Value * @param val Value
* @throws IgniteCheckedException If failed. * @throws IgniteCheckedException If failed.
*/ */
public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException { public void onUnswap(KeyCacheObject key, CacheObject val) throws IgniteCheckedException {
if (!enterBusy()) if (!enterBusy())
return; // Ignore index update when node is stopping. return; // Ignore index update when node is stopping.


Expand Down Expand Up @@ -390,7 +390,7 @@ private void invalidateResultCache() {
* @param expirationTime Expiration time or 0 if never expires. * @param expirationTime Expiration time or 0 if never expires.
* @throws IgniteCheckedException In case of error. * @throws IgniteCheckedException In case of error.
*/ */
public void store(CacheObject key, CacheObject val, GridCacheVersion ver, long expirationTime) public void store(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expirationTime)
throws IgniteCheckedException { throws IgniteCheckedException {
assert key != null; assert key != null;
assert val != null; assert val != null;
Expand All @@ -412,13 +412,29 @@ public void store(CacheObject key, CacheObject val, GridCacheVersion ver, long e
} }
} }


/**
* @param key Key to read.
* @return Value tuple, if available.
*/
public IgniteBiTuple<CacheObject, GridCacheVersion> read(KeyCacheObject key) throws IgniteCheckedException {
if (!enterBusy())
return null; // Ignore index update when node is stopping.

try {
return qryProc.read(space, key);
}
finally {
leaveBusy();
}
}

/** /**
* @param key Key. * @param key Key.
* @param val Value. * @param val Value.
* @throws IgniteCheckedException Thrown in case of any errors. * @throws IgniteCheckedException Thrown in case of any errors.
*/ */
@SuppressWarnings("SimplifiableIfStatement") @SuppressWarnings("SimplifiableIfStatement")
public void remove(CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { public void remove(KeyCacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException {
assert key != null; assert key != null;


if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal)) if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal))
Expand Down
Expand Up @@ -50,6 +50,11 @@ public void initialize(@Nullable CacheStore<?, ?> cfgStore, Map<CacheStore, Thre
*/ */
public boolean configured(); public boolean configured();


/**
* @return {@code True} if database is enabled.
*/
public boolean isDatabaseEnabled();

/** /**
* @return Wrapped store. * @return Wrapped store.
*/ */
Expand Down
Expand Up @@ -254,6 +254,11 @@ private CacheStore cacheStoreWrapper(GridKernalContext ctx,
return store != null; return store != null;
} }


/** {@inheritDoc} */
@Override public boolean isDatabaseEnabled() {
return false;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheStore<?, ?> configuredStore() { @Override public CacheStore<?, ?> configuredStore() {
return cfgStore; return cfgStore;
Expand Down
Expand Up @@ -152,13 +152,28 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
*/ */
public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes); public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes);


/**
* @param ctx Cache context.
* @param type Object type.
* @param bytes Object bytes.
* @return Cache object.
*/
public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) throws IgniteCheckedException;

/** /**
* @param ctx Cache context. * @param ctx Cache context.
* @param buf Buffer to read from. * @param buf Buffer to read from.
* @return Cache object. * @return Cache object.
*/ */
public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf); public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf);


/**
* @param ctx Cache context.
* @param buf Buffer to read from.
* @return Cache object.
*/
public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, ByteBuffer buf) throws IgniteCheckedException;

/** /**
* @param ctx Context. * @param ctx Context.
* @param valPtr Value pointer. * @param valPtr Value pointer.
Expand Down

0 comments on commit ec348ed

Please sign in to comment.