Skip to content

Commit

Permalink
IGNITE-51 Changed CacheInterceptor API. Added CacheLazyEntry,
Browse files Browse the repository at this point in the history
  • Loading branch information
niktikhonov committed Mar 3, 2015
1 parent 736158d commit 43c04a0
Show file tree
Hide file tree
Showing 10 changed files with 248 additions and 136 deletions.
Expand Up @@ -21,6 +21,8 @@
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import javax.cache.*;

/** /**
* Cache interceptor. Cache interceptor can be used for getting callbacks before * Cache interceptor. Cache interceptor can be used for getting callbacks before
* and after cache {@code get(...)}, {@code put(...)}, and {@code remove(...)} * and after cache {@code get(...)}, {@code put(...)}, and {@code remove(...)}
Expand Down Expand Up @@ -61,13 +63,12 @@ public interface CacheInterceptor<K, V> {
* <p> * <p>
* This method should not throw any exception. * This method should not throw any exception.
* *
* @param key Key. * @param entry Old entry.
* @param oldVal Old value.
* @param newVal New value. * @param newVal New value.
* @return Value to be put to cache. Returning {@code null} cancels the update. * @return Value to be put to cache. Returning {@code null} cancels the update.
* @see CacheProjection#put(Object, Object, IgnitePredicate[]) * @see CacheProjection#put(Object, Object, IgnitePredicate[])
*/ */
@Nullable public V onBeforePut(K key, @Nullable V oldVal, V newVal); @Nullable public V onBeforePut(Cache.Entry<K, V> entry, V newVal);


/** /**
* This method is called after new value has been stored. * This method is called after new value has been stored.
Expand All @@ -79,10 +80,9 @@ public interface CacheInterceptor<K, V> {
* <p> * <p>
* This method should not throw any exception. * This method should not throw any exception.
* *
* @param key Key. * @param entry Current entry.
* @param val Current value.
*/ */
public void onAfterPut(K key, V val); public void onAfterPut(Cache.Entry<K, V> entry);


/** /**
* This method is called within {@link CacheProjection#remove(Object, IgnitePredicate[])} * This method is called within {@link CacheProjection#remove(Object, IgnitePredicate[])}
Expand All @@ -95,14 +95,13 @@ public interface CacheInterceptor<K, V> {
* <p> * <p>
* This method should not throw any exception. * This method should not throw any exception.
* *
* @param key Key. * @param entry Old entry.
* @param val Old value.
* @return Tuple. The first value is the flag whether remove should be cancelled or not. * @return Tuple. The first value is the flag whether remove should be cancelled or not.
* The second is the value to be returned as result of {@code remove()} operation, * The second is the value to be returned as result of {@code remove()} operation,
* may be {@code null}. * may be {@code null}.
* @see CacheProjection#remove(Object, IgnitePredicate[]) * @see CacheProjection#remove(Object, IgnitePredicate[])
*/ */
@Nullable public IgniteBiTuple<Boolean, V> onBeforeRemove(K key, @Nullable V val); @Nullable public IgniteBiTuple<Boolean, V> onBeforeRemove(Cache.Entry<K, V> entry);


/** /**
* This method is called after value has been removed. * This method is called after value has been removed.
Expand All @@ -114,8 +113,7 @@ public interface CacheInterceptor<K, V> {
* <p> * <p>
* This method should not throw any exception. * This method should not throw any exception.
* *
* @param key Key. * @param entry Removed entry.
* @param val Removed value.
*/ */
public void onAfterRemove(K key, V val); public void onAfterRemove(Cache.Entry<K, V> entry);
} }
Expand Up @@ -20,6 +20,8 @@
import org.apache.ignite.lang.*; import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*; import org.jetbrains.annotations.*;


import javax.cache.*;

/** /**
* Cache interceptor convenience adapter. It provides no-op implementations for all * Cache interceptor convenience adapter. It provides no-op implementations for all
* interceptor callbacks. * interceptor callbacks.
Expand All @@ -31,22 +33,22 @@ public class CacheInterceptorAdapter<K, V> implements CacheInterceptor<K, V> {
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public V onBeforePut(K key, @Nullable V oldVal, V newVal) { @Nullable @Override public V onBeforePut(Cache.Entry<K, V> entry, V newVal) {
return newVal; return newVal;
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onAfterPut(K key, V val) { @Override public void onAfterPut(Cache.Entry<K, V> entry) {
// No-op. // No-op.
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public IgniteBiTuple<Boolean, V> onBeforeRemove(K key, @Nullable V val) { @Nullable @Override public IgniteBiTuple<Boolean, V> onBeforeRemove(Cache.Entry<K, V> entry) {
return new IgniteBiTuple<>(false, val); return new IgniteBiTuple<>(false, entry.getValue());
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void onAfterRemove(K key, V val) { @Override public void onAfterRemove(Cache.Entry<K, V> entry) {
// No-op. // No-op.
} }
} }
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.internal.util.typedef.internal.*;

import javax.cache.*;

/**
*
*/
public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
/** Cache context. */
private GridCacheContext<K, V> cctx;

