Skip to content

Commit

Permalink
feat: move command topic deserialization to CommandRunner and introdu…
Browse files Browse the repository at this point in the history
…ce DEGRADED CommandRunnerStatus (#6012)

* feat: move deserialization to CommandRunner and introduce DEGRADED to CommandRunnerStatus

* rohan comment

* fix test

* more feedback

* pass deserializer to queuedcommand for command
  • Loading branch information
stevenpyzhang committed Aug 20, 2020
1 parent a6c3864 commit ab8cec2
Show file tree
Hide file tree
Showing 18 changed files with 492 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
package io.confluent.ksql.rest.server;

import com.google.common.collect.Lists;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.computation.QueuedCommand;
import java.time.Duration;
import java.util.Collections;
Expand All @@ -31,6 +28,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,7 +37,7 @@ public class CommandTopic {
private static final Logger log = LoggerFactory.getLogger(CommandTopic.class);
private final TopicPartition commandTopicPartition;

private Consumer<CommandId, Command> commandConsumer = null;
private Consumer<byte[], byte[]> commandConsumer;
private final String commandTopicName;
private CommandTopicBackup commandTopicBackup;

Expand All @@ -52,16 +50,16 @@ public CommandTopic(
commandTopicName,
new KafkaConsumer<>(
Objects.requireNonNull(kafkaConsumerProperties, "kafkaClientProperties"),
InternalTopicSerdes.deserializer(CommandId.class),
InternalTopicSerdes.deserializer(Command.class)
new ByteArrayDeserializer(),
new ByteArrayDeserializer()
),
commandTopicBackup
);
}

CommandTopic(
final String commandTopicName,
final Consumer<CommandId, Command> commandConsumer,
final Consumer<byte[], byte[]> commandConsumer,
final CommandTopicBackup commandTopicBackup
) {
this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
Expand All @@ -79,11 +77,11 @@ public void start() {
commandConsumer.assign(Collections.singleton(commandTopicPartition));
}

public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
final Iterable<ConsumerRecord<CommandId, Command>> iterable = commandConsumer.poll(timeout);
public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration timeout) {
final Iterable<ConsumerRecord<byte[], byte[]>> iterable = commandConsumer.poll(timeout);

if (iterable != null) {
iterable.forEach(record -> backupRecord(record));
iterable.forEach(this::backupRecord);
}

return iterable;
Expand All @@ -96,11 +94,11 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
Collections.singletonList(commandTopicPartition));

log.debug("Reading prior command records");
ConsumerRecords<CommandId, Command> records =
ConsumerRecords<byte[], byte[]> records =
commandConsumer.poll(duration);
while (!records.isEmpty()) {
log.debug("Received {} records from poll", records.count());
for (final ConsumerRecord<CommandId, Command> record : records) {
for (final ConsumerRecord<byte[], byte[]> record : records) {
backupRecord(record);

if (record.value() == null) {
Expand Down Expand Up @@ -136,7 +134,7 @@ public void close() {
commandTopicBackup.close();
}

private void backupRecord(final ConsumerRecord<CommandId, Command> record) {
private void backupRecord(final ConsumerRecord<byte[], byte[]> record) {
commandTopicBackup.writeRecord(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@

package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface CommandTopicBackup {
void initialize();

void writeRecord(ConsumerRecord<CommandId, Command> record);
void writeRecord(ConsumerRecord<byte[], byte[]> record);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Ticker;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
Expand Down Expand Up @@ -125,7 +126,27 @@ private boolean isRecordInLatestReplay(final ConsumerRecord<CommandId, Command>
}

@Override
public void writeRecord(final ConsumerRecord<CommandId, Command> record) {
public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
final ConsumerRecord<CommandId, Command> deserializedRecord;
try {
deserializedRecord = new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
InternalTopicSerdes.deserializer(CommandId.class)
.deserialize(record.topic(), record.key()),
InternalTopicSerdes.deserializer(Command.class)
.deserialize(record.topic(), record.value())
);
} catch (Exception e) {
LOG.error("Failed to deserialize command topic record when backing it up: {}:{}",
record.key(), record.value());
return;
}
writeCommandToBackup(deserializedRecord);
}

void writeCommandToBackup(final ConsumerRecord<CommandId, Command> record) {
if (isRestoring()) {
if (isRecordInLatestReplay(record)) {
// Ignore backup because record was already replayed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

package io.confluent.ksql.rest.server;

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CommandTopicBackupNoOp implements CommandTopicBackup {
Expand All @@ -26,7 +24,7 @@ public void initialize() {
}

@Override
public void writeRecord(final ConsumerRecord<CommandId, Command> record) {
public void writeRecord(final ConsumerRecord<byte[], byte[]> record) {
// no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.server.HeartbeatAgent.Builder;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.CommandRunner;
import io.confluent.ksql.rest.server.computation.CommandStore;
import io.confluent.ksql.rest.server.computation.InteractiveStatementExecutor;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.rest.server.resources.ClusterStatusResource;
import io.confluent.ksql.rest.server.resources.HealthCheckResource;
Expand Down Expand Up @@ -742,7 +744,8 @@ static KsqlRestApplication buildApplication(
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
Duration.ofMillis(restConfig.getLong(
KsqlRestConfig.KSQL_COMMAND_RUNNER_BLOCKED_THRESHHOLD_ERROR_MS)),
metricsPrefix
metricsPrefix,
InternalTopicSerdes.deserializer(Command.class)
);

final QueryMonitor queryMonitor = new QueryMonitor(ksqlConfig, ksqlEngine);
Expand Down
Loading

0 comments on commit ab8cec2

Please sign in to comment.