Skip to content

Commit

Permalink
do not shutdown the BulkShardProcessor with pending requests
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed Mar 18, 2015
1 parent b162a2a commit 819b74e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 10 deletions.
8 changes: 7 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
Changes for Crate
=================

- The ``format`` scalar function now always uses Locale.ENGLISH
Unreleased
==========

- Fix: Do not shutdown the BulkShardProcessor while there are pending requests
anymore

- Fix: The ``format`` scalar function now always uses Locale.ENGLISH

2015/03/17 0.48.0
=================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,14 @@ public void close() {
if (pending.get() == 0) {
setResult();
}
stopExecutor();
}

private void stopExecutor() {
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
// ignore
} finally {
scheduledExecutorService.shutdownNow();
}
Expand All @@ -230,11 +229,16 @@ private void setFailure(Throwable e) {
}

private void setResult() {
Throwable throwable = failure.get();
if (throwable == null) {
result.set(responses);
} else {
result.setException(throwable);
try {
Throwable throwable = failure.get();

if (throwable == null) {
result.set(responses);
} else {
result.setException(throwable);
}
} finally {
stopExecutor();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ public void testCopyToFile() throws Exception {
@Test
public void testCopyColumnsToDirectory() throws Exception {
this.setup.groupBySetup();

waitNoPendingTasksOnAll();

String uriTemplate = Paths.get(folder.getRoot().toURI()).toUri().toString();
SQLResponse response = execute("copy characters (name, details['job']) to DIRECTORY ?", new Object[]{uriTemplate});
assertThat(response.cols().length, is(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package io.crate.integrationtests;

import io.crate.action.sql.SQLActionException;
import io.crate.action.sql.SQLBulkResponse;
import io.crate.test.integration.CrateIntegrationTest;
import io.crate.testing.TestingHelpers;
import org.elasticsearch.action.get.GetResponse;
Expand Down Expand Up @@ -684,4 +685,25 @@ public void testInsertFromSubQueryPartitionedTableCustomSchema() throws Exceptio
"destination| custom| 04332d1n64pg| {zipcode=14713}| 5| 0\n"));

}

@Test
public void testBulkInsert() throws Exception {
execute("create table giveittome (" +
" date timestamp," +
" dirty_names array(string)," +
" lashes short primary key" +
") with (number_of_replicas=0)");
ensureYellow();
int bulkSize = randomIntBetween(1, 1000);
Object[][] bulkArgs = new Object[bulkSize][];
for (int i = 0; i < bulkSize; i++) {
bulkArgs[i] = new Object[] { System.currentTimeMillis() + i, new String[]{randomAsciiOfLength(5), randomAsciiOfLength(2)}, (short)i };
}
SQLBulkResponse bulkResponse = execute("insert into giveittome (date, dirty_names, lashes) values (?, ?, ?)", bulkArgs);
assertThat(bulkResponse.results().length, is(bulkSize));
execute("refresh table giveittome");
// assert that bulk insert has inserted everything it said it has
execute("select sum(lashes), date from giveittome group by date");
assertThat(response.rowCount(), is((long)bulkSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.junit.Test;
import org.mockito.*;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
Expand Down Expand Up @@ -202,10 +203,18 @@ public void run() {
assertTrue(hadBlocked.get());
} finally {
scheduledExecutorService.shutdownNow();
bulkShardProcessor.close();
forceClose(bulkShardProcessor);
}
}

private void forceClose(BulkShardProcessor bulkShardProcessor) throws Exception {
bulkShardProcessor.close();
Method stopExecutorMethod = BulkShardProcessor.class.
getDeclaredMethod("stopExecutor");
stopExecutorMethod.setAccessible(true);
stopExecutorMethod.invoke(bulkShardProcessor);
}

private void mockShard(OperationRouting operationRouting, Integer shardId) {
ShardIterator shardIterator = mock(ShardIterator.class);
when(operationRouting.indexShards(
Expand Down

0 comments on commit 819b74e

Please sign in to comment.