/** Key cache object. */
private KeyCacheObject keyObj;

/** Cache object value. */
private CacheObject valObj;

/** Key. */
private K key;

/** Value. */
private V val;

/**
* @param keyObj Key cache object.
* @param valObj Cache object value.
* @param cctx Cache context.
*/
public CacheLazyEntry(KeyCacheObject keyObj, CacheObject valObj, GridCacheContext<K, V> cctx) {
this.keyObj = keyObj;
this.valObj = valObj;
this.cctx = cctx;
}

/**
* @param key Key.
* @param val Value.
*/
public CacheLazyEntry(K key, V val) {
this.key = key;
this.val = val;
}

/**
* @param keyObj Key cache object.
* @param valObj Cache object value.
* @param key Key.
* @param val Value.
* @param cctx Cache context.
*/
public CacheLazyEntry(KeyCacheObject keyObj,
CacheObject valObj,
K key,
V val,
GridCacheContext<K, V> cctx) {
this.keyObj = keyObj;
this.valObj = valObj;
this.val = val;
this.key = key;
this.cctx = cctx;
}

/** {@inheritDoc} */
@Override public K getKey() {
if (key == null)
key = CU.value(keyObj, cctx, true);

return key;
}

/** {@inheritDoc} */
@Override public V getValue() {
if (val == null)
val = CU.value(valObj, cctx, true);

return val;
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <T> T unwrap(Class<T> cls) {
if(cls.isAssignableFrom(getClass()))
return cls.cast(this);

throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);
}

/** {@inheritDoc} */
public String toString() {
return "CacheEntry [key=" + getKey() + ", val=" + getValue() + ']';
}
}
Expand Up @@ -3320,7 +3320,7 @@ public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final C
V retVal = CU.value(ret, ctx, true); V retVal = CU.value(ret, ctx, true);


if (ctx.config().getInterceptor() != null) if (ctx.config().getInterceptor() != null)
return (V)ctx.config().getInterceptor().onBeforeRemove(key, retVal).get2(); return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, retVal)).get2();


