Skip to content

Commit

Permalink
feat: CommandRunner enters degraded states when it processes command …
Browse files Browse the repository at this point in the history
…with higher version than it supports (#6032)

* feat: move deserialization to CommandRunner and introduce DEGRADED to CommandRunnerStatus

* rohan comment

* fix test

* more feedback

* pass deserializer to queuedcommand for command

* chore: refactor DistributingExecutor to use CommandRunner instead of CommandQueue

* rohan comment

* feat: CommandRunner enters degraded states when it processes command with higher version than it supports
  • Loading branch information
stevenpyzhang committed Aug 20, 2020
1 parent a036b8b commit a841443
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 11 deletions.
Expand Up @@ -24,8 +24,8 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -95,10 +95,11 @@ public Command(
this.plan = requireNonNull(plan, "plan");
this.version = requireNonNull(version, "version");

if (expectedVersion != version.orElse(0)) {
throw new KsqlException(
if (expectedVersion < version.orElse(0)) {
throw new IncomaptibleKsqlCommandVersionException(
"Received a command from an incompatible command topic version. "
+ "Expected " + expectedVersion + " but got " + version.orElse(0));
+ "Expected version less than or equal to " + expectedVersion
+ " but got " + version.orElse(0));
}
}

Expand Down
Expand Up @@ -17,6 +17,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class CommandRunner implements Closeable {

private final Deserializer<Command> commandDeserializer;
private final Consumer<QueuedCommand> incompatibleCommandChecker;
private boolean deserializationErrorThrown;
private boolean incompatibleCommandDetected;

public enum CommandRunnerStatus {
RUNNING,
Expand Down Expand Up @@ -151,7 +152,7 @@ public CommandRunner(
Objects.requireNonNull(incompatibleCommandChecker, "incompatibleCommandChecker");
this.commandDeserializer =
Objects.requireNonNull(commandDeserializer, "commandDeserializer");
this.deserializationErrorThrown = false;
this.incompatibleCommandDetected = false;
}

/**
Expand Down Expand Up @@ -310,7 +311,7 @@ private void terminateCluster(final Command command) {
}

public CommandRunnerStatus checkCommandRunnerStatus() {
if (deserializationErrorThrown) {
if (incompatibleCommandDetected) {
return CommandRunnerStatus.DEGRADED;
}

Expand All @@ -334,9 +335,9 @@ private List<QueuedCommand> checkForIncompatibleCommands(final List<QueuedComman
incompatibleCommandChecker.accept(command);
compatibleCommands.add(command);
}
} catch (SerializationException e) {
LOG.info("Deserialization error detected when processing record", e);
deserializationErrorThrown = true;
} catch (final SerializationException | IncomaptibleKsqlCommandVersionException e) {
LOG.info("Incompatible command record detected when processing command topic", e);
incompatibleCommandDetected = true;
}
return compatibleCommands;
}
Expand All @@ -351,7 +352,7 @@ private class Runner implements Runnable {
public void run() {
try {
while (!closed) {
if (deserializationErrorThrown) {
if (incompatibleCommandDetected) {
LOG.warn("CommandRunner entering degraded state after failing to deserialize command");
closeEarly();
} else {
Expand Down
@@ -0,0 +1,26 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server.resources;

import io.confluent.ksql.util.KsqlServerException;

public class IncomaptibleKsqlCommandVersionException extends KsqlServerException {

public IncomaptibleKsqlCommandVersionException(final String message) {
super(message);
}

}
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.TerminateCluster;
Expand Down Expand Up @@ -252,6 +253,42 @@ public void shouldProcessPartialListOfCommandsOnDeserializationExceptionInFetch(
assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
}

@Test
public void shouldProcessPartialListOfCommandsOnIncomaptibleCommandInRestore() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
doThrow(new IncomaptibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);

// When:
commandRunner.processPriorCommands();

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
verify(statementExecutor, never()).handleRestore(queuedCommand3);
}

@Test
public void shouldProcessPartialListOfCommandsOnIncomaptibleCommandInFetch() {
// Given:
givenQueuedCommands(queuedCommand1, queuedCommand2, queuedCommand3);
doThrow(new IncomaptibleKsqlCommandVersionException("")).when(incompatibleCommandChecker).accept(queuedCommand3);

// When:
commandRunner.processPriorCommands();

// Then:
final InOrder inOrder = inOrder(statementExecutor);
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand1));
inOrder.verify(statementExecutor).handleRestore(eq(queuedCommand2));

assertThat(commandRunner.checkCommandRunnerStatus(), is(CommandRunner.CommandRunnerStatus.DEGRADED));
verify(statementExecutor, never()).handleRestore(queuedCommand3);
}

@Test
public void shouldPullAndRunStatements() {
// Given:
Expand Down
Expand Up @@ -18,16 +18,22 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import io.confluent.ksql.rest.server.resources.IncomaptibleKsqlCommandVersionException;
import org.junit.Test;

public class CommandTest {

@Test
public void shouldDeserializeCorrectly() throws IOException {
final String commandStr = "{" +
Expand All @@ -46,6 +52,23 @@ public void shouldDeserializeCorrectly() throws IOException {
assertThat(command.getOriginalProperties(), equalTo(expectedOriginalProperties));
}

@Test
public void shouldThrowExceptionWhenCommandVersionHigher() {
final String commandStr = "{" +
"\"statement\": \"test statement;\", " +
"\"streamsProperties\": {\"foo\": \"bar\"}, " +
"\"originalProperties\": {\"biz\": \"baz\"}, " +
"\"version\": " + (Command.VERSION + 1) +
"}";
final ObjectMapper mapper = PlanJsonMapper.INSTANCE.get();
final ValueInstantiationException thrown = assertThrows(
"Expected deserialization to throw, but it didn't",
ValueInstantiationException.class,
() -> mapper.readValue(commandStr, Command.class)
);
assertTrue(thrown.getCause() instanceof IncomaptibleKsqlCommandVersionException);
}

@Test
public void shouldDeserializeCorrectlyWithVersion() throws IOException {
final String commandStr = "{" +
Expand Down

0 comments on commit a841443

Please sign in to comment.