Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-20454 Sql. Add a query prefetch callback that is notified when data prefetching is complete. #2674

Merged
merged 9 commits into from
Oct 19, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine;

import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
import org.jetbrains.annotations.Nullable;

/**
* Callback that is invoked when the query finishes prefetching. It is designed
* to allow sequential execution of SQL statements that are dependent on each other.
*
* <ol>
* <li>For {@code DML} queries, it is called after the cursor has finished prefetching
* the initial batch of rows (see {@link AsyncRootNode#startPrefetch}).</li>
* <li>For {@code DDL} queries, it is called after the corresponding DDL
* command has completed (see {@link DdlCommandHandler#handle(DdlCommand)}.</li>
* </ol>
*
* <p>This callback is invoked asynchronously in the "{@code execution pool}".
*/
@FunctionalInterface
public interface QueryPrefetchCallback {
/**
* Called when the query finishes prefetching.
*
* @param ex Exceptional completion cause, or {@code null} if prefetch completed successfully.
*/
void onPrefetchComplete(@Nullable Throwable ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.engine.NodeLeftException;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
Expand Down Expand Up @@ -268,9 +269,9 @@ public AsyncCursor<List<Object>> executePlan(
case QUERY:
return executeQuery(tx, ctx, (MultiStepPlan) plan);
case EXPLAIN:
return executeExplain((ExplainPlan) plan);
return executeExplain((ExplainPlan) plan, ctx.prefetchCallback());
case DDL:
return executeDdl((DdlPlan) plan);
return executeDdl((DdlPlan) plan, ctx.prefetchCallback());

default:
throw new AssertionError("Unexpected query type: " + plan);
Expand All @@ -288,13 +289,17 @@ public CompletableFuture<?> cancel(UUID qryId) {
return mgr.close(true);
}

private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) {
private AsyncCursor<List<Object>> executeDdl(DdlPlan plan, @Nullable QueryPrefetchCallback callback) {
CompletableFuture<Iterator<List<Object>>> ret = ddlCmdHnd.handle(plan.command())
.thenApply(applied -> List.of(List.<Object>of(applied)).iterator())
.exceptionally(th -> {
throw convertDdlException(th);
});

if (callback != null) {
ret.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
}

return new AsyncWrapper<>(ret, Runnable::run);
}

Expand All @@ -315,9 +320,13 @@ private static RuntimeException convertDdlException(Throwable e) {
return (e instanceof RuntimeException) ? (RuntimeException) e : new IgniteInternalException(INTERNAL_ERR, e);
}

private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) {
private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan, @Nullable QueryPrefetchCallback callback) {
List<List<Object>> res = List.of(List.of(plan.plan()));

if (callback != null) {
taskExecutor.execute(() -> callback.onPrefetchComplete(null));
}

return new AsyncWrapper<>(res.iterator());
}

Expand Down Expand Up @@ -555,7 +564,12 @@ private CompletableFuture<Void> executeFragment(IgniteRel treeRoot, ResolvedDepe
});
node.onRegister(rootNode);

rootNode.prefetch();
CompletableFuture<Void> prefetchFut = rootNode.startPrefetch();
QueryPrefetchCallback callback = ctx.prefetchCallback();

if (callback != null) {
prefetchFut.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor);
}

root.complete(rootNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.sql.CursorClosedException;
import org.jetbrains.annotations.Nullable;

/**
* An async iterator over the execution tree.
Expand All @@ -56,6 +57,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async

private final Queue<PendingRequest<OutRowT>> pendingRequests = new ConcurrentLinkedQueue<>();

private final CompletableFuture<Void> prefetchFut = new CompletableFuture<>();

private volatile boolean closed = false;

/**
Expand Down Expand Up @@ -169,6 +172,8 @@ public CompletableFuture<Void> closeAsync() {
}
}, source::onError);

completePrefetchFuture(th);

closed = true;
}
}
Expand All @@ -181,8 +186,10 @@ public CompletableFuture<Void> closeAsync() {
* Starts the execution of the fragment and keeps the result in the intermediate buffer.
*
* <p>Note: this method must be called by the same thread that will execute the whole fragment.
*
* @return Future representing pending completion of the prefetch operation.
*/
public void prefetch() {
public CompletableFuture<Void> startPrefetch() {
assert source.context().description().prefetch();

if (waiting == 0) {
Expand All @@ -192,9 +199,13 @@ public void prefetch() {
onError(ex);
}
}

return prefetchFut;
}

private void flush() throws Exception {
completePrefetchFuture(null);

// flush may be triggered by prefetching, so let's do nothing in this case
if (pendingRequests.isEmpty()) {
return;
Expand Down Expand Up @@ -234,6 +245,21 @@ private void scheduleTask() {
}
}

/**
* Completes prefetch future if it has not already been completed.
*
* @param ex Exceptional completion cause or {@code null} if the future must complete successfully.
*/
private void completePrefetchFuture(@Nullable Throwable ex) {
if (!prefetchFut.isDone()) {
if (ex != null) {
prefetchFut.completeExceptionally(ex);
} else {
prefetchFut.complete(null);
}
}
}

private static class PendingRequest<OutRowT> {
/**
* A future to complete when {@link #buff buffer} will be filled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryCancel;
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
Expand Down Expand Up @@ -152,6 +153,8 @@ public List<MetadataHandler<?>> handlers(Class<? extends MetadataHandler<?>> hnd

private final Object[] parameters;

private final QueryPrefetchCallback prefetchCallback;

private CalciteCatalogReader catalogReader;

/**
Expand All @@ -162,7 +165,8 @@ private BaseQueryContext(
FrameworkConfig cfg,
QueryCancel cancel,
Object[] parameters,
IgniteLogger log
IgniteLogger log,
QueryPrefetchCallback prefetchCallback
) {
super(Contexts.chain(cfg.getContext()));

Expand All @@ -173,6 +177,7 @@ private BaseQueryContext(
this.log = log;
this.cancel = cancel;
this.parameters = parameters;
this.prefetchCallback = prefetchCallback;

typeFactory = TYPE_FACTORY;

Expand Down Expand Up @@ -225,6 +230,10 @@ public int schemaVersion() {
return Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).version();
}

public QueryPrefetchCallback prefetchCallback() {
return prefetchCallback;
}

/**
* Returns calcite catalog reader.
*/
Expand Down Expand Up @@ -285,6 +294,8 @@ public static class Builder {

private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY;

private QueryPrefetchCallback prefetchCallback;

public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
this.frameworkCfg = Objects.requireNonNull(frameworkCfg);
return this;
Expand All @@ -305,13 +316,18 @@ public Builder queryId(UUID queryId) {
return this;
}

public Builder prefetchCallback(QueryPrefetchCallback prefetchCallback) {
this.prefetchCallback = prefetchCallback;
return this;
}

public Builder parameters(Object... parameters) {
this.parameters = Objects.requireNonNull(parameters);
return this;
}

public BaseQueryContext build() {
return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log);
return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, prefetchCallback);
}
}

Expand Down
Loading