Skip to content
Permalink
Browse files
IGNITE-16486 Adoption of a bunch of tickets from Ignite-2 - Fixes #637.
IGNITE-13179 Fix traits at the IgniteLimit.
IGNITE-15603 NPE on subquery returning multiple results.
IGNITE-15982 Correlates passes through table spools.

Signed-off-by: zstan <stanilovsky@gmail.com>
  • Loading branch information
zstan committed Feb 28, 2022
1 parent 0d40c99 commit 2c3fcb2b39ea6c80993b4c9ec84655ebd36686da
Showing 8 changed files with 207 additions and 28 deletions.
@@ -112,12 +112,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.immutables</groupId>
<artifactId>value-annotations</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.npathai</groupId>
<artifactId>hamcrest-optional</artifactId>
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine;

import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;

import org.junit.jupiter.api.Test;

/** Tests for correlated queries. */
public class ItCorrelatesTest extends AbstractBasicIntegrationTest {
private static final String DISABLED_JOIN_RULES = " /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ ";

/** Checks correlates are assigned before access. */
@Test
public void testCorrelatesAssignedBeforeAccess() {
sql("create table test_tbl(k INTEGER primary key, v INTEGER)");
sql("INSERT INTO test_tbl VALUES (1, 1)");

assertQuery("SELECT " + DISABLED_JOIN_RULES + " t0.v, (SELECT t0.v + t1.v FROM test_tbl t1) AS j FROM test_tbl t0")
.matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
.returns(1, 2)
.check();
}

/** Checks that correlates can't be moved under the table spool. */
@Test
public void testCorrelatesWithTableSpool() {
sql("CREATE TABLE test(k INTEGER primary key, i1 INT, i2 INT)");
sql("INSERT INTO test VALUES (1, 1, 1), (2, 2, 2)");

assertQuery("SELECT " + DISABLED_JOIN_RULES + " (SELECT t1.i1 + t1.i2 + t0.i2 FROM test t1 WHERE i1 = 1) FROM test t0")
.matches(containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
.matches(containsSubPlan("IgniteTableSpool"))
.returns(3)
.returns(4)
.check();
}
}
@@ -183,7 +183,8 @@ protected boolean evalSubstringOf(String strIn) {
/**
* Adds plan matchers.
*/
public QueryChecker matches(Matcher<String>... planMatcher) {
@SafeVarargs
public final QueryChecker matches(Matcher<String>... planMatcher) {
Collections.addAll(planMatchers, planMatcher);

return this;
@@ -76,9 +76,15 @@ public void request(int rowsCnt) throws Exception {
rowsCnt = offset + rowsCnt;
}

waiting = rowsCnt;

if (fetch > 0) {
rowsCnt = Math.min(rowsCnt, (fetch + offset) - rowsProcessed);
}

checkState();

source().request(waiting = rowsCnt);
source().request(rowsCnt);
}

/** {@inheritDoc} */
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.rule;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,6 +46,7 @@
import org.apache.ignite.internal.sql.engine.trait.CorrelationTrait;
import org.apache.ignite.internal.sql.engine.trait.RewindabilityTrait;
import org.apache.ignite.internal.sql.engine.util.RexUtils;
import org.apache.ignite.internal.util.CollectionUtils;

/**
* LogicalScanConverterRule.
@@ -80,17 +82,18 @@ protected PhysicalNode convert(
collation = collation.apply(mapping);
}

RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
.replace(RewindabilityTrait.REWINDABLE)
.replace(distribution)
.replace(collation);

Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());

if (!corrIds.isEmpty()) {
traits = traits.replace(CorrelationTrait.correlations(corrIds));
if (!CollectionUtils.nullOrEmpty(rel.projects())) {
corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
}

RelTraitSet traits = rel.getCluster().traitSetOf(IgniteConvention.INSTANCE)
.replace(RewindabilityTrait.REWINDABLE)
.replace(distribution)
.replace(collation)
.replace(corrIds.isEmpty() ? CorrelationTrait.UNCORRELATED : CorrelationTrait.correlations(corrIds));

return new IgniteIndexScan(
cluster,
traits,
@@ -127,15 +130,17 @@ protected PhysicalNode convert(
distribution = distribution.apply(mapping);
}

RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
.replace(RewindabilityTrait.REWINDABLE)
.replace(distribution);

Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
if (!corrIds.isEmpty()) {
traits = traits.replace(CorrelationTrait.correlations(corrIds));

if (!CollectionUtils.nullOrEmpty(rel.projects())) {
corrIds = new HashSet<>(CollectionUtils.union(corrIds, RexUtils.extractCorrelationIds(rel.projects())));
}

RelTraitSet traits = cluster.traitSetOf(IgniteConvention.INSTANCE)
.replace(RewindabilityTrait.REWINDABLE)
.replace(distribution)
.replace(corrIds.isEmpty() ? CorrelationTrait.UNCORRELATED : CorrelationTrait.correlations(corrIds));

return new IgniteTableScan(rel.getCluster(), traits,
rel.getTable(), rel.projects(), rel.condition(), rel.requiredColumns());
}
@@ -43,7 +43,7 @@ public String getSimpleName() {
/** {@inheritDoc} */
@Override
public RelNode convert(RelOptPlanner planner, RelNode rel, RewindabilityTrait toTrait, boolean allowInfiniteCostConverters) {
return TraitUtils.convertRewindability(planner, toTrait, rel);
return TraitUtils.convertRewindability(toTrait, rel);
}

/** {@inheritDoc} */
@@ -128,7 +128,7 @@ private static RelNode convertTrait(RelOptPlanner planner, RelTrait fromTrait, R
} else if (converter == DistributionTraitDef.INSTANCE) {
return convertDistribution(planner, (IgniteDistribution) toTrait, rel);
} else if (converter == RewindabilityTraitDef.INSTANCE) {
return convertRewindability(planner, (RewindabilityTrait) toTrait, rel);
return convertRewindability((RewindabilityTrait) toTrait, rel);
} else {
return convertOther(planner, converter, toTrait, rel);
}
@@ -203,18 +203,23 @@ public static RelNode convertDistribution(RelOptPlanner planner,
* Convert rewindability. TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@Nullable
public static RelNode convertRewindability(RelOptPlanner planner,
RewindabilityTrait toTrait, RelNode rel) {
public static RelNode convertRewindability(RewindabilityTrait toTrait, RelNode rel) {
RewindabilityTrait fromTrait = rewindability(rel);

if (fromTrait.satisfies(toTrait)) {
return rel;
}

RelTraitSet traits = rel.getTraitSet()
.replace(toTrait);

return new IgniteTableSpool(rel.getCluster(), traits, Spool.Type.LAZY, rel);
.replace(toTrait)
.replace(CorrelationTrait.UNCORRELATED);

return new IgniteTableSpool(
rel.getCluster(),
traits,
Spool.Type.LAZY,
RelOptRule.convert(rel, rel.getTraitSet().replace(CorrelationTrait.UNCORRELATED)))
;
}

@SuppressWarnings({"unchecked", "rawtypes"})
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine.exec.rel;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.junit.jupiter.api.Test;

/**
* Test LimitNode execution.
*/
public class LimitExecutionTest extends AbstractExecutionTest {
/** Tests correct results fetched with Limit node. */
@Test
public void testLimit() {
int bufSize = Commons.IN_BUFFER_SIZE;

checkLimit(0, 1);
checkLimit(1, 0);
checkLimit(1, 1);
checkLimit(0, bufSize);
checkLimit(bufSize, 0);
checkLimit(bufSize, bufSize);
checkLimit(bufSize - 1, 1);
checkLimit(2000, 0);
checkLimit(0, 3000);
checkLimit(2000, 3000);
}

/**
* Check correct result size fetched.
*
* @param offset Rows offset.
* @param fetch Fetch rows count (zero means unlimited).
*/
private void checkLimit(int offset, int fetch) {
ExecutionContext<Object[]> ctx = executionContext(true);
IgniteTypeFactory tf = ctx.getTypeFactory();
RelDataType rowType = TypeUtils.createRowType(tf, int.class);

RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
LimitNode<Object[]> limitNode = new LimitNode<>(ctx, rowType, () -> offset, fetch == 0 ? null : () -> fetch);
SourceNode srcNode = new SourceNode(ctx, rowType);

rootNode.register(limitNode);
limitNode.register(srcNode);

if (fetch > 0) {
for (int i = offset; i < offset + fetch; i++) {
assertTrue(rootNode.hasNext());
assertEquals(i, rootNode.next()[0]);
}

assertFalse(rootNode.hasNext());
assertEquals(srcNode.requested.get(), offset + fetch);
} else {
assertTrue(rootNode.hasNext());
assertEquals(offset, rootNode.next()[0]);
assertTrue(srcNode.requested.get() > offset);
}
}

private static class SourceNode extends AbstractNode<Object[]> {

AtomicInteger requested = new AtomicInteger();

public SourceNode(ExecutionContext<Object[]> ctx, RelDataType rowType) {
super(ctx, rowType);
}

/** {@inheritDoc} */
@Override protected void rewindInternal() {
// No-op.
}

/** {@inheritDoc} */
@Override protected Downstream<Object[]> requestDownstream(int idx) {
return null;
}

/** {@inheritDoc} */
@Override public void request(int rowsCnt) {
int r = requested.getAndAdd(rowsCnt);

context().execute(() -> {
for (int i = 0; i < rowsCnt; i++) {
downstream().push(new Object[]{r + i});
}
}, this::onError);
}
}
}

0 comments on commit 2c3fcb2

Please sign in to comment.