diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml
index b546ecaa42a7a..3b30977fad519 100644
--- a/modules/calcite/pom.xml
+++ b/modules/calcite/pom.xml
@@ -98,6 +98,12 @@
${spring.version}
test
+
+ org.apache.ignite
+ ignite-clients
+ 2.9.0-SNAPSHOT
+ test
+
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index fc876088066d5..27a092a709130 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite;
-import java.util.Collections;
import java.util.List;
import org.apache.calcite.config.Lex;
import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.sql.fun.SqlLibrary;
import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
import org.apache.calcite.sql.parser.SqlParser;
@@ -66,6 +66,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
// Lexical configuration defines how identifiers are quoted, whether they are converted to upper or lower
// case when they are read, and whether identifiers are matched case-sensitively.
.setLex(Lex.ORACLE)
+// .setParserFactory(SqlDdlParserImpl.FACTORY) // Enables DDL support
.build())
// Dialects support.
.operatorTable(SqlLibraryOperatorTableFactory.INSTANCE
@@ -75,7 +76,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
// Context provides a way to store data within the planner session that can be accessed in planner rules.
.context(Contexts.empty())
// Custom cost factory to use during optimization
- .costFactory(null)
+ .costFactory(RelOptCostImpl.FACTORY)
.typeSystem(IgniteTypeSystem.INSTANCE)
.build();
@@ -251,8 +252,7 @@ public FailureProcessor failureProcessor() {
@Override public List>> query(@Nullable QueryContext qryCtx, @Nullable String schemaName,
String query, Object... params) throws IgniteSQLException {
- // TODO multiline query.
- return Collections.singletonList(executionService.executeQuery(qryCtx, schemaName, query, params));
+ return executionService.executeQuery(qryCtx, schemaName, query, params);
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
new file mode 100644
index 0000000000000..d3910c9f936ca
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ClosableIteratorsHolder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ClosableIteratorsHolder {
+ /** */
+ private final ReferenceQueue refQueue;
+
+ /** */
+ private final Map refMap;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private volatile boolean stopped;
+
+ /** */
+ private Thread cleanWorker;
+
+ /** */
+ public ClosableIteratorsHolder(IgniteLogger log) {
+ this.log = log;
+
+ refQueue = new ReferenceQueue<>();
+ refMap = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * @param src Closeable iterator.
+ * @return Weak closable iterator wrapper.
+ */
+ public Iterator iterator(final Iterator src) {
+ cleanUp(false);
+
+ return new DelegatingIterator<>(src);
+ }
+
+ /** */
+ public void init() {
+ cleanWorker = new Thread(() -> cleanUp(true));
+ cleanWorker.setDaemon(true);
+ cleanWorker.start();
+ }
+
+ /** */
+ public void tearDown() {
+ stopped = true;
+ refMap.clear();
+ U.interrupt(cleanWorker);
+ }
+
+ /** */
+ private void cleanUp(boolean blocking) {
+ for (Reference> ref = nextRef(blocking); !stopped && ref != null; ref = nextRef(blocking))
+ Commons.close(refMap.remove(ref), log);
+ }
+
+ /** */
+ private Reference nextRef(boolean blocking) {
+ try {
+ return !blocking ? refQueue.poll() : refQueue.remove();
+ }
+ catch (InterruptedException ignored) {
+ return null;
+ }
+ }
+
+ /** */
+ private AutoCloseable closeable(Object referent, Object resource) {
+ if (!(resource instanceof AutoCloseable))
+ return null;
+
+ return new CloseableReference(referent, resource);
+ }
+
+ /** */
+ private final class DelegatingIterator implements Iterator, AutoCloseable {
+ /** */
+ private final Iterator delegate;
+
+ /** */
+ private final AutoCloseable closeable;
+
+ /** */
+ private DelegatingIterator(Iterator delegate) {
+ closeable = closeable(this, this.delegate = delegate);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public T next() {
+ return delegate.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ delegate.remove();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void forEachRemaining(Consumer super T> action) {
+ delegate.forEachRemaining(action);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ Commons.close(closeable);
+ }
+ }
+
+ /** */
+ private final class CloseableReference extends WeakReference implements AutoCloseable {
+ /** */
+ private CloseableReference(Object referent, Object resource) {
+ super(referent, refQueue);
+
+ refMap.put(this, resource);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ try {
+ Commons.close(refMap.remove(this));
+ }
+ finally {
+ clear();
+ }
+ }
+ }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
index a686866e94fbf..19a474c83e308 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ConsumerNode.java
@@ -134,8 +134,6 @@ private void requestInternal() {
/** {@inheritDoc} */
@Override public void cancel() {
- context().setCancelled();
-
if (state != State.RUNNING)
return;
@@ -144,6 +142,7 @@ private void requestInternal() {
if (state != State.RUNNING)
return;
+ context().setCancelled();
state = State.CANCELLED;
buff.clear();
cond.signalAll();
@@ -155,8 +154,11 @@ private void requestInternal() {
context().execute(input()::cancel);
onClose.accept(this);
}
-
- public boolean canceled() {
+
+ /**
+ * @return Cancelled flag.
+ */
+ boolean canceled() {
return state == State.CANCELLED;
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 24b53838b64ba..4f9a05e9d511e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -42,7 +42,7 @@ public interface ExchangeService extends Service {
* @param batchId Batch ID.
* @param rows Data rows.
*/
- void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List> rows);
+ void sendBatch(Outbox> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List> rows);
/**
* Acknowledges a batch with given ID is processed.
@@ -54,7 +54,7 @@ public interface ExchangeService extends Service {
* @param exchangeId Exchange ID.
* @param batchId Batch ID.
*/
- void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
+ void acknowledge(Inbox> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
/**
* Sends cancel request.
@@ -66,5 +66,5 @@ public interface ExchangeService extends Service {
* @param exchangeId Exchange ID.
* @param batchId Batch ID.
*/
- void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
+ void cancel(Outbox> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 7559f069c5ee7..b94940bc18ef0 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.message.InboxCancelMessage;
@@ -95,18 +96,32 @@ public MessageService messageService() {
}
/** {@inheritDoc} */
- @Override public void sendBatch(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List> rows) {
- messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
+ @Override public void sendBatch(Outbox> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId, List> rows) {
+ try {
+ messageService().send(nodeId, new QueryBatchMessage(queryId, fragmentId, exchangeId, batchId, rows));
+ }
+ catch (IgniteCheckedException e) {
+ caller.cancel();
+ }
}
/** {@inheritDoc} */
- @Override public void acknowledge(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
- messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
+ @Override public void acknowledge(Inbox> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+ try {
+ messageService().send(nodeId, new QueryBatchAcknowledgeMessage(queryId, fragmentId, exchangeId, batchId));
+ }
+ catch (IgniteCheckedException e) {
+ caller.cancel();
+ }
}
/** {@inheritDoc} */
- @Override public void cancel(Object caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
- messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
+ @Override public void cancel(Outbox> caller, UUID nodeId, UUID queryId, long fragmentId, long exchangeId, int batchId) {
+ try {
+ messageService().send(nodeId, new InboxCancelMessage(queryId, fragmentId, exchangeId, batchId));
+ }
+ catch (IgniteCheckedException ignored) {
+ }
}
/** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
index 0b4177a75833b..e01f78b77ade2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
@@ -99,7 +99,7 @@ public int[] partitions() {
* @return Keep binary flag.
*/
public boolean keepBinary() {
- return false; // TODO
+ return true; // TODO
}
/**
@@ -116,6 +116,13 @@ public boolean cancelled() {
return cancelled;
}
+ /**
+ * @return Originating node ID.
+ */
+ public UUID originatingNodeId() {
+ return parent().originatingNodeId();
+ }
+
/** {@inheritDoc} */
@Override public SchemaPlus getRootSchema() {
return ctx.schema();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
index 16afe41b2613b..59ba0a802c8aa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionService.java
@@ -37,7 +37,7 @@ public interface ExecutionService extends Service {
* @param params Query parameters.
* @return Query cursor.
*/
- FieldsQueryCursor> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
+ List>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params);
/**
* Cancels a running query.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index e4e1fd5b2c1ea..81168835af6cd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -27,19 +29,25 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -49,6 +57,8 @@
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryCancellable;
import org.apache.ignite.internal.processors.query.QueryContext;
@@ -58,15 +68,22 @@
import org.apache.ignite.internal.processors.query.calcite.message.QueryCancelRequest;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartResponse;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
import org.apache.ignite.internal.processors.query.calcite.metadata.PartitionService;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
+import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlanImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
-import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ValidationResult;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
@@ -75,9 +92,6 @@
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelGraph;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.RelToGraphConverter;
import org.apache.ignite.internal.processors.query.calcite.serialize.relation.SenderNode;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
-import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
-import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
@@ -132,6 +146,9 @@ public class ExecutionServiceImpl extends AbstractService implements ExecutionSe
/** */
private ExchangeService exchangeService;
+ /** */
+ private ClosableIteratorsHolder iteratorsHolder;
+
/** */
private final Map running;
@@ -313,86 +330,36 @@ public void exchangeManager(GridCachePartitionExchangeManager,?> exchangeManag
return exchangeManager;
}
- /** {@inheritDoc} */
- @Override public FieldsQueryCursor> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
- UUID queryId = UUID.randomUUID();
-
- PlanningContext pctx = createContext(ctx, schema, query, params);
-
- QueryPlan plan = prepare(pctx);
-
- // Local execution
- Fragment local = F.first(plan.fragments());
-
- if (U.assertionsEnabled()) {
- assert local != null;
-
- NodesMapping mapping = local.mapping();
-
- assert mapping != null;
+ /**
+ * @param iteratorsHolder Iterators holder.
+ */
+ public void iteratorsHolder(ClosableIteratorsHolder iteratorsHolder) {
+ this.iteratorsHolder = iteratorsHolder;
+ }
- List nodes = mapping.nodes();
+ /**
+ * @return Iterators holder.
+ */
+ public ClosableIteratorsHolder iteratorsHolder() {
+ return iteratorsHolder;
+ }
- assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(pctx.localNodeId());
+ /** {@inheritDoc} */
+ @Override public List>> executeQuery(@Nullable QueryContext ctx, String schema, String query, Object[] params) {
+ PlanningContext pctx = createContext(ctx, schema, query, params);
+ RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(IgniteMetadata.METADATA_PROVIDER));
+ try (IgnitePlanner ignored = pctx.planner()) {
+ return Commons.transform(prepare(pctx), p -> executeSingle(UUID.randomUUID(), pctx, p));
}
-
- ExecutionContext ectx = new ExecutionContext(
- taskExecutor(), pctx, queryId, local.fragmentId(), local.mapping().partitions(pctx.localNodeId()), Commons.parametersMap(params)
- );
-
- Node