Skip to content
Permalink
Browse files
IGNITE-16364 Sql. Adopt IGNITE-14991, IGNITE-15235, IGNITE-15526 - Fixes
 #581.

Supports regexp operators.
Remove tableSpoolBroadcastNotRewindable test.
Incorrect grouping reset during rewind.

Signed-off-by: zstan <stanilovsky@gmail.com>
  • Loading branch information
zstan committed Jan 26, 2022
1 parent e6ea8da commit de98821d6053af7e6e74e15e32c12c5b81adc068
Show file tree
Hide file tree
Showing 16 changed files with 313 additions and 128 deletions.
@@ -42,10 +42,12 @@
*/
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
public class ItFunctionsTest extends AbstractBasicIntegrationTest {
private static final Object[] NULL_RESULT = new Object[] { null };

@Test
public void testLength() {
assertQuery("SELECT LENGTH('TEST')").returns(4).check();
assertQuery("SELECT LENGTH(NULL)").returns(new Object[]{null}).check();
assertQuery("SELECT LENGTH(NULL)").returns(NULL_RESULT).check();
}

@Test
@@ -203,31 +205,31 @@ public void testRangeWithCache() {
public void testPercentRemainder() {
assertQuery("SELECT 3 % 2").returns(1).check();
assertQuery("SELECT 4 % 2").returns(0).check();
assertQuery("SELECT NULL % 2").returns(new Object[]{null}).check();
assertQuery("SELECT 3 % NULL::int").returns(new Object[]{null}).check();
assertQuery("SELECT 3 % NULL").returns(new Object[] { null }).check();
assertQuery("SELECT NULL % 2").returns(NULL_RESULT).check();
assertQuery("SELECT 3 % NULL::int").returns(NULL_RESULT).check();
assertQuery("SELECT 3 % NULL").returns(NULL_RESULT).check();
}

@Test
public void testNullFunctionArguments() {
// Don't infer result data type from arguments (result is always INTEGER_NULLABLE).
assertQuery("SELECT ASCII(NULL)").returns(new Object[] { null }).check();
assertQuery("SELECT ASCII(NULL)").returns(NULL_RESULT).check();
// Inferring result data type from first STRING argument.
assertQuery("SELECT REPLACE(NULL, '1', '2')").returns(new Object[] { null }).check();
assertQuery("SELECT REPLACE(NULL, '1', '2')").returns(NULL_RESULT).check();
// Inferring result data type from both arguments.
assertQuery("SELECT MOD(1, null)").returns(new Object[] { null }).check();
assertQuery("SELECT MOD(1, null)").returns(NULL_RESULT).check();
// Inferring result data type from first NUMERIC argument.
assertQuery("SELECT TRUNCATE(NULL, 0)").returns(new Object[] { null }).check();
assertQuery("SELECT TRUNCATE(NULL, 0)").returns(NULL_RESULT).check();
// Inferring arguments data types and then inferring result data type from all arguments.
assertQuery("SELECT FALSE AND NULL").returns(false).check();
}

@Test
public void testReplace() {
assertQuery("SELECT REPLACE('12341234', '1', '55')").returns("5523455234").check();
assertQuery("SELECT REPLACE(NULL, '1', '5')").returns(new Object[] { null }).check();
assertQuery("SELECT REPLACE('1', NULL, '5')").returns(new Object[] { null }).check();
assertQuery("SELECT REPLACE('11', '1', NULL)").returns(new Object[] { null }).check();
assertQuery("SELECT REPLACE(NULL, '1', '5')").returns(NULL_RESULT).check();
assertQuery("SELECT REPLACE('1', NULL, '5')").returns(NULL_RESULT).check();
assertQuery("SELECT REPLACE('11', '1', NULL)").returns(NULL_RESULT).check();
assertQuery("SELECT REPLACE('11', '1', '')").returns("").check();
}

@@ -236,4 +238,33 @@ public void testMonthnameDayname() {
assertQuery("SELECT MONTHNAME(DATE '2021-01-01')").returns("January").check();
assertQuery("SELECT DAYNAME(DATE '2021-01-01')").returns("Friday").check();
}

@Test
public void testRegex() {
assertQuery("SELECT 'abcd' ~ 'ab[cd]'").returns(true).check();
assertQuery("SELECT 'abcd' ~ 'ab[cd]$'").returns(false).check();
assertQuery("SELECT 'abcd' ~ 'ab[CD]'").returns(false).check();
assertQuery("SELECT 'abcd' ~* 'ab[cd]'").returns(true).check();
assertQuery("SELECT 'abcd' ~* 'ab[cd]$'").returns(false).check();
assertQuery("SELECT 'abcd' ~* 'ab[CD]'").returns(true).check();
assertQuery("SELECT 'abcd' !~ 'ab[cd]'").returns(false).check();
assertQuery("SELECT 'abcd' !~ 'ab[cd]$'").returns(true).check();
assertQuery("SELECT 'abcd' !~ 'ab[CD]'").returns(true).check();
assertQuery("SELECT 'abcd' !~* 'ab[cd]'").returns(false).check();
assertQuery("SELECT 'abcd' !~* 'ab[cd]$'").returns(true).check();
assertQuery("SELECT 'abcd' !~* 'ab[CD]'").returns(false).check();
assertQuery("SELECT null ~ 'ab[cd]'").returns(NULL_RESULT).check();
assertQuery("SELECT 'abcd' ~ null").returns(NULL_RESULT).check();
assertQuery("SELECT null ~ null").returns(NULL_RESULT).check();
assertQuery("SELECT null ~* 'ab[cd]'").returns(NULL_RESULT).check();
assertQuery("SELECT 'abcd' ~* null").returns(NULL_RESULT).check();
assertQuery("SELECT null ~* null").returns(NULL_RESULT).check();
assertQuery("SELECT null !~ 'ab[cd]'").returns(NULL_RESULT).check();
assertQuery("SELECT 'abcd' !~ null").returns(NULL_RESULT).check();
assertQuery("SELECT null !~ null").returns(NULL_RESULT).check();
assertQuery("SELECT null !~* 'ab[cd]'").returns(NULL_RESULT).check();
assertQuery("SELECT 'abcd' !~* null").returns(NULL_RESULT).check();
assertQuery("SELECT null !~* null").returns(NULL_RESULT).check();
assertThrows(IgniteException.class, () -> sql("SELECT 'abcd' ~ '[a-z'"));
}
}
@@ -627,7 +627,7 @@ data: {
"parserImpls.ftl"
]

includePosixOperators: false
includePosixOperators: true
includeCompoundIdentifier: true
includeBraces: true
includeAdditionalDeclarations: false
@@ -330,3 +330,9 @@ SqlNode SqlAlterTable() :
}
)
}

