diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java index 868253f5a3b..2e3356084e4 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItFunctionsTest.java @@ -169,10 +169,25 @@ public void testRange() { assertEquals(0, sql("SELECT * FROM table(system_range(null, 1))").size()); + assertEquals(0, sql("SELECT * FROM table(system_range(1, null))").size()); + assertThrowsSqlException( Sql.RUNTIME_ERR, "Increment can't be 0", () -> sql("SELECT * FROM table(system_range(1, 1, 0))")); + + assertQuery("SELECT (SELECT * FROM table(system_range(4, 1)))") + .returns(null) + .check(); + + assertQuery("SELECT (SELECT * FROM table(system_range(1, 1)))") + .returns(1L) + .check(); + + assertThrowsSqlException( + Sql.RUNTIME_ERR, + "Subquery returned more than 1 value", + () -> sql("SELECT (SELECT * FROM table(system_range(1, 10)))")); } @Test diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 1f5b373c736..f19fbe26099 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -90,6 +90,7 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider; @@ -323,6 +324,8 @@ public synchronized CompletableFuture startAsync() { tableManager, schemaManager, sqlSchemaManager, replicaService, clockService, TABLE_CACHE_SIZE ); + var tableFunctionRegistry = new TableFunctionRegistryImpl(); + var dependencyResolver = new ExecutionDependencyResolverImpl( executableTableRegistry, view -> () -> systemViewManager.scanView(view.name()) @@ -380,6 +383,7 @@ public CompletableFuture forSystemView(ExecutionTargetFactory f mappingService, executableTableRegistry, dependencyResolver, + tableFunctionRegistry, clockService, EXECUTION_SERVICE_SHUTDOWN_TIMEOUT )); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index 21e109ec521..8936c00b1cd 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback; import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry; import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup; import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription; import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragment; @@ -210,6 +211,7 @@ public ExecutionServiceImpl( /** * Creates the execution services. * + * @param Type of the sql row. * @param topSrvc Topology service. * @param msgSrvc Message service. * @param sqlSchemaManager Schema manager. @@ -218,7 +220,10 @@ public ExecutionServiceImpl( * @param handler Row handler. * @param mailboxRegistry Mailbox registry. * @param exchangeSrvc Exchange service. - * @param Type of the sql row. + * @param mappingService Nodes mapping calculation service. + * @param tableRegistry Table registry. + * @param dependencyResolver Dependency resolver. + * @param tableFunctionRegistry Table function registry. * @return An execution service. */ public static ExecutionServiceImpl create( @@ -233,6 +238,7 @@ public static ExecutionServiceImpl create( MappingService mappingService, ExecutableTableRegistry tableRegistry, ExecutionDependencyResolver dependencyResolver, + TableFunctionRegistry tableFunctionRegistry, ClockService clockService, long shutdownTimeout ) { @@ -250,7 +256,9 @@ public static ExecutionServiceImpl create( ctx, mailboxRegistry, exchangeSrvc, - deps), + deps, + tableFunctionRegistry + ), clockService, shutdownTimeout ); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index 4a09b90dc6f..96893bae045 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -38,6 +38,7 @@ import org.apache.calcite.rel.core.Minus; import org.apache.calcite.rel.core.Spool; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; @@ -51,6 +52,8 @@ import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper; import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry; import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup; import org.apache.ignite.internal.sql.engine.exec.rel.AbstractSetOpNode; import org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNode; @@ -147,6 +150,8 @@ public class LogicalRelImplementor implements IgniteRelVisitor> private final ResolvedDependencies resolvedDependencies; + private final TableFunctionRegistry tableFunctionRegistry; + /** * Constructor. * @@ -154,16 +159,20 @@ public class LogicalRelImplementor implements IgniteRelVisitor> * @param mailboxRegistry Mailbox registry. * @param exchangeSvc Exchange service. * @param resolvedDependencies Dependencies required to execute this query. + * @param tableFunctionRegistry Table function registry. */ public LogicalRelImplementor( ExecutionContext ctx, MailboxRegistry mailboxRegistry, ExchangeService exchangeSvc, - ResolvedDependencies resolvedDependencies) { + ResolvedDependencies resolvedDependencies, + TableFunctionRegistry tableFunctionRegistry + ) { this.mailboxRegistry = mailboxRegistry; this.exchangeSvc = exchangeSvc; this.ctx = ctx; this.resolvedDependencies = resolvedDependencies; + this.tableFunctionRegistry = tableFunctionRegistry; expressionFactory = ctx.expressionFactory(); destinationFactory = new DestinationFactory<>(ctx.rowHandler(), resolvedDependencies); @@ -667,14 +676,9 @@ public Node visit(IgniteSetOp rel) { /** {@inheritDoc} */ @Override public Node visit(IgniteTableFunctionScan rel) { - Supplier> dataSupplier = expressionFactory.execute(rel.getCall()); - - RelDataType rowType = rel.getRowType(); - - RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType)); - RowFactory rowFactory = ctx.rowHandler().factory(rowSchema); + TableFunction tableFunction = tableFunctionRegistry.getTableFunction(ctx, (RexCall) rel.getCall()); - return new ScanNode<>(ctx, new TableFunctionScan<>(dataSupplier, rowFactory)); + return new ScanNode<>(ctx, tableFunction); } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableFunctionScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableFunctionScan.java deleted file mode 100644 index 45c1da50c88..00000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TableFunctionScan.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.sql.engine.exec; - -import java.util.Iterator; -import java.util.function.Supplier; -import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; -import org.apache.ignite.internal.util.TransformingIterator; - -/** - * TableFunctionScan. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ -public class TableFunctionScan implements Iterable { - private final Supplier> dataSupplier; - - private final RowFactory rowFactory; - - /** - * Constructor. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 - */ - public TableFunctionScan( - Supplier> dataSupplier, - RowFactory rowFactory - ) { - this.dataSupplier = dataSupplier; - this.rowFactory = rowFactory; - } - - /** {@inheritDoc} */ - @Override - public Iterator iterator() { - return new TransformingIterator<>(dataSupplier.get().iterator(), rowFactory::create); - } -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java index bdff76032c2..7aa41bb3b19 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/IgniteSqlFunctions.java @@ -37,20 +37,8 @@ import org.apache.calcite.DataContext; import org.apache.calcite.avatica.util.ByteString; import org.apache.calcite.avatica.util.DateTimeUtils; -import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.linq4j.AbstractEnumerable; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.linq4j.function.NonDeterministic; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.ignite.internal.lang.IgniteStringBuilder; import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable; @@ -86,16 +74,6 @@ private IgniteSqlFunctions() { // No-op. } - /** SQL SYSTEM_RANGE(start, end) table function. */ - public static ScannableTable systemRange(Object rangeStart, Object rangeEnd) { - return new RangeTable(rangeStart, rangeEnd, 1L); - } - - /** SQL SYSTEM_RANGE(start, end, increment) table function. */ - public static ScannableTable systemRange(Object rangeStart, Object rangeEnd, Object increment) { - return new RangeTable(rangeStart, rangeEnd, increment); - } - /** Just a stub. Validates Date\Time literal, still use calcite implementation for numeric representation. * Otherwise need to fix {@code DateTimeUtils#unixTimestampToString} usage additionally. */ @@ -659,120 +637,6 @@ private static void unixTimeToString(IgniteStringBuilder buf, int time, int prec } } - /** - * Dummy table to implement the SYSTEM_RANGE function. - */ - private static class RangeTable implements ScannableTable { - /** Start of the range. */ - private final Object rangeStart; - - /** End of the range. */ - private final Object rangeEnd; - - /** Increment. */ - private final Object increment; - - /** - * Note: {@code Object} arguments required here due to: 1. {@code NULL} arguments need to be supported, so we - * can't use {@code long} arguments type. 2. {@code Integer} and other numeric classes can be converted to - * {@code long} type by java, but can't be converted to {@code Long} type, so we can't use {@code Long} - * arguments type either. Instead, we accept {@code Object} arguments type and try to convert valid types to - * {@code long}. - */ - RangeTable(Object rangeStart, Object rangeEnd, Object increment) { - this.rangeStart = rangeStart; - this.rangeEnd = rangeEnd; - this.increment = increment; - } - - /** {@inheritDoc} */ - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return typeFactory.builder().add("X", SqlTypeName.BIGINT).build(); - } - - /** {@inheritDoc} */ - @Override - public Enumerable<@Nullable Object[]> scan(DataContext root) { - if (rangeStart == null || rangeEnd == null || increment == null) { - return Linq4j.emptyEnumerable(); - } - - long rangeStart = convertToLongArg(this.rangeStart, "rangeStart"); - long rangeEnd = convertToLongArg(this.rangeEnd, "rangeEnd"); - long increment = convertToLongArg(this.increment, "increment"); - - if (increment == 0L) { - throw new IllegalArgumentException("Increment can't be 0"); - } - - return new AbstractEnumerable<>() { - @Override - public Enumerator<@Nullable Object[]> enumerator() { - return new Enumerator<>() { - long cur = rangeStart - increment; - - @Override - public Object[] current() { - return new Object[]{cur}; - } - - @Override - public boolean moveNext() { - cur += increment; - - return increment > 0L ? cur <= rangeEnd : cur >= rangeEnd; - } - - @Override - public void reset() { - cur = rangeStart - increment; - } - - @Override - public void close() { - // No-op. - } - }; - } - }; - } - - private long convertToLongArg(Object val, String name) { - if (val instanceof Byte || val instanceof Short || val instanceof Integer || val instanceof Long) { - return ((Number) val).longValue(); - } - - throw new IllegalArgumentException("Unsupported argument type [arg=" + name - + ", type=" + val.getClass().getSimpleName() + ']'); - } - - /** {@inheritDoc} */ - @Override - public Statistic getStatistic() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override - public Schema.TableType getJdbcTableType() { - return Schema.TableType.TABLE; - } - - /** {@inheritDoc} */ - @Override - public boolean isRolledUp(String column) { - return false; - } - - /** {@inheritDoc} */ - @Override - public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, - SqlNode parent, CalciteConnectionConfig cfg) { - return true; - } - } - private static long divide(long p, long q, RoundingMode mode) { // Stripped down version of guava's LongMath::divide. diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java index 334446bbea3..ef62472443b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RexImpTable.java @@ -333,7 +333,6 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.USER; import static org.apache.calcite.util.ReflectUtil.isStatic; import static org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.SUBSTR; -import static org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.SYSTEM_RANGE; import static org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable.TYPEOF; import static org.apache.ignite.internal.sql.engine.util.IgniteMethod.GEN_RANDOM_UUID; import static org.apache.ignite.internal.sql.engine.util.IgniteMethod.GREATEST2; @@ -1020,7 +1019,6 @@ Builder populateIgnite() { defineMethod(IgniteSqlOperatorTable.DECIMAL_DIVIDE, IgniteMethod.DECIMAL_DIVIDE.method(), NullPolicy.ARG0); map.put(TYPEOF, systemFunctionImplementor); - map.put(SYSTEM_RANGE, systemFunctionImplementor); return this; } @@ -1034,17 +1032,7 @@ private static class IgniteSystemFunctionImplementor Expression implementSafe(final RexToLixTranslator translator, final RexCall call, final List argValueList) { final SqlOperator op = call.getOperator(); - if (op == SYSTEM_RANGE) { - if (call.getOperands().size() == 2) { - return createTableFunctionImplementor(IgniteMethod.SYSTEM_RANGE2.method()) - .implement(translator, call, NullAs.NULL); - } - - if (call.getOperands().size() == 3) { - return createTableFunctionImplementor(IgniteMethod.SYSTEM_RANGE3.method()) - .implement(translator, call, NullAs.NULL); - } - } else if (op == TYPEOF) { + if (op == TYPEOF) { if (call.getOperands().size() == 1) { CallImplementor implementor = createTypeOfImplementor(); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/IterableTableFunction.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/IterableTableFunction.java new file mode 100644 index 00000000000..3e38287fd67 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/IterableTableFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.exp.func; + +import java.util.Iterator; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; + +/** + * {@link TableFunction} from an arbitrary iterable. + * + * @param Row type. + */ +public final class IterableTableFunction implements TableFunction { + + private final Iterable src; + + /** Constructor. */ + public IterableTableFunction(Iterable src) { + this.src = src; + } + + /** {@inheritDoc} */ + @Override + public TableFunctionInstance createInstance(ExecutionContext ctx) { + return new TableFunctionInstance<>() { + + private final Iterator it = src.iterator(); + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public RowT next() { + return it.next(); + } + + @Override + public void close() { + + } + }; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/SystemRangeTableFunction.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/SystemRangeTableFunction.java new file mode 100644 index 00000000000..908ab5cab9f --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/SystemRangeTableFunction.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.exp.func; + +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Supplier; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; +import org.apache.ignite.internal.sql.engine.exec.row.RowSchema; +import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable; +import org.apache.ignite.internal.type.NativeTypes; +import org.jetbrains.annotations.Nullable; + +/** Implementation of {@link IgniteSqlOperatorTable#SYSTEM_RANGE system range function}. */ +public final class SystemRangeTableFunction implements TableFunction { + + private final RowSchema rowSchema = RowSchema.builder() + .addField(NativeTypes.INT64) + .build(); + + private final Supplier startExpr; + + private final Supplier endExpr; + + private final Supplier incrementExpr; + + /** Constructor. */ + public SystemRangeTableFunction(Supplier startExpr, Supplier endExpr, @Nullable Supplier incrementExpr) { + this.startExpr = Objects.requireNonNull(startExpr, "startExpr"); + this.endExpr = Objects.requireNonNull(endExpr, "endExpr"); + this.incrementExpr = incrementExpr != null ? incrementExpr : () -> 1L; + } + + /** {@inheritDoc} */ + @Override + public TableFunctionInstance createInstance(ExecutionContext ctx) { + RowFactory factory = ctx.rowHandler().factory(rowSchema); + + Long start = startExpr.get(); + Long end = endExpr.get(); + Long increment = incrementExpr.get(); + + if (increment == null) { + increment = 1L; + } else if (increment == 0) { + throw new IllegalArgumentException("Increment can't be 0"); + } + + if (start == null || end == null) { + return TableFunctionInstance.empty(); + } else { + return new SystemRangeInstance<>(factory, start, end, increment); + } + } + + private static class SystemRangeInstance implements TableFunctionInstance { + + private final long end; + + private final long increment; + + private long current; + + private final RowFactory factory; + + SystemRangeInstance(RowFactory factory, long start, long end, long increment) { + this.factory = factory; + this.end = end; + this.increment = increment; + this.current = start; + } + + @Override + public boolean hasNext() { + return increment > 0 ? current <= end : current >= end; + } + + @Override + public RowT next() { + if (increment > 0 && current > end) { + throw new NoSuchElementException(); + } else if (increment < 0 && current < end) { + throw new NoSuchElementException(); + } + + RowT row = factory.create(current); + + current += increment; + + return row; + } + + @Override + public void close() { + + } + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunction.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunction.java new file mode 100644 index 00000000000..cc1d53cf564 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.exp.func; + +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; + +/** + * Table function implementation. + * + * @param Row type. + */ +public interface TableFunction { + + /** Creates an instance of a table function. */ + TableFunctionInstance createInstance(ExecutionContext ctx); +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionInstance.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionInstance.java new file mode 100644 index 00000000000..b3d9af9e5b8 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionInstance.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.exp.func; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * Represents an instance of a table function. + * + * @param Row type. + */ +public interface TableFunctionInstance extends Iterator, AutoCloseable { + + /** Returns a table function instance that always produces no results. */ + static TableFunctionInstance empty() { + return new TableFunctionInstance<>() { + @Override + public void close() { + + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public RowT next() { + throw new NoSuchElementException(); + } + }; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistry.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistry.java new file mode 100644 index 00000000000..7edf0309b98 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistry.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.exp.func; + +import org.apache.calcite.rex.RexCall; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; + +/** Registry of available table functions. */ +public interface TableFunctionRegistry { + + /** + * Returns implementation of the given table function call. + * + * @param Row type. + * @param ctx Context. + * @param rexCall Table function call. + * @return Table function implementation. + */ + TableFunction getTableFunction(ExecutionContext ctx, RexCall rexCall); +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistryImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistryImpl.java new file mode 100644 index 00000000000..4f6edc75b09 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/func/TableFunctionRegistryImpl.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.exp.func; + +import java.util.function.Supplier; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.exp.ExpressionFactory; +import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable; +import org.jetbrains.annotations.Nullable; + +/** Implementation of {@link TableFunctionRegistry}. */ +public class TableFunctionRegistryImpl implements TableFunctionRegistry { + + /** {@inheritDoc} */ + @Override + public TableFunction getTableFunction(ExecutionContext ctx, RexCall rexCall) { + if (rexCall.getOperator() == IgniteSqlOperatorTable.SYSTEM_RANGE) { + Supplier start = implementGetLongExpr(ctx.expressionFactory(), rexCall.operands.get(0)); + Supplier end = implementGetLongExpr(ctx.expressionFactory(), rexCall.operands.get(1)); + Supplier increment; + + if (rexCall.operands.size() > 2) { + increment = implementGetLongExpr(ctx.expressionFactory(), rexCall.operands.get(2)); + } else { + increment = null; + } + + return new SystemRangeTableFunction<>(start, end, increment); + } else { + throw new IllegalArgumentException("Unsupported table function: " + rexCall.getOperator()); + } + } + + private static @Nullable Supplier implementGetLongExpr(ExpressionFactory expressionFactory, RexNode expr) { + if (expr == null) { + return null; + } + + Supplier value = expressionFactory.execute(expr); + return () -> { + Number num = (Number) value.get(); + if (num == null) { + return null; + } + return num.longValue(); + }; + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java index 59c8b607b32..f8a2a3e6607 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java @@ -17,34 +17,47 @@ package org.apache.ignite.internal.sql.engine.exec.rel; -import java.util.Iterator; import java.util.List; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.exp.func.IterableTableFunction; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance; import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.lang.ErrorGroups.Sql; +import org.apache.ignite.sql.SqlException; /** * Scan node. */ public class ScanNode extends AbstractNode implements SingleNode { - private final Iterable src; + private final TableFunction func; - private Iterator it; + private TableFunctionInstance inst; private int requested; private boolean inLoop; /** - * Constructor. - * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 + * Constructor for a scan that returns rows from the given iterable. * * @param ctx Execution context. - * @param src Source. + * @param src Source iterable. */ public ScanNode(ExecutionContext ctx, Iterable src) { + this(ctx, new IterableTableFunction<>(src)); + } + + /** + * Constructor for a scan that returns rows produced by the given table function. + * + * @param ctx Execution context. + * @param src Table function. + */ + public ScanNode(ExecutionContext ctx, TableFunction src) { super(ctx); - this.src = src; + this.func = src; } /** {@inheritDoc} */ @@ -66,16 +79,16 @@ public void request(int rowsCnt) throws Exception { public void closeInternal() { super.closeInternal(); - Commons.closeQuiet(it); - it = null; - Commons.closeQuiet(src); + Commons.closeQuiet(inst); + inst = null; + Commons.closeQuiet(func); } /** {@inheritDoc} */ @Override protected void rewindInternal() { - Commons.closeQuiet(it); - it = null; + Commons.closeQuiet(inst); + inst = null; requested = 0; } @@ -100,16 +113,16 @@ private void push() throws Exception { inLoop = true; try { - if (it == null) { - it = src.iterator(); + if (inst == null) { + inst = func.createInstance(context()); } int processed = 0; - while (requested > 0 && it.hasNext()) { + while (requested > 0 && inst.hasNext()) { checkState(); requested--; - downstream().push(it.next()); + downstream().push(inst.next()); if (++processed == inBufSize && requested > 0) { // allow others to do their job @@ -118,13 +131,15 @@ private void push() throws Exception { return; } } + } catch (Exception e) { + throw new SqlException(Sql.RUNTIME_ERR, e); } finally { inLoop = false; } - if (requested > 0 && !it.hasNext()) { - Commons.closeQuiet(it); - it = null; + if (requested > 0 && !inst.hasNext()) { + Commons.closeQuiet(inst); + inst = null; requested = 0; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java index 926509f62d7..6ed1ff9d43e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java @@ -68,10 +68,6 @@ public enum IgniteMethod { /** See {@link BiScalar#execute(ExecutionContext, Object, Object, RowBuilder)}. */ BI_SCALAR_EXECUTE(BiScalar.class, "execute", ExecutionContext.class, Object.class, Object.class, RowBuilder.class), - SYSTEM_RANGE2(IgniteSqlFunctions.class, "systemRange", Object.class, Object.class), - - SYSTEM_RANGE3(IgniteSqlFunctions.class, "systemRange", Object.class, Object.class, Object.class), - STRING_TO_TIMESTAMP(IgniteSqlFunctions.class, "timestampStringToNumeric", String.class), /** See {@link IgniteSqlFunctions#subtractTimeZoneOffset(long, TimeZone)}. **/ diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index 5c5e1700410..98a3de4d513 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -92,6 +92,8 @@ import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider; @@ -902,6 +904,7 @@ public CompletableFuture forSystemView(ExecutionTargetFactory f var partitionPruner = new PartitionPrunerImpl(); var mappingService = new MappingServiceImpl(nodeName, targetProvider, EmptyCacheFactory.INSTANCE, 0, partitionPruner, taskExecutor); + var tableFunctionRegistry = new TableFunctionRegistryImpl(); List logicalNodes = nodeNames.stream() .map(name -> new LogicalNode(name, name, NetworkAddress.from("127.0.0.1:10000"))) @@ -919,7 +922,7 @@ public CompletableFuture forSystemView(ExecutionTargetFactory f ArrayRowHandler.INSTANCE, executableTableRegistry, dependencyResolver, - (ctx, deps) -> node.implementor(ctx, mailboxRegistry, exchangeService, deps), + (ctx, deps) -> node.implementor(ctx, mailboxRegistry, exchangeService, deps, tableFunctionRegistry), clockService, SHUTDOWN_TIMEOUT ); @@ -1090,8 +1093,10 @@ public LogicalRelImplementor implementor( ExecutionContext ctx, MailboxRegistry mailboxRegistry, ExchangeService exchangeService, - ResolvedDependencies deps) { - return new LogicalRelImplementor<>(ctx, mailboxRegistry, exchangeService, deps) { + ResolvedDependencies deps, + TableFunctionRegistry tableFunctionRegistry + ) { + return new LogicalRelImplementor<>(ctx, mailboxRegistry, exchangeService, deps, tableFunctionRegistry) { @Override public Node visit(IgniteTableScan rel) { return new ScanNode<>(ctx, dataset) { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNodeExecutionTest.java new file mode 100644 index 00000000000..1f95f13d66e --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNodeExecutionTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.rel; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link ScanNode}. + */ +public class ScanNodeExecutionTest extends AbstractExecutionTest { + + @Test + public void testIterableSource() { + ExecutionContext ctx = executionContext(true); + + List data = IntStream.range(0, 5) + .mapToObj(i -> new Object[]{i}) + .collect(Collectors.toList()); + + RootNode rootNode = new RootNode<>(ctx); + ScanNode srcNode = new ScanNode<>(ctx, data); + + rootNode.register(srcNode); + + collectResults(rootNode, data); + } + + @Test + public void testTableFunctionSource() { + ExecutionContext ctx = executionContext(true); + + List data = IntStream.range(0, 5) + .mapToObj(i -> new Object[]{i}) + .collect(Collectors.toList()); + + TestFunctionInstance instance = new TestFunctionInstance<>(data.iterator()); + TestFunction testFunction = new TestFunction<>(instance); + + try (RootNode rootNode = new RootNode<>(ctx)) { + ScanNode srcNode = new ScanNode<>(ctx, testFunction); + + rootNode.register(srcNode); + + collectResults(rootNode, data); + + assertEquals(1, instance.closeCounter.get()); + } + } + + @Override + protected RowHandler rowHandler() { + return ArrayRowHandler.INSTANCE; + } + + private static void collectResults(RootNode rootNode, List data) { + List actual = new ArrayList<>(); + + while (rootNode.hasNext()) { + actual.add(rootNode.next()); + } + + assertEquals( + actual.stream().map(Arrays::asList).collect(Collectors.toList()), + data.stream().map(Arrays::asList).collect(Collectors.toList()) + ); + } + + private static class TestFunction implements TableFunction { + + private final TableFunctionInstance instance; + + private TestFunction(TableFunctionInstance instance) { + this.instance = instance; + } + + @Override + public TableFunctionInstance createInstance(ExecutionContext ctx) { + return instance; + } + } + + private static class TestFunctionInstance implements TableFunctionInstance { + final Iterator it; + + final AtomicInteger closeCounter = new AtomicInteger(); + + private TestFunctionInstance(Iterator it) { + this.it = it; + } + + @Override + public void close() { + closeCounter.incrementAndGet(); + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public RowT next() { + return it.next(); + } + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SystemRangeTableFunctionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SystemRangeTableFunctionTest.java new file mode 100644 index 00000000000..519d8fa4b68 --- /dev/null +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/SystemRangeTableFunctionTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.rel; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; +import org.apache.ignite.internal.sql.engine.exec.RowHandler; +import org.apache.ignite.internal.sql.engine.exec.exp.func.SystemRangeTableFunction; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionInstance; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl; +import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; +import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable; +import org.apache.ignite.internal.sql.engine.util.Commons; +import org.junit.jupiter.api.Test; + +/** Tests for {@link SystemRangeTableFunction}. */ +public class SystemRangeTableFunctionTest extends AbstractExecutionTest { + + private final TableFunctionRegistry registry = new TableFunctionRegistryImpl(); + + private final RexBuilder rexBuilder = Commons.rexBuilder(); + + @Test + public void testSystemRangeNoIncrement() throws Exception { + RexCall call = (RexCall) rexBuilder.makeCall(IgniteSqlOperatorTable.SYSTEM_RANGE, + rexBuilder.makeBigintLiteral(BigDecimal.ONE), + rexBuilder.makeBigintLiteral(BigDecimal.TEN) + ); + + List expected = IntStream.rangeClosed(1, 10) + .mapToObj(i -> new Object[]{i}) + .collect(Collectors.toList()); + + checkFunction(call, expected); + } + + @Test + public void testSystemRangeWithIncrement() throws Exception { + RexCall call = (RexCall) rexBuilder.makeCall(IgniteSqlOperatorTable.SYSTEM_RANGE, + rexBuilder.makeBigintLiteral(BigDecimal.ONE), + rexBuilder.makeBigintLiteral(BigDecimal.TEN), + rexBuilder.makeBigintLiteral(BigDecimal.ONE.add(BigDecimal.ONE)) + ); + + List expected = IntStream.rangeClosed(1, 10) + .filter(i -> i % 2 == 1) + .mapToObj(i -> new Object[]{i}) + .collect(Collectors.toList()); + + checkFunction(call, expected); + } + + @Test + public void testSystemRangeReverse() throws Exception { + RexCall call = (RexCall) rexBuilder.makeCall(IgniteSqlOperatorTable.SYSTEM_RANGE, + rexBuilder.makeBigintLiteral(BigDecimal.TEN), + rexBuilder.makeBigintLiteral(BigDecimal.ONE), + rexBuilder.makeBigintLiteral(BigDecimal.ONE.negate()) + ); + + List expected = IntStream.rangeClosed(1, 10) + .boxed() + .sorted(Comparator.reverseOrder()) + .map(i -> new Object[]{i}) + .collect(Collectors.toList()); + + checkFunction(call, expected); + } + + private void checkFunction(RexCall call, List expected) throws Exception { + ExecutionContext executionContext = executionContext(); + TableFunction tableFunction = registry.getTableFunction(executionContext, call); + + try (TableFunctionInstance instance = tableFunction.createInstance(executionContext)) { + List actual = new ArrayList<>(); + while (instance.hasNext()) { + actual.add(instance.next()); + } + + assertEquals(expected.size(), actual.size()); + } + } + + @Override + protected RowHandler rowHandler() { + return ArrayRowHandler.INSTANCE; + } +} diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java index 130f5c7e928..77652b2df44 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl; import org.apache.ignite.internal.sql.engine.exec.RowHandler; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; +import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl; import org.apache.ignite.internal.sql.engine.exec.mapping.MappingService; import org.apache.ignite.internal.sql.engine.message.MessageService; import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl; @@ -138,6 +139,8 @@ public class TestNode implements LifecycleAware { tableRegistry, view -> () -> systemViewManager.scanView(view.name()) ); + TableFunctionRegistryImpl tableFunctionRegistry = new TableFunctionRegistryImpl(); + executionService = registerService(ExecutionServiceImpl.create( topologyService, messageService, @@ -150,6 +153,7 @@ public class TestNode implements LifecycleAware { mappingService, tableRegistry, dependencyResolver, + tableFunctionRegistry, clockService, 5_000 ));