Skip to content

Commit

Permalink
add some more unit tests for qtf push
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Apr 15, 2015
1 parent a18a2a7 commit 038313c
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void onFailure(@Nonnull Throwable t) {
});
}

private boolean hasDirectResponse(ExecutionNode[] executionNodes) {
static boolean hasDirectResponse(ExecutionNode[] executionNodes) {
for (ExecutionNode executionNode : executionNodes) {
if (ExecutionNodes.hasDirectResponseDownstream(executionNode.downstreamNodes())) {
return true;
Expand All @@ -164,7 +164,7 @@ private boolean hasDirectResponse(ExecutionNode[] executionNodes) {
return false;
}

private static Map<String, Collection<ExecutionNode>> groupExecutionNodesByServer(ExecutionNode[] executionNodes) {
static Map<String, Collection<ExecutionNode>> groupExecutionNodesByServer(ExecutionNode[] executionNodes) {
ArrayListMultimap<String, ExecutionNode> executionNodesGroupedByServer = ArrayListMultimap.create();
for (ExecutionNode executionNode : executionNodes) {
for (String server : executionNode.executionNodes()) {
Expand Down
3 changes: 3 additions & 0 deletions sql/src/main/java/io/crate/jobs/PageDownstreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public void setBucket(int bucketIdx, Bucket rows, boolean isLast, PageConsumeLis
}
synchronized (lock) {
LOGGER.trace("setBucket: {}", bucketIdx);
if (allFuturesSet.get(bucketIdx)) {
throw new IllegalStateException("May not set the same bucket of a page more than once");
}

if (pageEmpty()) {
LOGGER.trace("calling nextPage");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import com.google.common.collect.Sets;
import io.crate.core.collections.TreeMapBuilder;
import io.crate.metadata.Routing;
import io.crate.planner.node.ExecutionNode;
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.node.dql.MergeNode;
import org.hamcrest.Matchers;
import org.junit.Test;

import java.util.*;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

public class ExecutionNodesTaskTest {


@Test
public void testGroupByServer() throws Exception {

Routing twoNodeRouting = new Routing(TreeMapBuilder.<String, Map<String, List<Integer>>>newMapBuilder()
.put("node1", TreeMapBuilder.<String, List<Integer>>newMapBuilder().put("t1", Arrays.asList(1, 2)).map())
.put("node2", TreeMapBuilder.<String, List<Integer>>newMapBuilder().put("t1", Arrays.asList(3, 4)).map())
.map());

CollectNode c1 = new CollectNode(1, "c1", twoNodeRouting);

MergeNode m1 = new MergeNode(2, "merge1", 2);
m1.executionNodes(Sets.newHashSet("node3", "node4"));

MergeNode m2 = new MergeNode(3, "merge2", 2);
m2.executionNodes(Sets.newHashSet("node1", "node3"));

Map<String, Collection<ExecutionNode>> groupByServer = ExecutionNodesTask.groupExecutionNodesByServer(new ExecutionNode[]{c1, m1, m2});

assertThat(groupByServer.containsKey("node1"), is(true));
assertThat(groupByServer.get("node1"), Matchers.<ExecutionNode>containsInAnyOrder(c1, m2));

assertThat(groupByServer.containsKey("node2"), is(true));
assertThat(groupByServer.get("node2"), Matchers.<ExecutionNode>containsInAnyOrder(c1));

assertThat(groupByServer.containsKey("node3"), is(true));
assertThat(groupByServer.get("node3"), Matchers.<ExecutionNode>containsInAnyOrder(m1, m2));

assertThat(groupByServer.containsKey("node4"), is(true));
assertThat(groupByServer.get("node4"), Matchers.<ExecutionNode>containsInAnyOrder(m1));
}

@Test
public void testDetectsHasDirectResponse() throws Exception {
CollectNode c1 = new CollectNode(1, "c1");
c1.downstreamNodes(Collections.singletonList("foo"));

assertThat(ExecutionNodesTask.hasDirectResponse(new ExecutionNode[]{c1}), is(false));

CollectNode c2 = new CollectNode(1, "c1");
c2.downstreamNodes(Collections.singletonList(ExecutionNode.DIRECT_RETURN_DOWNSTREAM_NODE));
assertThat(ExecutionNodesTask.hasDirectResponse(new ExecutionNode[]{c2}), is(true));
}
}
49 changes: 49 additions & 0 deletions sql/src/test/java/io/crate/jobs/JobExecutionContextTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.jobs;

import io.crate.Streamer;
import io.crate.operation.PageDownstream;
import io.crate.test.integration.CrateUnitTest;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.UUID;

import static org.mockito.Mockito.mock;

public class JobExecutionContextTest extends CrateUnitTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testInitializingFinalMergeForTheSameExecutionNodeThrowsAnError() throws Exception {
expectedException.expect(IllegalStateException.class);

JobExecutionContext context = new JobExecutionContext(UUID.randomUUID(), 100L);
PageDownstreamContext pageDownstreamContext = new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], 1);
context.setPageDownstreamContext(1, pageDownstreamContext);
context.setPageDownstreamContext(1, pageDownstreamContext);
}
}
50 changes: 50 additions & 0 deletions sql/src/test/java/io/crate/jobs/PageDownstreamContextTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.jobs;

import io.crate.Streamer;
import io.crate.core.collections.Row1;
import io.crate.core.collections.SingleRowBucket;
import io.crate.operation.PageConsumeListener;
import io.crate.operation.PageDownstream;
import io.crate.test.integration.CrateUnitTest;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.mockito.Mockito.mock;

public class PageDownstreamContextTest extends CrateUnitTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testCantSetSameBucketTwiceWithoutReceivingFullPage() throws Exception {
expectedException.expect(IllegalStateException.class);
PageDownstreamContext ctx = new PageDownstreamContext(mock(PageDownstream.class), new Streamer[0], 3);

PageConsumeListener consumeListener = mock(PageConsumeListener.class);
ctx.setBucket(1, new SingleRowBucket(new Row1("foo")), false, consumeListener);
ctx.setBucket(1, new SingleRowBucket(new Row1("foo")), false, consumeListener);
}
}
13 changes: 12 additions & 1 deletion sql/src/test/java/io/crate/planner/PlannerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ public class PlannerTest extends CrateUnitTest {

private ClusterService clusterService;

private final static String LOCAL_NODE_ID = "foo";


class TestModule extends MetaDataModule {

@Override
protected void configure() {
clusterService = mock(ClusterService.class);
DiscoveryNode localNode = mock(DiscoveryNode.class);
when(localNode.id()).thenReturn("foo");
when(localNode.id()).thenReturn(LOCAL_NODE_ID);
when(clusterService.localNode()).thenReturn(localNode);
ClusterState clusterState = mock(ClusterState.class);
MetaData metaData = mock(MetaData.class);
Expand Down Expand Up @@ -1867,4 +1869,13 @@ public void testExecutionNodeIdSequence() throws Exception {
assertThat(collectNode1.executionNodeId(), is(0));
assertThat(collectNode2.executionNodeId(), is(1));
}

@SuppressWarnings("ConstantConditions")
@Test
public void testLimitThatIsBiggerThanPageSizeCausesQTFPUshPlan() throws Exception {
QueryThenFetch plan = (QueryThenFetch) plan("select * from users limit 2147483647 ");
assertThat(plan.collectNode().downstreamNodes().size(), is(1));
assertThat(plan.collectNode().downstreamNodes().get(0), is(LOCAL_NODE_ID));
assertThat(plan.collectNode().hasDistributingDownstreams(), is(true));
}
}

0 comments on commit 038313c

Please sign in to comment.