Skip to content

Commit

Permalink
Add option to disable query to system.prepared_statements
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Jun 26, 2017
1 parent ab64cba commit 6e4d7df
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
Expand Up @@ -54,6 +54,7 @@ public enum CoreDriverOption implements DriverOption {

PREPARE_ON_ALL_NODES("prepared-statements.prepare-on-all-nodes", true),
REPREPARE_ENABLED("prepared-statements.reprepare-on-up.enabled", true),
REPREPARE_CHECK_SYSTEM_TABLE("prepared-statements.reprepare-on-up.check-system-table", false),
REPREPARE_MAX_STATEMENTS("prepared-statements.reprepare-on-up.max-statements", false),
REPREPARE_MAX_PARALLELISM("prepared-statements.reprepare-on-up.max-parallelism", false),
REPREPARE_TIMEOUT("prepared-statements.reprepare-on-up.timeout", false),
Expand Down
Expand Up @@ -60,6 +60,7 @@ class ReprepareOnUp {
private final DriverChannel channel;
private final Map<ByteBuffer, RepreparePayload> repreparePayloads;
private final Runnable whenPrepared;
private final boolean checkSystemTable;
private final int maxStatements;
private final int maxParallelism;
private final Duration timeout;
Expand All @@ -82,15 +83,15 @@ class ReprepareOnUp {
this.repreparePayloads = repreparePayloads;
this.whenPrepared = whenPrepared;

this.checkSystemTable =
config.defaultProfile().getBoolean(CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE);
this.timeout = config.defaultProfile().getDuration(CoreDriverOption.REPREPARE_TIMEOUT);
this.maxStatements = config.defaultProfile().getInt(CoreDriverOption.REPREPARE_MAX_STATEMENTS);
this.maxParallelism =
config.defaultProfile().getInt(CoreDriverOption.REPREPARE_MAX_PARALLELISM);
}

void start() {
LOG.debug("[{}] Repreparing statements on newly added/up node", logPrefix);

if (repreparePayloads.isEmpty()) {
LOG.debug("[{}] No statements to reprepare, done", logPrefix);
whenPrepared.run();
Expand All @@ -99,8 +100,24 @@ void start() {
LOG.debug("[{}] No channel available to reprepare, done", logPrefix);
whenPrepared.run();
} else {
queryAsync(QUERY_SERVER_IDS, Collections.emptyMap(), "QUERY system.prepared_statements")
.whenComplete(this::gatherServerIds);
if (LOG.isDebugEnabled()) { // check because ConcurrentMap.size is not a constant operation
LOG.debug(
"[{}] {} statements to reprepare on newly added/up node",
logPrefix,
repreparePayloads.size());
}
if (checkSystemTable) {
LOG.debug("[{}] Checking which statements the server knows about", logPrefix);
queryAsync(QUERY_SERVER_IDS, Collections.emptyMap(), "QUERY system.prepared_statements")
.whenComplete(this::gatherServerIds);
} else {
LOG.debug(
"[{}] {} is disabled, repreparing directly",
logPrefix,
CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE.getPath());
serverKnownIds = Collections.emptySet();
gatherPayloadsToReprepare();
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions core/src/main/resources/reference.conf
Expand Up @@ -166,10 +166,6 @@ datastax-java-driver {

# How the driver replicates prepared statements on a node that just came back up or joined the
# cluster.
# Note that, since CASSANDRA-8831 (merged in 3.10), nodes keep track of their prepared
# statements in a system table, and reprepare them when they restart. The driver checks if that
# table exists and doesn't try to reprepare statements that are already known.
# This feature is still useful for added nodes.
reprepare-on-up {
# Whether the driver tries to prepare on new nodes at all.
#
Expand All @@ -184,6 +180,14 @@ datastax-java-driver {
# performance penalty (one extra roundtrip to resend the query to prepare, and another to
# retry the execution).
enabled = true
# Whether to check `system.prepared_statements` on the target node before repreparing.
#
# This table exists since CASSANDRA-8831 (merged in 3.10). It stores the statements already
# prepared on the node, and preserves them across restarts.
#
# Checking the table first avoids repreparing unnecessarily, but the cost of the query is not
# always worth the improvement, especially if the number of statements is low.
check-system-table = false
# The maximum number of statements that should be reprepared. 0 or a negative value means no
# limit.
max-statements = 0
Expand Down
Expand Up @@ -69,6 +69,8 @@ public void setup() {
Mockito.when(eventLoop.inEventLoop()).thenReturn(true);

Mockito.when(config.defaultProfile()).thenReturn(defaultConfigProfile);
Mockito.when(defaultConfigProfile.getBoolean(CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE))
.thenReturn(true);
Mockito.when(defaultConfigProfile.getDuration(CoreDriverOption.REPREPARE_TIMEOUT))
.thenReturn(Duration.ofMillis(500));
Mockito.when(defaultConfigProfile.getInt(CoreDriverOption.REPREPARE_MAX_STATEMENTS))
Expand All @@ -83,9 +85,8 @@ public void setup() {
@Test
public void should_complete_immediately_if_no_prepared_statements() {
// Given
Map<ByteBuffer, RepreparePayload> repreparePayloads = Collections.emptyMap();
MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, repreparePayloads, config, whenPrepared);
new MockReprepareOnUp("test", pool, getMockPayloads(/*none*/ ), config, whenPrepared);

// When
reprepareOnUp.start();
Expand All @@ -99,7 +100,7 @@ public void should_complete_immediately_if_pool_empty() {
// Given
Mockito.when(pool.next()).thenReturn(null);
MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, getMockPayloads(), config, whenPrepared);
new MockReprepareOnUp("test", pool, getMockPayloads('a'), config, whenPrepared);

// When
reprepareOnUp.start();
Expand All @@ -111,7 +112,8 @@ public void should_complete_immediately_if_pool_empty() {
@Test
public void should_reprepare_all_if_system_table_query_fails() {
MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, getMockPayloads(), config, whenPrepared);
new MockReprepareOnUp(
"test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared);

reprepareOnUp.start();

Expand All @@ -134,7 +136,8 @@ public void should_reprepare_all_if_system_table_query_fails() {
@Test
public void should_reprepare_all_if_system_table_empty() {
MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, getMockPayloads(), config, whenPrepared);
new MockReprepareOnUp(
"test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared);

reprepareOnUp.start();

Expand All @@ -156,10 +159,33 @@ public void should_reprepare_all_if_system_table_empty() {
assertThat(done).isSuccess(v -> assertThat(reprepareOnUp.queries).isEmpty());
}

@Test
public void should_reprepare_all_if_system_query_disabled() {
Mockito.when(defaultConfigProfile.getBoolean(CoreDriverOption.REPREPARE_CHECK_SYSTEM_TABLE))
.thenReturn(false);

MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp(
"test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared);

reprepareOnUp.start();

MockAdminQuery adminQuery;
for (char c = 'a'; c <= 'f'; c++) {
adminQuery = reprepareOnUp.queries.poll();
assertThat(adminQuery.request).isInstanceOf(Prepare.class);
assertThat(((Prepare) adminQuery.request).cqlQuery).isEqualTo("mock query " + c);
adminQuery.resultFuture.complete(null);
}

assertThat(done).isSuccess(v -> assertThat(reprepareOnUp.queries).isEmpty());
}

@Test
public void should_not_reprepare_already_known_statements() {
MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, getMockPayloads(), config, whenPrepared);
new MockReprepareOnUp(
"test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared);

reprepareOnUp.start();

Expand Down Expand Up @@ -187,7 +213,8 @@ public void should_limit_number_of_statements_to_reprepare() {
.thenReturn(3);

MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, getMockPayloads(), config, whenPrepared);
new MockReprepareOnUp(
"test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared);

reprepareOnUp.start();

Expand Down Expand Up @@ -215,7 +242,8 @@ public void should_limit_number_of_statements_reprepared_in_parallel() {
.thenReturn(3);

MockReprepareOnUp reprepareOnUp =
new MockReprepareOnUp("test", pool, getMockPayloads(), config, whenPrepared);
new MockReprepareOnUp(
"test", pool, getMockPayloads('a', 'b', 'c', 'd', 'e', 'f'), config, whenPrepared);

reprepareOnUp.start();

Expand Down Expand Up @@ -250,11 +278,12 @@ public void should_limit_number_of_statements_reprepared_in_parallel() {
assertThat(done).isSuccess(v -> assertThat(reprepareOnUp.queries).isEmpty());
}

private Map<ByteBuffer, RepreparePayload> getMockPayloads() {
private Map<ByteBuffer, RepreparePayload> getMockPayloads(char... values) {
ImmutableMap.Builder<ByteBuffer, RepreparePayload> builder = ImmutableMap.builder();
for (char c = 'a'; c <= 'f'; c++) {
ByteBuffer id = Bytes.fromHexString("0x0" + c);
builder.put(id, new RepreparePayload(id, "mock query " + c, null, Collections.emptyMap()));
for (char value : values) {
ByteBuffer id = Bytes.fromHexString("0x0" + value);
builder.put(
id, new RepreparePayload(id, "mock query " + value, null, Collections.emptyMap()));
}
return builder.build();
}
Expand Down

0 comments on commit 6e4d7df

Please sign in to comment.