<DEFAULT, DQID, BTID> TOKEN :
{
< NEGATE: "!" >
| < TILDE: "~" >
}
@@ -49,7 +49,7 @@ public QueryTaskExecutorImpl(String nodeName) {
public void start() {
this.stripedThreadPoolExecutor = new StripedThreadPoolExecutor(
4,
NamedThreadFactory.threadPrefix(nodeName, "calciteQry"),
NamedThreadFactory.threadPrefix(nodeName, "sqlExec"),
null,
false,
0
@@ -413,17 +413,18 @@ public class RexImpTable {
map.put(NOT_SIMILAR_TO, NotImplementor.of(similarImplementor));

// POSIX REGEX
final MethodImplementor posixRegexImplementor =
new MethodImplementor(BuiltInMethod.POSIX_REGEX.method,
NullPolicy.STRICT, false);
final MethodImplementor posixRegexImplementorCaseSensitive =
new PosixRegexMethodImplementor(true);
final MethodImplementor posixRegexImplementorCaseInsensitive =
new PosixRegexMethodImplementor(false);
map.put(SqlStdOperatorTable.POSIX_REGEX_CASE_INSENSITIVE,
posixRegexImplementor);
posixRegexImplementorCaseInsensitive);
map.put(SqlStdOperatorTable.POSIX_REGEX_CASE_SENSITIVE,
posixRegexImplementor);
posixRegexImplementorCaseSensitive);
map.put(SqlStdOperatorTable.NEGATED_POSIX_REGEX_CASE_INSENSITIVE,
NotImplementor.of(posixRegexImplementor));
NotImplementor.of(posixRegexImplementorCaseInsensitive));
map.put(SqlStdOperatorTable.NEGATED_POSIX_REGEX_CASE_SENSITIVE,
NotImplementor.of(posixRegexImplementor));
NotImplementor.of(posixRegexImplementorCaseSensitive));
map.put(REGEXP_REPLACE, new RegexpReplaceImplementor());

// Multisets & arrays
@@ -993,6 +994,28 @@ Expression implementSafe(RexToLixTranslator translator,
}
}

/**
* Implementor for {@link org.apache.calcite.sql.fun.SqlPosixRegexOperator}s.
*/
private static class PosixRegexMethodImplementor extends MethodImplementor {
protected final boolean caseSensitive;

PosixRegexMethodImplementor(boolean caseSensitive) {
super(BuiltInMethod.POSIX_REGEX.method, NullPolicy.STRICT, false);
this.caseSensitive = caseSensitive;
}

/** {@inheritDoc} */
@Override Expression implementSafe(RexToLixTranslator translator,
RexCall call, List<Expression> argValueList) {
assert argValueList.size() == 2;
// Add extra parameter (caseSensitive boolean flag), required by SqlFunctions#posixRegex.
final List<Expression> newOperands = new ArrayList<>(argValueList);
newOperands.add(Expressions.constant(caseSensitive));
return super.implementSafe(translator, call, newOperands);
}
}

