Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/_docs/monitoring-metrics/new-metrics.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,29 @@ Register name: `cache`
|LastDataVer| long | The latest data version on the node.
|DataVersionClusterId| integer | Data version cluster id.
|===

== SQL parser metrics

Register name: `sql.parser.cache`

[cols="2,1,3",opts="header"]
|===
|Name| Type| Description
|hits| long | The number of SQL queries that were found in the parsers cache (doesn't require to be parsed and planned before execution).
|misses| long | The number of SQL queries that were parsed and planned.
|===

== SQL executor metrics

Register name: `sql.queries.user`

[cols="2,1,3",opts="header"]
|===
|Name| Type| Description
|success| long | The number of succesfully executed SQL queries.
|failed| long | The number of failed SQL queries (including canceled).
|canceled| long | The number of canceled SQL queries.
|===



Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.calcite.DataContexts;
Expand All @@ -49,6 +50,7 @@
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.events.SqlQueryExecutionEvent;
Expand All @@ -60,6 +62,7 @@
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
Expand Down Expand Up @@ -177,6 +180,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
/** */
private final QueryPlanCache qryPlanCache;

/** */
private final QueryParserMetricsHolder parserMetrics;

/** */
private final QueryTaskExecutor taskExecutor;

Expand Down Expand Up @@ -225,6 +231,7 @@ public CalciteQueryProcessor(GridKernalContext ctx) {
failureProcessor = ctx.failure();
schemaHolder = new SchemaHolderImpl(ctx);
qryPlanCache = new QueryPlanCacheImpl(ctx);
parserMetrics = new QueryParserMetricsHolder(ctx.metric());
mailboxRegistry = new MailboxRegistryImpl(ctx);
taskExecutor = new QueryTaskExecutorImpl(ctx);
executionSvc = new ExecutionServiceImpl<>(ctx, ArrayRowHandler.INSTANCE);
Expand Down Expand Up @@ -422,9 +429,18 @@ public ExecutionService<Object[]> executionService() {

@Override public QueryPlan apply(RootQuery<Object[]> qry, Object[] params) {
if (plan == null) {
plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params), () ->
prepareSvc.prepareSingle(qryNode, qry.planningContext())
);
AtomicBoolean miss = new AtomicBoolean();

plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params), () -> {
miss.set(true);

return prepareSvc.prepareSingle(qryNode, qry.planningContext());
});

if (miss.get())
parserMetrics.countCacheMiss();
else
parserMetrics.countCacheHit();
}

return plan;
Expand Down Expand Up @@ -457,11 +473,15 @@ private <T> List<T> parseAndProcessQuery(
QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params));

if (plan != null) {
parserMetrics.countCacheHit();

return Collections.singletonList(
processQuery(qryCtx, qry -> action.apply(qry, plan), schema.getName(), plan.query(), null, params)
);
}

parserMetrics.countCacheMiss();

SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());

