diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java index 4048971a38a0c..bef88ec2e41c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.IndexQuery; import org.apache.ignite.cache.query.IndexQueryCriterion; +import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion; import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; @@ -41,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; +import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; @@ -52,6 +54,9 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; +import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId; + /** * Processor of {@link IndexQuery}. */ @@ -74,7 +79,8 @@ public IndexQueryResult queryLocal( IndexQueryDesc idxQryDesc, @Nullable IgniteBiPredicate filter, IndexingQueryFilter cacheFilter, - boolean keepBinary + boolean keepBinary, + int taskHash ) throws IgniteCheckedException { InlineIndexImpl idx = (InlineIndexImpl)findSortedIndex(cctx, idxQryDesc); @@ -86,6 +92,8 @@ public IndexQueryResult queryLocal( IndexQueryResultMeta meta = new IndexQueryResultMeta(def, qry.critSize()); + boolean isRecordable = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + // Map IndexRow to Cache Key-Value pair. return new IndexQueryResult<>(meta, new GridCloseableIteratorAdapter>() { private IgniteBiTuple currVal; @@ -111,6 +119,26 @@ public IndexQueryResult queryLocal( continue; } + if (isRecordable) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "Index query entry read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.INDEX.name(), + cctx.name(), + idxQryDesc.valType(), + null, + filter, + null, + null, + securitySubjectId(cctx), + cctx.kernalContext().task().resolveTaskName(taskHash), + k, + v, + null, + null)); + } + currVal = new IgniteBiTuple<>(k, v); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java index fa0f3df0428f5..979e771a0f11f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java @@ -43,5 +43,8 @@ public enum CacheQueryType { CONTINUOUS, /** SPI query. */ - SPI + SPI, + + /** Index query. */ + INDEX } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 64d6b9a5aa42c..cbe315d0731bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -629,13 +629,29 @@ private QueryResult executeQuery(GridCacheQueryAdapter qry, @Nullable O break; case INDEX: + if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + cctx.gridEvents().record(new CacheQueryExecutedEvent<>( + cctx.localNode(), + "Index query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.INDEX.name(), + cctx.name(), + qry.queryClassName(), + null, + qry.scanFilter(), + null, + null, + securitySubjectId(cctx), + taskName)); + } + int[] parts = null; if (qry.partition() != null) parts = new int[]{qry.partition()}; IndexQueryResult idxQryRes = qryProc.queryIndex(cacheName, qry.queryClassName(), qry.idxQryDesc(), - qry.scanFilter(), filter(qry, parts, parts != null), qry.keepBinary()); + qry.scanFilter(), filter(qry, parts, parts != null), qry.keepBinary(), qry.taskHash()); iter = idxQryRes.iter(); res.metadata(idxQryRes.metadata()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index da378a5833be6..3db44e0239cf5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -3657,6 +3657,7 @@ public GridCloseableIterator> queryText(final String * @param entryFilter Optional user defined cache entries filter. * @param cacheFilter Ignite specific cache entries filters. * @param keepBinary Keep binary flag. + * @param taskHash Hashcode of the task. * @return Key/value rows. * @throws IgniteCheckedException If failed. */ @@ -3666,7 +3667,8 @@ public IndexQueryResult queryIndex( final IndexQueryDesc idxQryDesc, @Nullable IgniteBiPredicate entryFilter, final IndexingQueryFilter cacheFilter, - boolean keepBinary + boolean keepBinary, + int taskHash ) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); @@ -3678,7 +3680,7 @@ public IndexQueryResult queryIndex( new IgniteOutClosureX>() { @Override public IndexQueryResult applyx() throws IgniteCheckedException { try { - return idxQryPrc.queryLocal(cctx, idxQryDesc, entryFilter, cacheFilter, keepBinary); + return idxQryPrc.queryLocal(cctx, idxQryDesc, entryFilter, cacheFilter, keepBinary, taskHash); } catch (IgniteCheckedException e) { String msg = "Failed to execute IndexQuery: " + e.getMessage() + ". Query desc: " + idxQryDesc; diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index e13a6b2ed3209..8918f094664fa 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -55,6 +55,7 @@ import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.IndexQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -80,6 +81,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -101,10 +103,13 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt; +import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION; import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.FULL_TEXT; +import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.INDEX; import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.SCAN; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause; import static org.junit.Assert.assertArrayEquals; @@ -1883,6 +1888,116 @@ public void testFieldsQueryEvents() throws Exception { } } + /** + * @throws Exception If failed. + */ + @Test + public void testIndexQueryEvents() throws Exception { + final Map qryResults = new ConcurrentHashMap<>(); + final IgniteCache cache = jcache(Integer.class, Type2.class); + final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled(); + + final CountDownLatch readLatch = new CountDownLatch(evtsDisabled ? 0 : 2); + final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 : + cacheMode() == REPLICATED ? 1 : gridCount()); + + IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()]; + IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()]; + + for (int i = 0; i < gridCount(); i++) { + IgnitePredicate objReadPred = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + assert evt instanceof CacheQueryReadEvent; + + if (evtsDisabled) + fail("Cache events are disabled"); + + CacheQueryReadEvent qe = (CacheQueryReadEvent)evt; + + assertEquals(INDEX.name(), qe.queryType()); + assertEquals(cache.getName(), qe.cacheName()); + assertEquals("Type2", QueryUtils.typeName(qe.className())); + assertNotNull(qe.scanQueryFilter()); + assertNull(qe.clause()); + assertNull(qe.continuousQueryFilter()); + assertNull(qe.arguments()); + + qryResults.put(qe.key(), (BinaryObject)qe.value()); + + readLatch.countDown(); + + return true; + } + }; + + grid(i).events().localListen(objReadPred, EVT_CACHE_QUERY_OBJECT_READ); + objReadLsnrs[i] = objReadPred; + + IgnitePredicate qryExecPred = new IgnitePredicate() { + @Override public boolean apply(Event evt) { + assert evt instanceof CacheQueryExecutedEvent; + + if (evtsDisabled) + fail("Cache events are disabled"); + + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; + + assertEquals(INDEX.name(), qe.queryType()); + assertEquals(cache.getName(), qe.cacheName()); + assertEquals("Type2", QueryUtils.typeName(qe.className())); + assertNotNull(qe.scanQueryFilter()); + assertNull(qe.clause()); + assertNull(qe.continuousQueryFilter()); + assertNull(qe.arguments()); + + execLatch.countDown(); + + return true; + } + }; + + grid(i).events().localListen(qryExecPred, EVT_CACHE_QUERY_EXECUTED); + qryExecLsnrs[i] = qryExecPred; + } + + try { + cache.put(1, new Type2(1, "John")); + cache.put(2, new Type2(2, "Bill")); + cache.put(3, new Type2(3, "Sam")); + cache.put(4, new Type2(4, "Bill")); + cache.put(5, new Type2(5, "Bob")); + + IndexQuery qry = new IndexQuery(Type2.class) + .setCriteria(gt("id", 1), lt("id", 5)) + .setFilter((k, v) -> v.name().contains("Bill")); + + if (cacheMode() == REPLICATED) + qry.setLocal(true); + + QueryCursor> cursor = cache.query(qry); + + cursor.getAll(); + + assert readLatch.await(1000, MILLISECONDS); + assert execLatch.await(1000, MILLISECONDS); + + if (!evtsDisabled) { + assertEquals(2, qryResults.size()); + + assertEquals("Bill", ((Type2)qryResults.get(2).deserialize()).name()); + assertEquals("Bill", ((Type2)qryResults.get(4).deserialize()).name()); + } + else + assert qryResults.isEmpty(); + } + finally { + for (int i = 0; i < gridCount(); i++) { + grid(i).events().stopLocalListen(objReadLsnrs[i]); + grid(i).events().stopLocalListen(qryExecLsnrs[i]); + } + } + } + /** * @throws Exception If failed. */