Skip to content

Commit

Permalink
feat: CommandRunner enters degraded states when it processes command …
Browse files Browse the repository at this point in the history
…with higher version than it supports
  • Loading branch information
stevenpyzhang committed Aug 18, 2020
1 parent b668a51 commit 55be881
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 11 deletions.
Expand Up @@ -24,8 +24,8 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -95,10 +95,11 @@ public Command(
this.plan = requireNonNull(plan, "plan");
this.version = requireNonNull(version, "version");

if (expectedVersion != version.orElse(0)) {
throw new KsqlException(
if (expectedVersion < version.orElse(0)) {
throw new IncomaptibleKsqlCommandVersionException(
"Received a command from an incompatible command topic version. "
+ "Expected " + expectedVersion + " but got " + version.orElse(0));
+ "Expected version less than or equal to " + expectedVersion
+ " but got " + version.orElse(0));
}
}

Expand Down
Expand Up @@ -17,6 +17,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class CommandRunner implements Closeable {
private final Clock clock;

private final Consumer<QueuedCommand> incompatibleCommandChecker;
private boolean deserializationErrorThrown;
private boolean incompatibleCommandDetected;

public enum CommandRunnerStatus {
RUNNING,
Expand Down Expand Up @@ -144,7 +145,7 @@ public CommandRunner(
this.compactor = Objects.requireNonNull(compactor, "compactor");
this.incompatibleCommandChecker =
Objects.requireNonNull(incompatibleCommandChecker, "incompatibleCommandChecker");
this.deserializationErrorThrown = false;
this.incompatibleCommandDetected = false;
}

/**
Expand Down Expand Up @@ -298,7 +299,7 @@ private void terminateCluster(final Command command) {
}

public CommandRunnerStatus checkCommandRunnerStatus() {
if (deserializationErrorThrown) {
if (incompatibleCommandDetected) {
return CommandRunnerStatus.DEGRADED;
}

Expand All @@ -322,9 +323,9 @@ private List<QueuedCommand> checkForIncompatibleCommands(final List<QueuedComman
incompatibleCommandChecker.accept(command);
compatibleCommands.add(command);
}
} catch (SerializationException e) {
LOG.info("Deserialization error detected when processing record", e);
deserializationErrorThrown = true;
} catch (final SerializationException | IncomaptibleKsqlCommandVersionException e) {
LOG.info("Incompatible command record detected when processing command topic", e);
incompatibleCommandDetected = true;
}
return compatibleCommands;
}
Expand All @@ -339,7 +340,7 @@ private class Runner implements Runnable {
public void run() {
try {
while (!closed) {
if (deserializationErrorThrown) {
if (incompatibleCommandDetected) {
LOG.warn("CommandRunner entering degraded state after failing to deserialize command");
closeEarly();
} else {
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.util.KsqlServerException;

public class IncomaptibleKsqlCommandVersionException extends KsqlServerException {

public IncomaptibleKsqlCommandVersionException(final String message) {
super(message);
}

}
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -248,6 +249,42 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInFetch(
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
}

@Test
public void shouldProcessPartialListOfCommandsOnIncomaptibleCommandInRestore() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
doThrow(new IncomaptibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);

// When:
commandRunner.processPriorCommands();

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
verify(statementExecutor, never()).handleRestore(queuedCommand3);
}

@Test
public void shouldProcessPartialListOfCommandsOnIncomaptibleCommandInFetch() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
doThrow(new IncomaptibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);

// When:
commandRunner.processPriorCommands();

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
verify(statementExecutor, never()).handleRestore(queuedCommand3);
}

@Test
public void shouldPullAndRunStatements() {
// Given:
Expand Down
Expand Up @@ -18,16 +18,22 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import org.junit.Test;

public class CommandTest {

@Test
public void shouldDeserializeCorrectly() throws IOException {
final String commandStr = "{" +
Expand All @@ -46,6 +52,23 @@ public void shouldDeserializeCorrectly() throws IOException {
assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties));
}

@Test
public void shouldThrowExceptionWhenCommandVersionHigher() {
final String commandStr = "{" +
"\"statement\": \"test statement;\", " +
"\"streamsProperties\": {\"foo\": \"bar\"}, " +
"\"originalProperties\": {\"biz\": \"baz\"}, " +
"\"version\": " + (Command.VERSION + 1) +
"}";
final ObjectMapper mapper = PlanJsonMapper.INSTANCE.get();
final ValueInstantiationException thrown = assertThrows(
"Expected deserialization to throw, but it didn't",
ValueInstantiationException.class,
() -> mapper.readValue(commandStr, Command.class)
);
assertTrue(thrown.getCause() instanceof IncomaptibleKsqlCommandVersionException);
}

@Test
public void shouldDeserializeCorrectlyWithVersion() throws IOException {
final String commandStr = "{" +
Expand Down

0 comments on commit 55be881

Please sign in to comment.