return retVal; return retVal;
} }
Expand Down
Expand Up @@ -1002,7 +1002,6 @@ protected void recordNodeId(UUID nodeId) {


boolean intercept = cctx.config().getInterceptor() != null; boolean intercept = cctx.config().getInterceptor() != null;


Object key0 = null;
Object val0 = null; Object val0 = null;


synchronized (this) { synchronized (this) {
Expand Down Expand Up @@ -1030,11 +1029,9 @@ protected void recordNodeId(UUID nodeId) {
old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val; old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;


if (intercept) { if (intercept) {
key0 = key.value(cctx, false); val0 = CU.value(val, cctx, true);
val0 = CU.value(val, cctx, false);


Object interceptorVal = cctx.config().getInterceptor().onBeforePut(key0, Object interceptorVal = cctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(key, old, cctx),
CU.value(old, cctx, false),
val0); val0);


if (interceptorVal == null) if (interceptorVal == null)
Expand Down Expand Up @@ -1117,7 +1114,7 @@ else if (interceptorVal != val0)
cctx.store().putToStore(tx, key, val, newVer); cctx.store().putToStore(tx, key, val, newVer);


if (intercept) if (intercept)
cctx.config().getInterceptor().onAfterPut(key0, val0); cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(key, val, cctx));


return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) : return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) :
new GridCacheUpdateTxResult(false, null); new GridCacheUpdateTxResult(false, null);
Expand Down Expand Up @@ -1157,8 +1154,7 @@ else if (interceptorVal != val0)


IgniteBiTuple<Boolean, Object> interceptRes = null; IgniteBiTuple<Boolean, Object> interceptRes = null;


Object key0 = null; Cache.Entry entry0 = null;
Object old0 = null;


synchronized (this) { synchronized (this) {
checkObsolete(); checkObsolete();
Expand All @@ -1181,10 +1177,9 @@ else if (interceptorVal != val0)
old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val; old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val;


if (intercept) { if (intercept) {
key0 = key.value(cctx, false); entry0 = new CacheLazyEntry(key, old, cctx);
old0 = CU.value(old, cctx, false);


interceptRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0); interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0);


if (cctx.cancelRemove(interceptRes)) { if (cctx.cancelRemove(interceptRes)) {
CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
Expand Down Expand Up @@ -1296,7 +1291,7 @@ else if (log.isDebugEnabled())
} }


if (intercept) if (intercept)
cctx.config().getInterceptor().onAfterRemove(key0, old0); cctx.config().getInterceptor().onAfterRemove(entry0);


if (valid) { if (valid) {
CacheObject ret; CacheObject ret;
Expand Down Expand Up @@ -1460,12 +1455,11 @@ else if (ttl == CU.TTL_NOT_CHANGED)


if (intercept) { if (intercept) {
if (op == GridCacheOperation.UPDATE) { if (op == GridCacheOperation.UPDATE) {
key0 = value(key0, key, false);
updated0 = value(updated0, updated, false); updated0 = value(updated0, updated, false);
old0 = value(old0, old, false); old0 = value(old0, old, false);


Object interceptorVal Object interceptorVal = cctx.config().getInterceptor()
= cctx.config().getInterceptor().onBeforePut(key0, old0, updated0); .onBeforePut(new CacheLazyEntry(key, old, key0, old0, cctx), updated0);


if (interceptorVal == null) if (interceptorVal == null)
return new GridTuple3<>(false, cctx.unwrapTemporary(old0), invokeRes); return new GridTuple3<>(false, cctx.unwrapTemporary(old0), invokeRes);
Expand All @@ -1476,10 +1470,8 @@ else if (ttl == CU.TTL_NOT_CHANGED)
} }
} }
else { else {
key0 = value(key0, key, false); interceptorRes = cctx.config().getInterceptor()
old0 = value(old0, old, false); .onBeforeRemove(new CacheLazyEntry(key, old, key0, old0, cctx));

interceptorRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0);


if (cctx.cancelRemove(interceptorRes)) if (cctx.cancelRemove(interceptorRes))
return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes); return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes);
Expand Down Expand Up @@ -1590,9 +1582,9 @@ else if (ttl != CU.TTL_ZERO)


if (intercept) { if (intercept) {
if (op == GridCacheOperation.UPDATE) if (op == GridCacheOperation.UPDATE)
cctx.config().getInterceptor().onAfterPut(key0, updated0); cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(key, null, key0, updated0, cctx));
else else
cctx.config().getInterceptor().onAfterRemove(key0, old0); cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(key, old, key0, old0, cctx));
} }
} }


Expand Down Expand Up @@ -1992,11 +1984,10 @@ else if (newSysTtl == CU.TTL_ZERO) {
// Actual update. // Actual update.
if (op == GridCacheOperation.UPDATE) { if (op == GridCacheOperation.UPDATE) {
if (intercept) { if (intercept) {
key0 = value(key0, key, false);
old0 = value(old0, oldVal, false);
updated0 = value(updated0, updated, false); updated0 = value(updated0, updated, false);


Object interceptorVal = cctx.config().getInterceptor().onBeforePut(key0, old0, updated0); Object interceptorVal = cctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(key, oldVal,
key0, old0, cctx), updated0);


if (interceptorVal == null) if (interceptorVal == null)
return new GridCacheUpdateAtomicResult(false, return new GridCacheUpdateAtomicResult(false,
Expand Down Expand Up @@ -2071,10 +2062,8 @@ assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this
} }
else { else {
if (intercept) { if (intercept) {
key0 = value(key0, key, false); interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(key, oldVal, key0,
old0 = value(old0, oldVal, false); old0, cctx));

interceptRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0);


if (cctx.cancelRemove(interceptRes)) if (cctx.cancelRemove(interceptRes))
return new GridCacheUpdateAtomicResult(false, return new GridCacheUpdateAtomicResult(false,
Expand Down Expand Up @@ -2170,12 +2159,9 @@ assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + thi


if (intercept) { if (intercept) {
if (op == GridCacheOperation.UPDATE) if (op == GridCacheOperation.UPDATE)
cctx.config().getInterceptor().onAfterPut(key0, updated0); cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(key, null, key0, updated0, cctx));
else { else
old0 = value(old0, oldVal, false); cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(key, oldVal, null, old0, cctx));

cctx.config().getInterceptor().onAfterRemove(key0, old0);
}


if (interceptRes != null) if (interceptRes != null)
oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
Expand Down

0 comments on commit 43c04a0

Please sign in to comment.