Skip to content

Commit

Permalink
fixup! Fix: Kill existing jobs before retry to prevent JobAlreadyExis…
Browse files Browse the repository at this point in the history
…ts errors
  • Loading branch information
Philipp Bogensberger committed Jul 1, 2015
1 parent e16c36c commit 83f85e5
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.crate.executor.Job;
import io.crate.executor.TaskResult;
import io.crate.executor.transport.kill.KillJobsRequest;
import io.crate.executor.transport.kill.KillJobsResponse;
import io.crate.executor.transport.kill.KillResponse;
import io.crate.executor.transport.kill.TransportKillJobsNodeAction;
import io.crate.metadata.PartitionName;
import io.crate.metadata.TableIdent;
Expand Down Expand Up @@ -258,7 +258,8 @@ public void onFailure(final @Nonnull Throwable t) {
if (t instanceof CancellationException) {
message = Constants.KILLED_MESSAGE;
logger.debug("KILLED: [{}]", request.stmt());
} else if (Exceptions.unwrap(t) instanceof IndexShardMissingException && attempt <= MAX_SHARD_MISSING_RETRIES) {
} else if ((Exceptions.unwrap(t) instanceof IndexShardMissingException )
&& attempt <= MAX_SHARD_MISSING_RETRIES) {
logger.debug("FAILED ({}/{} attempts) - Retry: [{}]", attempt, MAX_SHARD_MISSING_RETRIES, request.stmt());
killJobs(ImmutableList.of(plan.jobId()), new FutureCallback<Long>() {
@Override
Expand Down Expand Up @@ -296,11 +297,11 @@ private void killJobs(List<UUID> toKill, FutureCallback<Long> callback) {
final SettableFuture<Long> resultFuture = SettableFuture.create();
Futures.addCallback(resultFuture, callback);
for (DiscoveryNode node : nodes) {
transportKillJobsNodeAction.execute(node.id(), request, new ActionListener<KillJobsResponse>() {
transportKillJobsNodeAction.execute(node.id(), request, new ActionListener<KillResponse>() {

@Override
public void onResponse(KillJobsResponse killJobsResponse) {
numKilled.addAndGet(killJobsResponse.numKilled());
public void onResponse(KillResponse killResponse) {
numKilled.addAndGet(killResponse.numKilled());
countdown();
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@

import java.io.IOException;

public class KillAllResponse extends TransportResponse {
public class KillResponse extends TransportResponse {

private long numKilled;

public KillAllResponse(long numKilled) {
public KillResponse(long numKilled) {
this.numKilled = numKilled;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.transport.TransportService;

@Singleton
public class TransportKillAllNodeAction implements NodeAction<KillAllRequest, KillAllResponse> {
public class TransportKillAllNodeAction implements NodeAction<KillAllRequest, KillResponse> {

private static final String TRANSPORT_ACTION = "crate/sql/kill_all";

Expand All @@ -46,20 +46,20 @@ public TransportKillAllNodeAction(JobContextService jobContextService,
TransportService transportService) {
this.jobContextService = jobContextService;
this.transports = transports;
transportService.registerHandler(TRANSPORT_ACTION, new NodeActionRequestHandler<KillAllRequest, KillAllResponse>(this) {
transportService.registerHandler(TRANSPORT_ACTION, new NodeActionRequestHandler<KillAllRequest, KillResponse>(this) {
@Override
public KillAllRequest newInstance() {
return new KillAllRequest();
}
});
}

public void execute(String targetNode, KillAllRequest request, ActionListener<KillAllResponse> listener) {
public void execute(String targetNode, KillAllRequest request, ActionListener<KillResponse> listener) {
transports.executeLocalOrWithTransport(this, targetNode, request, listener,
new DefaultTransportResponseHandler<KillAllResponse>(listener, executorName()) {
new DefaultTransportResponseHandler<KillResponse>(listener, executorName()) {
@Override
public KillAllResponse newInstance() {
return new KillAllResponse(0);
public KillResponse newInstance() {
return new KillResponse(0);
}
});
}
Expand All @@ -75,9 +75,9 @@ public String executorName() {
}

@Override
public void nodeOperation(KillAllRequest request, ActionListener<KillAllResponse> listener) {
public void nodeOperation(KillAllRequest request, ActionListener<KillResponse> listener) {
try {
listener.onResponse(new KillAllResponse(jobContextService.killAll()));
listener.onResponse(new KillResponse(jobContextService.killAll()));
} catch (Throwable t) {
listener.onFailure(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.transport.TransportService;

@Singleton
public class TransportKillJobsNodeAction implements NodeAction<KillJobsRequest, KillJobsResponse> {
public class TransportKillJobsNodeAction implements NodeAction<KillJobsRequest, KillResponse> {

private static final String TRANSPORT_ACTION = "crate/sql/kill_jobs";

Expand All @@ -46,20 +46,20 @@ public TransportKillJobsNodeAction(JobContextService jobContextService,
TransportService transportService) {
this.jobContextService = jobContextService;
this.transports = transports;
transportService.registerHandler(TRANSPORT_ACTION, new NodeActionRequestHandler<KillJobsRequest, KillJobsResponse>(this) {
transportService.registerHandler(TRANSPORT_ACTION, new NodeActionRequestHandler<KillJobsRequest, KillResponse>(this) {
@Override
public KillJobsRequest newInstance() {
return new KillJobsRequest();
}
});
}

public void execute(String targetNode, KillJobsRequest request, ActionListener<KillJobsResponse> listener) {
public void execute(String targetNode, KillJobsRequest request, ActionListener<KillResponse> listener) {
transports.executeLocalOrWithTransport(this, targetNode, request, listener,
new DefaultTransportResponseHandler<KillJobsResponse>(listener, executorName()) {
new DefaultTransportResponseHandler<KillResponse>(listener, executorName()) {
@Override
public KillJobsResponse newInstance() {
return new KillJobsResponse(0);
public KillResponse newInstance() {
return new KillResponse(0);
}
});
}
Expand All @@ -75,7 +75,7 @@ public String executorName() {
}

@Override
public void nodeOperation(KillJobsRequest request, ActionListener<KillJobsResponse> listener) {
listener.onResponse(new KillJobsResponse(jobContextService.killJobs(request.toKill())));
public void nodeOperation(KillJobsRequest request, ActionListener<KillResponse> listener) {
listener.onResponse(new KillResponse(jobContextService.killJobs(request.toKill())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.crate.executor.RowCountResult;
import io.crate.executor.TaskResult;
import io.crate.executor.transport.kill.KillAllRequest;
import io.crate.executor.transport.kill.KillAllResponse;
import io.crate.executor.transport.kill.KillResponse;
import io.crate.executor.transport.kill.TransportKillAllNodeAction;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -67,10 +67,10 @@ public void start() {
final AtomicReference<Throwable> lastThrowable = new AtomicReference<>();

for (DiscoveryNode node : nodes) {
transportKillAllNodeAction.execute(node.id(), request, new ActionListener<KillAllResponse>() {
transportKillAllNodeAction.execute(node.id(), request, new ActionListener<KillResponse>() {
@Override
public void onResponse(KillAllResponse killAllResponse) {
numKilled.addAndGet(killAllResponse.numKilled());
public void onResponse(KillResponse killResponse) {
numKilled.addAndGet(killResponse.numKilled());
countdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public void testKillIsCalledOnJobContextService() throws Exception {
);

final CountDownLatch latch = new CountDownLatch(1);
transportKillAllNodeAction.execute("noop_id", new KillAllRequest(), new ActionListener<KillAllResponse>() {
transportKillAllNodeAction.execute("noop_id", new KillAllRequest(), new ActionListener<KillResponse>() {
@Override
public void onResponse(KillAllResponse killAllResponse) {
public void onResponse(KillResponse killResponse) {
latch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ public void testKillIsCalledOnJobContextService() throws Exception {

final CountDownLatch latch = new CountDownLatch(1);
List<UUID> toKill = ImmutableList.of(UUID.randomUUID(), UUID.randomUUID());
transportKillJobsNodeAction.execute("noop_id", new KillJobsRequest(toKill), new ActionListener<KillJobsResponse>() {
transportKillJobsNodeAction.execute("noop_id", new KillJobsRequest(toKill), new ActionListener<KillResponse>() {
@Override
public void onResponse(KillJobsResponse killAllResponse) {
public void onResponse(KillResponse killAllResponse) {
latch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
import io.crate.executor.transport.kill.KillAllRequest;
import io.crate.executor.transport.kill.KillAllResponse;
import io.crate.executor.transport.kill.KillResponse;
import io.crate.executor.transport.kill.TransportKillAllNodeAction;
import io.crate.test.integration.CrateUnitTest;
import org.elasticsearch.Version;
Expand All @@ -46,7 +46,7 @@
public class KillTaskTest extends CrateUnitTest {

@Captor
public ArgumentCaptor<ActionListener<KillAllResponse>> responseListener;
public ArgumentCaptor<ActionListener<KillResponse>> responseListener;

private KillTask killTask;

Expand All @@ -65,8 +65,8 @@ public void testRowCountIsAccumulated() throws Exception {
TransportKillAllNodeAction killNodeAction = startKillTaskAndGetTransportKillAllNodeAction();
verify(killNodeAction, times(3)).execute(anyString(), any(KillAllRequest.class), responseListener.capture());

for (ActionListener<KillAllResponse> killAllResponseActionListener : responseListener.getAllValues()) {
killAllResponseActionListener.onResponse(new KillAllResponse(3));
for (ActionListener<KillResponse> killAllResponseActionListener : responseListener.getAllValues()) {
killAllResponseActionListener.onResponse(new KillResponse(3));
}

Bucket rows = killTask.result().get(0).get().rows();
Expand Down

0 comments on commit 83f85e5

Please sign in to comment.