Skip to content

Commit

Permalink
Merge 2a77d1f into 0135b67
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Bogensberger committed May 11, 2015
2 parents 0135b67 + 2a77d1f commit 00b0b01
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 2 deletions.
28 changes: 28 additions & 0 deletions sql/src/main/java/io/crate/jobs/JobKilledException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.jobs;


import java.io.IOException;

public class JobKilledException extends IOException {
}
9 changes: 8 additions & 1 deletion sql/src/main/java/io/crate/jobs/PageDownstreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,14 @@ public void close() {

@Override
public void kill() {
throw new UnsupportedOperationException("kill is not implemented");
if (!closed.getAndSet(true)) {
for (ContextCallback contextCallback : callbacks) {
contextCallback.onClose();
}
pageDownstream.fail(new JobKilledException());
} else {
LOGGER.warn("called kill on an already closed PageDownstreamContext");
}
}

private class ResultListenerBridgingConsumeListener implements PageConsumeListener {
Expand Down
19 changes: 18 additions & 1 deletion sql/src/test/java/io/crate/jobs/PageDownstreamContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;

import static org.mockito.Mockito.mock;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.*;

public class PageDownstreamContextTest extends CrateUnitTest {

Expand All @@ -47,4 +49,19 @@ public void testCantSetSameBucketTwiceWithoutReceivingFullPage() throws Exceptio
ctx.setBucket(1, new SingleRowBucket(new Row1("foo")), false, pageResultListener);
ctx.setBucket(1, new SingleRowBucket(new Row1("foo")), false, pageResultListener);
}

@Test
public void testKill() throws Exception {
PageDownstream downstream = mock(PageDownstream.class);
ContextCallback callback = mock(ContextCallback.class);

PageDownstreamContext ctx = new PageDownstreamContext(downstream, new Streamer[0], 3);
ctx.addCallback(callback);
ctx.kill();

verify(callback, times(1)).onClose();
ArgumentCaptor<JobKilledException> e = ArgumentCaptor.forClass(JobKilledException.class);
verify(downstream, times(1)).fail(e.capture());
assertThat(e.getValue(), instanceOf(JobKilledException.class));
}
}

0 comments on commit 00b0b01

Please sign in to comment.