Skip to content

Commit

Permalink
# sprint-2 ignore exception from event listener filter
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Feb 17, 2015
1 parent a4101a5 commit 68a3b77
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.resources.*;
import org.jdk8.backport.*;

import javax.cache.*;
Expand Down Expand Up @@ -651,7 +652,7 @@ private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdated
}
}
catch (Exception e) {
U.error(log, "CacheEntryCreatedListener failed: " + e);
U.error(log, "CacheEntryListener failed: " + e);
}
}
}
Expand Down Expand Up @@ -683,6 +684,10 @@ private static class JCacheQueryRemoteFilter<K, V> implements CacheEntryEventFil
/** */
private byte types;

/** */
@LoggerResource
private IgniteLogger log;

/**
* For {@link Externalizable}.
*/
Expand All @@ -703,7 +708,14 @@ public JCacheQueryRemoteFilter() {

/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) {
return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt));
try {
return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt));
}
catch (Exception e) {
U.error(log, "CacheEntryEventFilter failed: " + e);

return true;
}
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -84,6 +84,69 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
}

/**
* @throws Exception If failed.
*/
public void testExceptionIgnored() throws Exception {
CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new Factory<CacheEntryListener<Integer, Integer>>() {
@Override public CacheEntryListener<Integer, Integer> create() {
return new ExceptionListener();
}
},
null,
false,
false
);

IgniteCache<Integer, Integer> cache = jcache();

cache.registerCacheEntryListener(lsnrCfg);

try {
for (Integer key : keys()) {
log.info("Check listener exceptions are ignored [key=" + key + ']');

cache.put(key, key);

cache.remove(key);
}
}
finally {
cache.deregisterCacheEntryListener(lsnrCfg);
}

lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
new Factory<CacheEntryListener<Integer, Integer>>() {
@Override public CacheEntryListener<Integer, Integer> create() {
return new CreateUpdateRemoveExpireListener();
}
},
new Factory<CacheEntryEventFilter<? super Integer, ? super Integer>>() {
@Override public CacheEntryEventFilter<? super Integer, ? super Integer> create() {
return new ExceptionFilter();
}
},
false,
false
);

cache.registerCacheEntryListener(lsnrCfg);

try {
for (Integer key : keys()) {
log.info("Check filter exceptions are ignored [key=" + key + ']');

cache.put(key, key);

cache.remove(key);
}
}
finally {
cache.deregisterCacheEntryListener(lsnrCfg);
}
}

/**
* @throws Exception If failed.
*/
Expand Down Expand Up @@ -957,6 +1020,49 @@ static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
}
}

/**
*
*/
static class ExceptionFilter implements CacheEntryEventFilter<Integer, Integer> {
/** {@inheritDoc} */
@Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
throw new RuntimeException("Test filter error.");
}
}

/**
*
*/
static class ExceptionListener extends CreateUpdateListener
implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> {
/** {@inheritDoc} */
@Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
error();
}

/** {@inheritDoc} */
@Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
error();
}

/** {@inheritDoc} */
@Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
error();
}

/** {@inheritDoc} */
@Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
error();
}

/**
* Throws exception.
*/
private void error() {
throw new RuntimeException("Test listener error.");
}
}

/**
*
*/
Expand Down

0 comments on commit 68a3b77

Please sign in to comment.