Skip to content

Commit

Permalink
Ability to write cache objects to byte buffers.
Browse files Browse the repository at this point in the history
  • Loading branch information
agoncharuk committed Jan 22, 2016
1 parent 50be063 commit 26a191b
Show file tree
Hide file tree
Showing 21 changed files with 138 additions and 50 deletions.
Expand Up @@ -206,6 +206,11 @@ public BinaryEnumObjectImpl(BinaryContext ctx, int typeId, @Nullable String clsN
return ctx.marshaller().marshal(this);
}

/** {@inheritDoc} */
@Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException {
throw new UnsupportedOperationException("TODO implement.");
}

/** {@inheritDoc} */
@Override public byte cacheObjectType() {
return TYPE_BINARY;
Expand Down
Expand Up @@ -133,6 +133,19 @@ public BinaryObjectImpl(BinaryContext ctx, byte[] arr, int start) {
return arr0;
}

@Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException {
int len = length();

if (buf.remaining() < len + 5)
return false;

buf.put(cacheObjectType());
buf.putInt(len);
buf.put(arr, start, len);

return true;
}

/** {@inheritDoc} */
@Override public CacheObject prepareForCache(CacheObjectContext ctx) {
if (detached())
Expand Down
Expand Up @@ -130,6 +130,10 @@ public BinaryObject heapCopy() {
return null;
}

@Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException {
throw new UnsupportedOperationException("TODO implement");
}

/** {@inheritDoc} */
@Override public long offheapAddress() {
return ptr;
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.cache;

import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -48,6 +49,8 @@ public interface CacheObject extends Message {
*/
public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException;

public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException;

/**
* @return Object type.
*/
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -68,6 +69,18 @@ protected boolean needCopy(CacheObjectContext ctx) {
valBytes = U.readByteArray(in);
}

/** {@inheritDoc} */
@Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException {
if (buf.remaining() < valBytes.length + 5)
return false;

buf.put(cacheObjectType());
buf.putInt(valBytes.length);
buf.put(valBytes);

return true;
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);
Expand Down
Expand Up @@ -74,6 +74,18 @@ public CacheObjectByteArrayImpl(byte[] val) {
return val;
}

/** {@inheritDoc} */
@Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException {
if (buf.remaining() < val.length + 5)
return false;

buf.put(cacheObjectType());
buf.putInt(val.length);
buf.put(val);

return true;
}

/** {@inheritDoc} */
@Override public byte cacheObjectType() {
return TYPE_BYTE_ARR;
Expand Down
Expand Up @@ -530,7 +530,7 @@ public boolean isStartVersion() {
return val;
}
else
clearIndex(e.value());
clearIndex(e.value(), e.version());
}
}
}
Expand Down Expand Up @@ -788,7 +788,7 @@ private Object innerGet0(IgniteInternalTx tx,
releaseSwap();

// Previous value is guaranteed to be null
clearIndex(null);
clearIndex(null, ver);
}
else {
// Read and remove swap entry.
Expand Down Expand Up @@ -1026,7 +1026,7 @@ else if (tx.dht()) {
deletedUnlocked(false);
}
else {
clearIndex(old);
clearIndex(old, ver);

if (cctx.deferredDelete() && !isInternal() && !detached() && !deletedUnlocked())
deletedUnlocked(true);
Expand Down Expand Up @@ -1323,7 +1323,7 @@ protected Object keyValue(boolean cpy) {

// Clear indexes inside of synchronization since indexes
// can be updated without actually holding entry lock.
clearIndex(old);
clearIndex(old, ver);

boolean hadValPtr = hasOffHeapPointer();

Expand Down Expand Up @@ -1513,7 +1513,7 @@ else if (ttl == CU.TTL_NOT_CHANGED)
if (old != null)
updateIndex(old, expireTime, ver, null);
else
clearIndex(null);
clearIndex(null, ver);

update(old, expireTime, ttl, ver);
}
Expand Down Expand Up @@ -1697,7 +1697,7 @@ else if (ttl != CU.TTL_ZERO)

// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(old);
clearIndex(old, this.ver);

update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver);

Expand Down Expand Up @@ -2055,7 +2055,7 @@ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
if (oldVal != null)
updateIndex(oldVal, initExpireTime, ver, null);
else
clearIndex(null);
clearIndex(null, ver);

update(oldVal, initExpireTime, initTtl, ver);

Expand Down Expand Up @@ -2331,7 +2331,7 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this

// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(oldVal);
clearIndex(oldVal, ver);

if (hadVal) {
assert !deletedUnlocked();
Expand Down Expand Up @@ -2612,7 +2612,7 @@ protected void clearReader(UUID nodeId) throws GridCacheEntryRemovedException {
if (log.isDebugEnabled())
log.debug("Entry has been marked obsolete: " + this);

clearIndex(val);
clearIndex(val, ver);

releaseSwap();

Expand Down Expand Up @@ -2793,7 +2793,7 @@ protected final boolean markObsolete0(GridCacheVersion ver, boolean clear, GridC

releaseSwap();

clearIndex(val);
clearIndex(val, ver);

onInvalidate();
}
Expand Down Expand Up @@ -3120,7 +3120,7 @@ private boolean checkExpired() throws IgniteCheckedException {
if (delta <= 0) {
releaseSwap();

clearIndex(saveValueForIndexUnlocked());
clearIndex(saveValueForIndexUnlocked(), ver);

return true;
}
Expand Down Expand Up @@ -3577,7 +3577,7 @@ private GridCacheVersion nextVersion() {
}
}

clearIndex(expiredVal);
clearIndex(expiredVal, ver);

releaseSwap();

Expand Down Expand Up @@ -3750,14 +3750,14 @@ protected void updateIndex(@Nullable CacheObject val,
* @param prevVal Previous value (if needed for index update).
* @throws IgniteCheckedException If failed.
*/
protected void clearIndex(CacheObject prevVal) throws IgniteCheckedException {
protected void clearIndex(CacheObject prevVal, GridCacheVersion prevVer) throws IgniteCheckedException {
assert Thread.holdsLock(this);

try {
GridCacheQueryManager<?, ?> qryMgr = cctx.queries();

if (qryMgr.enabled())
qryMgr.remove(key(), prevVal);
qryMgr.remove(key(), prevVal, prevVer);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
Expand Down Expand Up @@ -3916,7 +3916,7 @@ private synchronized <K, V> CacheEntryImplEx<K, V> wrapVersionedWithValue() {
}
}
else
clearIndex(prev);
clearIndex(prev, ver);

// Nullify value after swap.
value(null);
Expand Down Expand Up @@ -3970,7 +3970,7 @@ private synchronized <K, V> CacheEntryImplEx<K, V> wrapVersionedWithValue() {
}
}
else
clearIndex(prevVal);
clearIndex(prevVal, ver);

// Nullify value after swap.
value(null);
Expand Down
Expand Up @@ -2870,7 +2870,9 @@ public void onEvictFromSwap(String spaceName, byte[] keyBytes, byte[] valBytes)

assert val != null;

qryMgr.remove(key, val);
// TODO sql-store

qryMgr.remove(key, val, null);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to unmarshal key evicted from swap [swapSpaceName=" + spaceName + ']', e);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
Expand Down
Expand Up @@ -577,7 +577,7 @@ public boolean clearInternal(
if (log.isDebugEnabled())
log.debug("Entry has been marked obsolete: " + this);

clearIndex(prev);
clearIndex(prev, ver);

// Give to GC.
update(null, 0L, 0L, ver);
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void resetFromPrimary(CacheObject val, GridCacheVersion ver) {
}

/** {@inheritDoc} */
@Override protected void clearIndex(CacheObject val) throws IgniteCheckedException {
@Override protected void clearIndex(CacheObject val, GridCacheVersion ver) throws IgniteCheckedException {
// No-op for detached entries, index is updated on primary or backup nodes.
}

Expand Down
Expand Up @@ -451,7 +451,7 @@ public boolean loadedValue(@Nullable IgniteInternalTx tx,
}

/** {@inheritDoc} */
@Override protected void clearIndex(CacheObject val) {
@Override protected void clearIndex(CacheObject val, GridCacheVersion ver) {
// No-op.
}

Expand Down
Expand Up @@ -403,7 +403,7 @@ public void store(CacheObject key, CacheObject val, GridCacheVersion ver, long e
return; // Ignore index update when node is stopping.

try {
qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
qryProc.store(space, key, val, ver, expirationTime);
}
finally {
invalidateResultCache();
Expand All @@ -418,7 +418,7 @@ public void store(CacheObject key, CacheObject val, GridCacheVersion ver, long e
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings("SimplifiableIfStatement")
public void remove(CacheObject key, CacheObject val) throws IgniteCheckedException {
public void remove(CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException {
assert key != null;

if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal))
Expand All @@ -428,7 +428,7 @@ public void remove(CacheObject key, CacheObject val) throws IgniteCheckedExcepti
return; // Ignore index update when node is stopping.

try {
qryProc.remove(space, key, val);
qryProc.remove(space, key, val, ver);
}
finally {
invalidateResultCache();
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.cacheobject;

import java.nio.ByteBuffer;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
Expand Down Expand Up @@ -151,6 +152,13 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
*/
public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes);

/**
* @param ctx Cache context.
* @param buf Buffer to read from.
* @return Cache object.
*/
public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf);

/**
* @param ctx Context.
* @param valPtr Value pointer.
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cacheobject;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
Expand Down Expand Up @@ -171,6 +172,19 @@ protected KeyCacheObject toCacheKeyObject0(Object obj, boolean userObj) {
throw new IllegalArgumentException("Invalid object type: " + type);
}

/** {@inheritDoc} */
@Override public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf) {
byte type = buf.get();

int len = buf.getInt();

byte[] data = new byte[len];

buf.get(data);

return toCacheObject(ctx, type, data);
}

/** {@inheritDoc} */
@Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx,
@Nullable Object obj,
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
Expand Down Expand Up @@ -186,7 +187,7 @@ public long size(@Nullable String spaceName, GridQueryTypeDescriptor desc, Index
* @throws IgniteCheckedException If failed.
*/
public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject key, CacheObject val,
byte[] ver, long expirationTime) throws IgniteCheckedException;
GridCacheVersion ver, long expirationTime) throws IgniteCheckedException;

/**
* Removes index entry by key.
Expand All @@ -196,7 +197,7 @@ public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, Cach
* @param val Value.
* @throws IgniteCheckedException If failed.
*/
public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException;
public void remove(@Nullable String spaceName, CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException;

/**
* Will be called when entry with given key is swapped.
Expand Down

0 comments on commit 26a191b

Please sign in to comment.