/**
* Implementor for JSON_VALUE function, convert to solid format "JSON_VALUE(json_doc, path, empty_behavior, empty_default,
* error_behavior, error default)" in order to simplify the runtime implementation.
@@ -1708,13 +1731,13 @@ Expression implementSafe(final RexToLixTranslator translator,
private static class NotImplementor extends AbstractRexCallImplementor {
private AbstractRexCallImplementor implementor;

private NotImplementor(AbstractRexCallImplementor implementor) {
super(null, false);
private NotImplementor(NullPolicy nullPolicy, AbstractRexCallImplementor implementor) {
super(nullPolicy, false);
this.implementor = implementor;
}

static AbstractRexCallImplementor of(AbstractRexCallImplementor implementor) {
return new NotImplementor(implementor);
return new NotImplementor(implementor.nullPolicy, implementor);
}

/** {@inheritDoc} */
@@ -150,7 +150,7 @@ public void end() throws Exception {
protected void rewindInternal() {
requested = 0;
waiting = 0;
groupings.forEach(grouping -> grouping.groups.clear());
groupings.forEach(Grouping::reset);
}

/** {@inheritDoc} */
@@ -237,13 +237,23 @@ private Grouping(byte grpId, ImmutableBitSet grpFields) {

handler = context().rowHandler();

init();
}

private void init() {
// Initializes aggregates for case when no any rows will be added into the aggregate to have 0 as result.
// Doesn't do it for MAP type due to we don't want send from MAP node zero results because it looks redundant.
if (grpFields.isEmpty() && (type == AggregateType.REDUCE || type == AggregateType.SINGLE)) {
groups.put(GroupKey.EMPTY_GRP_KEY, create(GroupKey.EMPTY_GRP_KEY));
}
}

private void reset() {
groups.clear();

init();
}

private void add(RowT row) {
if (type == AggregateType.REDUCE) {
addOnReducer(row);
@@ -18,8 +18,17 @@
package org.apache.ignite.internal.sql.engine.exec.rel;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
@@ -32,6 +41,9 @@
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.util.Pair;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -45,6 +57,8 @@ public class AbstractExecutionTest extends IgniteAbstractTest {

private QueryTaskExecutorImpl taskExecutor;

private List<UUID> nodes;

@BeforeEach
public void beforeTest() {
taskExecutor = new QueryTaskExecutorImpl("no_node");
@@ -65,6 +79,19 @@ public void afterTest() {
}

protected ExecutionContext<Object[]> executionContext() {
return executionContext(false);
}

protected ExecutionContext<Object[]> executionContext(boolean withDelays) {
if (withDelays) {
StripedThreadPoolExecutor testExecutor = new IgniteTestStripedThreadPoolExecutor(8,
NamedThreadFactory.threadPrefix("fake-test-node", "sqlTestExec"),
null,
false,
0);
IgniteTestUtils.setFieldValue(taskExecutor, "stripedThreadPoolExecutor", testExecutor);
}

FragmentDescription fragmentDesc = new FragmentDescription(0, null, null, null);
return new ExecutionContext<>(
BaseQueryContext.builder()
@@ -90,6 +117,80 @@ protected Object[] row(Object... fields) {
return fields;
}

/** Task reordering executor. */
private static class IgniteTestStripedThreadPoolExecutor extends StripedThreadPoolExecutor {
final Deque<Pair<Runnable, Integer>> tasks = new ArrayDeque<>();

/** Internal stop flag. */
AtomicBoolean stop = new AtomicBoolean();

/** Inner execution service. */
ExecutorService exec = Executors.newWorkStealingPool();

CompletableFuture fut;

/** {@inheritDoc} */
public IgniteTestStripedThreadPoolExecutor(
int concurrentLvl,
String threadNamePrefix,
Thread.UncaughtExceptionHandler exHnd,
boolean allowCoreThreadTimeOut,
long keepAliveTime
) {
super(concurrentLvl, threadNamePrefix, exHnd, allowCoreThreadTimeOut, keepAliveTime);

fut = IgniteTestUtils.runAsync(() -> {
while (!stop.get()) {
synchronized (tasks) {
while (tasks.isEmpty()) {
try {
tasks.wait();
} catch (InterruptedException e) {
// no op.
}
}

Pair<Runnable, Integer> r = tasks.pollFirst();

exec.execute(() -> {
LockSupport.parkNanos(ThreadLocalRandom.current().nextLong(0, 10_000));
super.execute(r.getFirst(), r.getSecond());
});

tasks.notifyAll();
}
}
});
}

/** {@inheritDoc} */
@Override public void execute(Runnable task, int idx) {
synchronized (tasks) {
tasks.add(new Pair<>(task, idx));

tasks.notifyAll();
}
}

/** {@inheritDoc} */
@Override public void shutdown() {
stop.set(true);

fut.cancel(true);

super.shutdown();
}

/** {@inheritDoc} */
@Override public List<Runnable> shutdownNow() {
stop.set(true);

fut.cancel(true);

return super.shutdownNow();
}
}

/**
* TestTable.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -155,7 +256,7 @@ private Object[] createRow(int rowNum) {
@NotNull
@Override
public Iterator<Object[]> iterator() {
return new Iterator<Object[]>() {
return new Iterator<>() {
private int curRow;

@Override

0 comments on commit de98821

Please sign in to comment.