List<T> res = new ArrayList<>(qryList.size());
Expand Down Expand Up @@ -555,12 +575,17 @@ private <T> T processQuery(
if (qrys != null)
qrys.forEach(RootQuery::cancel);

qryReg.unregister(qry.id(), e);
if (isCanceled) {
qryReg.unregister(qry.id(), new QueryCancelledException());

throw new IgniteSQLException("The query was cancelled while planning",
IgniteQueryErrorCode.QUERY_CANCELED, e);
}
else {
qryReg.unregister(qry.id(), e);

if (isCanceled)
throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
else
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
Expand Down Expand Up @@ -165,9 +165,9 @@ public void cancel() {
}

for (RunningFragment<RowT> frag : fragments)
frag.context().execute(() -> frag.root().onError(new ExecutionCancelledException()), frag.root()::onError);
frag.context().execute(() -> frag.root().onError(new QueryCancelledException()), frag.root()::onError);

tryClose(new ExecutionCancelledException());
tryClose(queryCanceledException());
}

/** */
Expand All @@ -176,13 +176,8 @@ public void addFragment(RunningFragment<RowT> f) {
if (state == QueryState.INITED)
state = QueryState.EXECUTING;

if (state == QueryState.CLOSING || state == QueryState.CLOSED) {
throw new IgniteSQLException(
"The query was cancelled",
IgniteQueryErrorCode.QUERY_CANCELED,
new ExecutionCancelledException()
);
}
if (state == QueryState.CLOSING || state == QueryState.CLOSED)
throw queryCanceledException();

fragments.add(f);
}
Expand All @@ -193,6 +188,15 @@ public boolean isCancelled() {
return cancel.isCanceled();
}

/** */
protected IgniteSQLException queryCanceledException() {
return new IgniteSQLException(
"The query was cancelled",
IgniteQueryErrorCode.QUERY_CANCELED,
new QueryCancelledException()
);
}

/** */
public void onNodeLeft(UUID nodeId) {
if (initNodeId.equals(nodeId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
Expand Down Expand Up @@ -174,12 +173,8 @@ public Object[] parameters() {
*/
public void mapping() {
synchronized (mux) {
if (state == QueryState.CLOSED) {
throw new IgniteSQLException(
"The query was cancelled while executing.",
IgniteQueryErrorCode.QUERY_CANCELED
);
}
if (state == QueryState.CLOSED)
throw queryCanceledException();

state = QueryState.MAPPING;
}
Expand All @@ -190,12 +185,8 @@ public void mapping() {
*/
public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root) {
synchronized (mux) {
if (state == QueryState.CLOSED) {
throw new IgniteSQLException(
"The query was cancelled while executing.",
IgniteQueryErrorCode.QUERY_CANCELED
);
}
if (state == QueryState.CLOSED)
throw queryCanceledException();

planningTime = U.currentTimeMillis() - startTs;

Expand Down Expand Up @@ -278,7 +269,7 @@ public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root)
log.warning("An exception occures during the query cancel", wrpEx);
}
finally {
super.tryClose(failure);
super.tryClose(failure == null && root != null ? root.failure() : failure);
}
}
}
Expand All @@ -287,18 +278,15 @@ public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT> root)
@Override public void cancel() {
cancel.cancel();

tryClose(new ExecutionCancelledException());
U.closeQuiet(root);
tryClose(queryCanceledException());
}

/** */
public PlanningContext planningContext() {
synchronized (mux) {
if (state == QueryState.CLOSED || state == QueryState.CLOSING) {
throw new IgniteSQLException(
"The query was cancelled while executing.",
IgniteQueryErrorCode.QUERY_CANCELED
);
}
if (state == QueryState.CLOSED || state == QueryState.CLOSING)
throw queryCanceledException();

if (state == QueryState.EXECUTING || state == QueryState.MAPPING) {
throw new IgniteSQLException(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryReadEvent;
Expand Down Expand Up @@ -833,7 +834,7 @@ private void onMessage(UUID nodeId, ErrorMessage msg) {

Exception e = new RemoteException(nodeId, msg.queryId(), msg.fragmentId(), msg.error());

if (X.hasCause(msg.error(), ExecutionCancelledException.class)) {
if (X.hasCause(msg.error(), QueryCancelledException.class)) {
e = new IgniteSQLException(
"The query was cancelled while executing.",
IgniteQueryErrorCode.QUERY_CANCELED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -146,7 +146,7 @@ protected void rowType(RelDataType rowType) {
* @param e Exception.
*/
public void onError(Throwable e) {
if (e instanceof ExecutionCancelledException)
if (e instanceof QueryCancelledException)
U.warn(context().logger(), "Execution is cancelled.", e);
else
onErrorInternal(e);
Expand Down Expand Up @@ -184,7 +184,7 @@ protected boolean isClosed() {
/** */
protected void checkState() throws Exception {
if (context().isCancelled())
throw new ExecutionCancelledException();
throw new QueryCancelledException();
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread was interrupted.");
if (!U.assertionsEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@

import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;

Expand Down Expand Up @@ -114,8 +116,10 @@ public UUID queryId() {

lock.lock();
try {
if (waiting != -1)
ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED));
if (waiting != -1) {
ex.compareAndSet(null, new IgniteSQLException(ERR_MSG, IgniteQueryErrorCode.QUERY_CANCELED,
new QueryCancelledException()));
}

closed = true; // an exception has to be set first to get right check order

Expand All @@ -128,6 +132,11 @@ public UUID queryId() {
onClose.accept(ex.get());
}

/** */
public @Nullable Throwable failure() {
return ex.get();
}

/** {@inheritDoc} */
@Override protected boolean isClosed() {
return closed;
Expand Down
Loading