Skip to content

Commit

Permalink
ignite-1127 Query with result size more then one page doesn't increas…
Browse files Browse the repository at this point in the history
…e Query executions count metric - Fixes #23.

Signed-off-by: S.Vladykin <svladykin@gridgain.com>
  • Loading branch information
agura authored and S.Vladykin committed Sep 9, 2015
1 parent 6771638 commit 2311de4
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 265 deletions.
Expand Up @@ -70,6 +70,7 @@
import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI1;
Expand Down Expand Up @@ -439,7 +440,8 @@ public GridCacheGateway<K, V> gate() {
* @return Cursor. * @return Cursor.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private QueryCursor<Cache.Entry<K,V>> query(Query filter, @Nullable ClusterGroup grp) { private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp)
throws IgniteCheckedException {
final CacheQuery<Map.Entry<K,V>> qry; final CacheQuery<Map.Entry<K,V>> qry;
final CacheQueryFuture<Map.Entry<K,V>> fut; final CacheQueryFuture<Map.Entry<K,V>> fut;


Expand All @@ -454,7 +456,12 @@ private QueryCursor<Cache.Entry<K,V>> query(Query filter, @Nullable ClusterGroup
if (grp != null) if (grp != null)
qry.projection(grp); qry.projection(grp);


fut = qry.execute(); fut = ctx.kernalContext().query().executeQuery(ctx,
new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
return qry.execute();
}
}, false);
} }
else if (filter instanceof TextQuery) { else if (filter instanceof TextQuery) {
TextQuery p = (TextQuery)filter; TextQuery p = (TextQuery)filter;
Expand All @@ -464,15 +471,25 @@ else if (filter instanceof TextQuery) {
if (grp != null) if (grp != null)
qry.projection(grp); qry.projection(grp);


fut = qry.execute(); fut = ctx.kernalContext().query().executeQuery(ctx,
new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
return qry.execute();
}
}, false);
} }
else if (filter instanceof SpiQuery) { else if (filter instanceof SpiQuery) {
qry = ctx.queries().createSpiQuery(isKeepPortable); qry = ctx.queries().createSpiQuery(isKeepPortable);


if (grp != null) if (grp != null)
qry.projection(grp); qry.projection(grp);


fut = qry.execute(((SpiQuery)filter).getArgs()); fut = ctx.kernalContext().query().executeQuery(ctx,
new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
return qry.execute(((SpiQuery)filter).getArgs());
}
}, false);
} }
else { else {
if (filter instanceof SqlFieldsQuery) if (filter instanceof SqlFieldsQuery)
Expand Down Expand Up @@ -619,7 +636,7 @@ private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, bool
} }
catch (Exception e) { catch (Exception e) {
if (e instanceof CacheException) if (e instanceof CacheException)
throw e; throw (CacheException)e;


throw new CacheException(e); throw new CacheException(e);
} }
Expand Down
Expand Up @@ -46,7 +46,7 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
protected GridCacheLocalQueryFuture(GridCacheContext<K, V> ctx, GridCacheQueryBean qry) { protected GridCacheLocalQueryFuture(GridCacheContext<K, V> ctx, GridCacheQueryBean qry) {
super(ctx, qry, true); super(ctx, qry, true);


run = new LocalQueryRunnable<>(); run = new LocalQueryRunnable();
} }


