Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-20294: Sql. Using UDF as a place for system_range function #3729

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,25 @@ public void testRange() {

assertEquals(0, sql("SELECT * FROM table(system_range(null, 1))").size());

assertEquals(0, sql("SELECT * FROM table(system_range(1, null))").size());

assertThrowsSqlException(
Sql.RUNTIME_ERR,
"Increment can't be 0",
() -> sql("SELECT * FROM table(system_range(1, 1, 0))"));

assertQuery("SELECT (SELECT * FROM table(system_range(4, 1)))")
.returns(null)
.check();

assertQuery("SELECT (SELECT * FROM table(system_range(1, 1)))")
.returns(1L)
.check();

assertThrowsSqlException(
Sql.RUNTIME_ERR,
"Subquery returned more than 1 value",
() -> sql("SELECT (SELECT * FROM table(system_range(1, 10)))"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetProvider;
Expand Down Expand Up @@ -323,6 +324,8 @@ public synchronized CompletableFuture<Void> startAsync() {
tableManager, schemaManager, sqlSchemaManager, replicaService, clockService, TABLE_CACHE_SIZE
);

var tableFunctionRegistry = new TableFunctionRegistryImpl();

var dependencyResolver = new ExecutionDependencyResolverImpl(
executableTableRegistry,
view -> () -> systemViewManager.scanView(view.name())
Expand Down Expand Up @@ -380,6 +383,7 @@ public CompletableFuture<ExecutionTarget> forSystemView(ExecutionTargetFactory f
mappingService,
executableTableRegistry,
dependencyResolver,
tableFunctionRegistry,
clockService,
EXECUTION_SERVICE_SHUTDOWN_TIMEOUT
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappedFragment;
Expand Down Expand Up @@ -210,6 +211,7 @@ public ExecutionServiceImpl(
/**
* Creates the execution services.
*
* @param <RowT> Type of the sql row.
* @param topSrvc Topology service.
* @param msgSrvc Message service.
* @param sqlSchemaManager Schema manager.
Expand All @@ -218,7 +220,10 @@ public ExecutionServiceImpl(
* @param handler Row handler.
* @param mailboxRegistry Mailbox registry.
* @param exchangeSrvc Exchange service.
* @param <RowT> Type of the sql row.
* @param mappingService Nodes mapping calculation service.
* @param tableRegistry Table registry.
* @param dependencyResolver Dependency resolver.
* @param tableFunctionRegistry Table function registry.
* @return An execution service.
*/
public static <RowT> ExecutionServiceImpl<RowT> create(
Expand All @@ -233,6 +238,7 @@ public static <RowT> ExecutionServiceImpl<RowT> create(
MappingService mappingService,
ExecutableTableRegistry tableRegistry,
ExecutionDependencyResolver dependencyResolver,
TableFunctionRegistry tableFunctionRegistry,
ClockService clockService,
long shutdownTimeout
) {
Expand All @@ -250,7 +256,9 @@ public static <RowT> ExecutionServiceImpl<RowT> create(
ctx,
mailboxRegistry,
exchangeSrvc,
deps),
deps,
tableFunctionRegistry
),
clockService,
shutdownTimeout
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Spool;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
Expand All @@ -51,6 +52,8 @@
import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunction;
import org.apache.ignite.internal.sql.engine.exec.exp.func.TableFunctionRegistry;
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
import org.apache.ignite.internal.sql.engine.exec.rel.AbstractSetOpNode;
import org.apache.ignite.internal.sql.engine.exec.rel.CorrelatedNestedLoopJoinNode;
Expand Down Expand Up @@ -147,23 +150,29 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>

private final ResolvedDependencies resolvedDependencies;

private final TableFunctionRegistry tableFunctionRegistry;

/**
* Constructor.
*
* @param ctx Root context.
* @param mailboxRegistry Mailbox registry.
* @param exchangeSvc Exchange service.
* @param resolvedDependencies Dependencies required to execute this query.
* @param tableFunctionRegistry Table function registry.
*/
public LogicalRelImplementor(
ExecutionContext<RowT> ctx,
MailboxRegistry mailboxRegistry,
ExchangeService exchangeSvc,
ResolvedDependencies resolvedDependencies) {
ResolvedDependencies resolvedDependencies,
TableFunctionRegistry tableFunctionRegistry
) {
this.mailboxRegistry = mailboxRegistry;
this.exchangeSvc = exchangeSvc;
this.ctx = ctx;
this.resolvedDependencies = resolvedDependencies;
this.tableFunctionRegistry = tableFunctionRegistry;

expressionFactory = ctx.expressionFactory();
destinationFactory = new DestinationFactory<>(ctx.rowHandler(), resolvedDependencies);
Expand Down Expand Up @@ -667,14 +676,9 @@ public Node<RowT> visit(IgniteSetOp rel) {
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteTableFunctionScan rel) {
Supplier<Iterable<Object[]>> dataSupplier = expressionFactory.execute(rel.getCall());

RelDataType rowType = rel.getRowType();

RowSchema rowSchema = rowSchemaFromRelTypes(RelOptUtil.getFieldTypeList(rowType));
RowFactory<RowT> rowFactory = ctx.rowHandler().factory(rowSchema);
TableFunction<RowT> tableFunction = tableFunctionRegistry.getTableFunction(ctx, (RexCall) rel.getCall());

return new ScanNode<>(ctx, new TableFunctionScan<>(dataSupplier, rowFactory));
return new ScanNode<>(ctx, tableFunction);
}

/** {@inheritDoc} */
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,8 @@
import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.linq4j.function.NonDeterministic;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.runtime.SqlFunctions;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.sql.fun.IgniteSqlOperatorTable;
Expand Down Expand Up @@ -86,16 +74,6 @@ private IgniteSqlFunctions() {
// No-op.
}

/** SQL SYSTEM_RANGE(start, end) table function. */
public static ScannableTable systemRange(Object rangeStart, Object rangeEnd) {
return new RangeTable(rangeStart, rangeEnd, 1L);
}

/** SQL SYSTEM_RANGE(start, end, increment) table function. */
public static ScannableTable systemRange(Object rangeStart, Object rangeEnd, Object increment) {
return new RangeTable(rangeStart, rangeEnd, increment);
}

/** Just a stub. Validates Date\Time literal, still use calcite implementation for numeric representation.
* Otherwise need to fix {@code DateTimeUtils#unixTimestampToString} usage additionally.
*/
Expand Down Expand Up @@ -659,120 +637,6 @@ private static void unixTimeToString(IgniteStringBuilder buf, int time, int prec
}
}

/**
* Dummy table to implement the SYSTEM_RANGE function.
*/
private static class RangeTable implements ScannableTable {
/** Start of the range. */
private final Object rangeStart;

/** End of the range. */
private final Object rangeEnd;

/** Increment. */
private final Object increment;

/**
* Note: {@code Object} arguments required here due to: 1. {@code NULL} arguments need to be supported, so we
* can't use {@code long} arguments type. 2. {@code Integer} and other numeric classes can be converted to
* {@code long} type by java, but can't be converted to {@code Long} type, so we can't use {@code Long}
* arguments type either. Instead, we accept {@code Object} arguments type and try to convert valid types to
* {@code long}.
*/
RangeTable(Object rangeStart, Object rangeEnd, Object increment) {
this.rangeStart = rangeStart;
this.rangeEnd = rangeEnd;
this.increment = increment;
}

/** {@inheritDoc} */
@Override
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return typeFactory.builder().add("X", SqlTypeName.BIGINT).build();
}

/** {@inheritDoc} */
@Override
public Enumerable<@Nullable Object[]> scan(DataContext root) {
if (rangeStart == null || rangeEnd == null || increment == null) {
return Linq4j.emptyEnumerable();
}

long rangeStart = convertToLongArg(this.rangeStart, "rangeStart");
long rangeEnd = convertToLongArg(this.rangeEnd, "rangeEnd");
long increment = convertToLongArg(this.increment, "increment");

if (increment == 0L) {
throw new IllegalArgumentException("Increment can't be 0");
}

return new AbstractEnumerable<>() {
@Override
public Enumerator<@Nullable Object[]> enumerator() {
return new Enumerator<>() {
long cur = rangeStart - increment;

@Override
public Object[] current() {
return new Object[]{cur};
}

@Override
public boolean moveNext() {
cur += increment;

return increment > 0L ? cur <= rangeEnd : cur >= rangeEnd;
}

@Override
public void reset() {
cur = rangeStart - increment;
}

@Override
public void close() {
// No-op.
}
};
}
};
}

private long convertToLongArg(Object val, String name) {
if (val instanceof Byte || val instanceof Short || val instanceof Integer || val instanceof Long) {
return ((Number) val).longValue();
}

throw new IllegalArgumentException("Unsupported argument type [arg=" + name
+ ", type=" + val.getClass().getSimpleName() + ']');
}

/** {@inheritDoc} */
@Override
public Statistic getStatistic() {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public Schema.TableType getJdbcTableType() {
return Schema.TableType.TABLE;
}

/** {@inheritDoc} */
@Override
public boolean isRolledUp(String column) {
return false;
}

/** {@inheritDoc} */
@Override
public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call,
SqlNode parent, CalciteConnectionConfig cfg) {
return true;
}
}

private static long divide(long p, long q, RoundingMode mode) {
// Stripped down version of guava's LongMath::divide.

Expand Down