diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index fc9811eb80cb0..8480211d742c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.security.*; +import org.apache.ignite.resources.*; import org.jdk8.backport.*; import javax.cache.*; @@ -651,7 +652,7 @@ private static class JCacheQueryLocalListener implements CacheEntryUpdated } } catch (Exception e) { - U.error(log, "CacheEntryCreatedListener failed: " + e); + U.error(log, "CacheEntryListener failed: " + e); } } } @@ -683,6 +684,10 @@ private static class JCacheQueryRemoteFilter implements CacheEntryEventFil /** */ private byte types; + /** */ + @LoggerResource + private IgniteLogger log; + /** * For {@link Externalizable}. */ @@ -703,7 +708,14 @@ public JCacheQueryRemoteFilter() { /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent evt) { - return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); + try { + return (types & flag(evt.getEventType())) != 0 && (impl == null || impl.evaluate(evt)); + } + catch (Exception e) { + U.error(log, "CacheEntryEventFilter failed: " + e); + + return true; + } } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 0c2cb0c21e359..66892ea5ec606 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -84,6 +84,69 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } } + /** + * @throws Exception If failed. + */ + public void testExceptionIgnored() throws Exception { + CacheEntryListenerConfiguration lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory>() { + @Override public CacheEntryListener create() { + return new ExceptionListener(); + } + }, + null, + false, + false + ); + + IgniteCache cache = jcache(); + + cache.registerCacheEntryListener(lsnrCfg); + + try { + for (Integer key : keys()) { + log.info("Check listener exceptions are ignored [key=" + key + ']'); + + cache.put(key, key); + + cache.remove(key); + } + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } + + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory>() { + @Override public CacheEntryListener create() { + return new CreateUpdateRemoveExpireListener(); + } + }, + new Factory>() { + @Override public CacheEntryEventFilter create() { + return new ExceptionFilter(); + } + }, + false, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + + try { + for (Integer key : keys()) { + log.info("Check filter exceptions are ignored [key=" + key + ']'); + + cache.put(key, key); + + cache.remove(key); + } + } + finally { + cache.deregisterCacheEntryListener(lsnrCfg); + } + } + /** * @throws Exception If failed. */ @@ -957,6 +1020,49 @@ static class CreateUpdateRemoveExpireListener extends CreateUpdateListener } } + /** + * + */ + static class ExceptionFilter implements CacheEntryEventFilter { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent evt) { + throw new RuntimeException("Test filter error."); + } + } + + /** + * + */ + static class ExceptionListener extends CreateUpdateListener + implements CacheEntryRemovedListener, CacheEntryExpiredListener { + /** {@inheritDoc} */ + @Override public void onCreated(Iterable> evts) { + error(); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable> evts) { + error(); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable> evts) { + error(); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable> evts) { + error(); + } + + /** + * Throws exception. + */ + private void error() { + throw new RuntimeException("Test listener error."); + } + } + /** * */