/** /**
Expand Down Expand Up @@ -78,7 +78,7 @@ void execute() {
} }


/** */ /** */
private class LocalQueryRunnable<K, V, R> implements GridPlainRunnable { private class LocalQueryRunnable implements GridPlainRunnable {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void run() { @Override public void run() {
try { try {
Expand All @@ -101,7 +101,6 @@ private class LocalQueryRunnable<K, V, R> implements GridPlainRunnable {
* @return Query info. * @return Query info.
* @throws IgniteCheckedException In case of error. * @throws IgniteCheckedException In case of error.
*/ */
@SuppressWarnings({"unchecked"})
private GridCacheQueryInfo localQueryInfo() throws IgniteCheckedException { private GridCacheQueryInfo localQueryInfo() throws IgniteCheckedException {
GridCacheQueryBean qry = query(); GridCacheQueryBean qry = query();


Expand Down
Expand Up @@ -371,6 +371,7 @@ public ClusterGroup projection() {
/** /**
* @return Key-value filter. * @return Key-value filter.
*/ */
@SuppressWarnings("unchecked")
@Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() { @Nullable public <K, V> IgniteBiPredicate<K, V> scanFilter() {
return (IgniteBiPredicate<K, V>)filter; return (IgniteBiPredicate<K, V>)filter;
} }
Expand All @@ -396,8 +397,8 @@ public void validate() throws IgniteCheckedException {
* @param startTime Start time. * @param startTime Start time.
* @param duration Duration. * @param duration Duration.
*/ */
public void onExecuted(Object res, Throwable err, long startTime, long duration) { public void onCompleted(Object res, Throwable err, long startTime, long duration) {
GridQueryProcessor.onExecuted(cctx, metrics, res, err, startTime, duration, log); GridQueryProcessor.onCompleted(cctx, res, err, startTime, duration, log);
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down Expand Up @@ -431,7 +432,7 @@ public void onExecuted(Object res, Throwable err, long startTime, long duration)
* @param args Arguments. * @param args Arguments.
* @return Future. * @return Future.
*/ */
@SuppressWarnings("IfMayBeConditional") @SuppressWarnings({"IfMayBeConditional", "unchecked"})
private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer, private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer,
@Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) {
Collection<ClusterNode> nodes; Collection<ClusterNode> nodes;
Expand All @@ -440,13 +441,13 @@ private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer
nodes = nodes(); nodes = nodes();
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
return queryErrorFuture(cctx, e, log); return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
} }


cctx.checkSecurity(SecurityPermission.CACHE_READ); cctx.checkSecurity(SecurityPermission.CACHE_READ);


if (nodes.isEmpty()) if (nodes.isEmpty())
return queryErrorFuture(cctx, new ClusterGroupEmptyCheckedException(), log); return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']'); log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']');
Expand All @@ -457,7 +458,7 @@ private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer
cctx.deploy().registerClasses(args); cctx.deploy().registerClasses(args);
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
return queryErrorFuture(cctx, e, log); return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
} }
} }


Expand Down Expand Up @@ -488,28 +489,30 @@ else if (type == SCAN && part != null && nodes.size() > 1)
private Collection<ClusterNode> nodes() throws IgniteCheckedException { private Collection<ClusterNode> nodes() throws IgniteCheckedException {
CacheMode cacheMode = cctx.config().getCacheMode(); CacheMode cacheMode = cctx.config().getCacheMode();


Integer part = partition();

switch (cacheMode) { switch (cacheMode) {
case LOCAL: case LOCAL:
if (prj != null) if (prj != null)
U.warn(log, "Ignoring query projection because it's executed over LOCAL cache " + U.warn(log, "Ignoring query projection because it's executed over LOCAL cache " +
"(only local node will be queried): " + this); "(only local node will be queried): " + this);


if (type == SCAN && cctx.config().getCacheMode() == LOCAL && if (type == SCAN && cctx.config().getCacheMode() == LOCAL &&
partition() != null && partition() >= cctx.affinity().partitions()) part != null && part >= cctx.affinity().partitions())
throw new IgniteCheckedException("Invalid partition number: " + partition()); throw new IgniteCheckedException("Invalid partition number: " + part);


return Collections.singletonList(cctx.localNode()); return Collections.singletonList(cctx.localNode());


case REPLICATED: case REPLICATED:
if (prj != null || partition() != null) if (prj != null || part != null)
return nodes(cctx, prj, partition()); return nodes(cctx, prj, part);


return cctx.affinityNode() ? return cctx.affinityNode() ?
Collections.singletonList(cctx.localNode()) : Collections.singletonList(cctx.localNode()) :
Collections.singletonList(F.rand(nodes(cctx, null, partition()))); Collections.singletonList(F.rand(nodes(cctx, null, null)));


case PARTITIONED: case PARTITIONED:
return nodes(cctx, prj, partition()); return nodes(cctx, prj, part);


default: default:
throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode);
Expand Down Expand Up @@ -537,29 +540,13 @@ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,


return F.view(affNodes, new P1<ClusterNode>() { return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) { @Override public boolean apply(ClusterNode n) {

return cctx.discovery().cacheAffinityNode(n, cctx.name()) && return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) && (prj == null || prj.node(n.id()) != null) &&
(part == null || owners.contains(n)); (part == null || owners.contains(n));
} }
}); });
} }


/**
* @param cctx Cache context.
* @param e Exception.
* @param log Logger.
*/
private static <T> GridCacheQueryErrorFuture<T> queryErrorFuture(GridCacheContext<?, ?> cctx,
Exception e, IgniteLogger log) {

GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();

GridQueryProcessor.onExecuted(cctx, metrics, null, e, 0, 0, log);

return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), e);
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public String toString() { @Override public String toString() {
return S.toString(GridCacheQueryAdapter.class, this); return S.toString(GridCacheQueryAdapter.class, this);
Expand Down
Expand Up @@ -155,7 +155,7 @@ boolean fields() {
@Override public boolean onDone(Collection<R> res, Throwable err) { @Override public boolean onDone(Collection<R> res, Throwable err) {
cctx.time().removeTimeoutObject(this); cctx.time().removeTimeoutObject(this);


qry.query().onExecuted(res, err, startTime(), duration()); qry.query().onCompleted(res, err, startTime(), duration());


return super.onDone(res, err); return super.onDone(res, err);
} }
Expand Down Expand Up @@ -413,11 +413,6 @@ public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, @Nullabl
} }
} }
} }
catch (Error e) {
onPageError(nodeId, e);

throw e;
}
catch (Throwable e) { catch (Throwable e) {
onPageError(nodeId, e); onPageError(nodeId, e);


Expand Down Expand Up @@ -446,6 +441,7 @@ private void onPageError(@Nullable UUID nodeId, Throwable e) {
* @param col Collection. * @param col Collection.
* @return Collection with masked {@code null} values. * @return Collection with masked {@code null} values.
*/ */
@SuppressWarnings("unchecked")
private Collection<Object> maskNulls(Collection<Object> col) { private Collection<Object> maskNulls(Collection<Object> col) {
assert col != null; assert col != null;


Expand All @@ -460,6 +456,7 @@ private Collection<Object> maskNulls(Collection<Object> col) {
* @param col Collection. * @param col Collection.
* @return Collection with unmasked {@code null} values. * @return Collection with unmasked {@code null} values.
*/ */
@SuppressWarnings("unchecked")
private Collection<Object> unmaskNulls(Collection<Object> col) { private Collection<Object> unmaskNulls(Collection<Object> col) {
assert col != null; assert col != null;


Expand Down
Expand Up @@ -1874,12 +1874,19 @@ public void resetMetrics() {
metrics = new GridCacheQueryMetricsAdapter(); metrics = new GridCacheQueryMetricsAdapter();
} }


/**
* @param fail {@code true} if execution failed.
*/
public void onExecuted(boolean fail) {
metrics.onQueryExecute(fail);
}

/** /**
* @param duration Execution duration. * @param duration Execution duration.
* @param fail {@code true} if execution failed. * @param fail {@code true} if execution failed.
*/ */
public void onMetricsUpdate(long duration, boolean fail) { public void onCompleted(long duration, boolean fail) {
metrics.onQueryExecute(duration, fail); metrics.onQueryCompleted(duration, fail);
} }


/** /**
Expand Down

0 comments on commit 2311de4

Please sign in to comment.