From b9efcf39aaf8fdfa669ba33e59de13535742a6dd Mon Sep 17 00:00:00 2001 From: Philipp Bogensberger Date: Wed, 1 Jul 2015 14:06:35 +0200 Subject: [PATCH] fixup! Fix: Kill existing jobs before retry to prevent JobAlreadyExists errors --- .../java/io/crate/action/sql/TransportBaseSQLAction.java | 3 +-- .../integrationtests/PartitionedTableIntegrationTest.java | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/src/main/java/io/crate/action/sql/TransportBaseSQLAction.java b/sql/src/main/java/io/crate/action/sql/TransportBaseSQLAction.java index 4c3353369245..cfde1439fe89 100644 --- a/sql/src/main/java/io/crate/action/sql/TransportBaseSQLAction.java +++ b/sql/src/main/java/io/crate/action/sql/TransportBaseSQLAction.java @@ -258,8 +258,7 @@ public void onFailure(final @Nonnull Throwable t) { if (t instanceof CancellationException) { message = Constants.KILLED_MESSAGE; logger.debug("KILLED: [{}]", request.stmt()); - } else if ((Exceptions.unwrap(t) instanceof IndexShardMissingException ) - && attempt <= MAX_SHARD_MISSING_RETRIES) { + } else if (Exceptions.unwrap(t) instanceof IndexShardMissingException && attempt <= MAX_SHARD_MISSING_RETRIES) { logger.debug("FAILED ({}/{} attempts) - Retry: [{}]", attempt, MAX_SHARD_MISSING_RETRIES, request.stmt()); killJobs(ImmutableList.of(plan.jobId()), new FutureCallback() { @Override diff --git a/sql/src/test/java/io/crate/integrationtests/PartitionedTableIntegrationTest.java b/sql/src/test/java/io/crate/integrationtests/PartitionedTableIntegrationTest.java index a7b5e47e8b20..fb66fa2305e1 100644 --- a/sql/src/test/java/io/crate/integrationtests/PartitionedTableIntegrationTest.java +++ b/sql/src/test/java/io/crate/integrationtests/PartitionedTableIntegrationTest.java @@ -1905,6 +1905,7 @@ public void testSelectWhileShardsAreRelocating() throws Throwable { new Object[] { "Trillian", "a" }, }); execute("refresh table t"); + execute("set global stats.enabled=true"); final AtomicReference lastThrowable = new AtomicReference<>(); final CountDownLatch selects = new CountDownLatch(100); @@ -1917,8 +1918,6 @@ public void run() { execute("select * from t"); } catch (Throwable t) { // The failed job should have three started operations - execute("reset global stats.enabled"); - execute("set global stats.enabled=true"); SQLResponse res = execute("select id from sys.jobs_log where error is not null order by started desc limit 1"); if (res.rowCount() > 0) { String id = (String) res.rows()[0][0]; @@ -1951,7 +1950,7 @@ public void run() { nodeSwap.put(nodeIds.get(0), nodeIds.get(1)); nodeSwap.put(nodeIds.get(1), nodeIds.get(0)); - final CountDownLatch relocations = new CountDownLatch(10); + final CountDownLatch relocations = new CountDownLatch(20); Thread relocatingThread = new Thread(new Runnable() { @Override public void run() {