Skip to content

Commit

Permalink
fix: always use the changelog subject in table state stores (#5823)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jul 16, 2020
1 parent bc5e5fd commit e69acb4
Show file tree
Hide file tree
Showing 8 changed files with 650 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -41,16 +42,78 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {

@Override
public T deserialize(final String topic, final byte[] bytes) {
return tryDeserialize(topic, bytes).get();
}

/**
* Similar to {@link #deserialize(String, byte[])}, but allows the erroneous case
* to delay the log-and-throw behavior until {@link DelayedResult#get()} is called.
*
* <p>This can be used in the scenarios when an error is expected, such as if a retry
* will likely solve the problem, to avoid spamming the processing logger with messages
* that are not helpful to the end user.</p>
*/
public DelayedResult<T> tryDeserialize(final String topic, final byte[] bytes) {
try {
return delegate.deserialize(topic, bytes);
return new DelayedResult<T>(delegate.deserialize(topic, bytes));
} catch (final RuntimeException e) {
processingLogger.error(new DeserializationError(e, Optional.ofNullable(bytes), topic));
throw e;
return new DelayedResult<T>(
e,
new DeserializationError(e, Optional.ofNullable(bytes), topic),
processingLogger
);
}
}

@Override
public void close() {
delegate.close();
}

public static class DelayedResult<T> {

private final T result;
private final RuntimeException error;
private final ProcessingLogger processingLogger;
private final DeserializationError deserializationError;

public DelayedResult(
final RuntimeException error,
final DeserializationError deserializationError,
final ProcessingLogger processingLogger
) {
this.result = null;
this.error = error;
this.deserializationError = requireNonNull(deserializationError, "deserializationError");
this.processingLogger = requireNonNull(processingLogger, "processingLogger");
}

public DelayedResult(final T result) {
this.result = result;
this.error = null;
this.deserializationError = null;
this.processingLogger = null;
}

public boolean isError() {
return error != null;
}

@VisibleForTesting
RuntimeException getError() {
return error;
}

@SuppressWarnings("ConstantConditions")
public T get() {
if (isError()) {
processingLogger.error(deserializationError);
throw error;
}

return result;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
import io.confluent.ksql.logging.processing.LoggingDeserializer.DelayedResult;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* The {@code StaticTopicSerde} hard codes the topic name that is passed
* to the delegate Serde, regardless to what the caller passes in as the
* topic. The only exception is that if a deserialization attempt fails,
* the deserializer will attempt one more time using the topic that was
* passed to the Serde (instead of the hard coded value). In this situation,
* the {@code onFailure} callback is called so that the user of this class
* can remedy the issue (i.e. register an extra schema under the hard coded
* topic). The callback will not be called if both serialization attempts fail.
*
* <p>This class is intended as a workaround for the issues described in
* both KAFKA-10179 and KSQL-5673; specifically, it allows a materialized
* state store to use a different topic name than that which Kafka Streams
* passes in to the Serde.</p>
*
* <p><b>Think carefully before reusing this class! It's inteded use case is
* very narrow.</b></p>
*/
public final class StaticTopicSerde<T> implements Serde<T> {

public interface Callback {

/**
* This method is called when the {@link Serde#deserializer()}'s produced by
* this class' {@link Deserializer#deserialize(String, byte[])} method fails
* using the static topic but succeeds using the source topic.
*
* @param sourceTopic the original topic that was passed in to the deserializer
* @param staticTopic the hard coded topic that was passed into the {@code StaticTopicSerde}
* @param data the data that failed deserialization
*/
void onDeserializationFailure(String sourceTopic, String staticTopic, byte[] data);
}

private final Serde<T> delegate;
private final String topic;
private final Callback onFailure;

/**
* @param topic the topic to hardcode
* @param serde the delegate serde
* @param onFailure a callback to call on failure
*
* @return a serde which delegates to {@code serde} but passes along {@code topic}
* in place of whatever the actual topic is
*/
public static <S> Serde<S> wrap(
final String topic,
final Serde<S> serde,
final Callback onFailure
) {
return new StaticTopicSerde<>(topic, serde, onFailure);
}

private StaticTopicSerde(
final String topic,
final Serde<T> delegate,
final Callback onFailure
) {
this.topic = Objects.requireNonNull(topic, "topic");
this.delegate = Objects.requireNonNull(delegate, "delegate");
this.onFailure = Objects.requireNonNull(onFailure, "onFailure");
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
}

@Override
public void close() {
delegate.close();
}

@Override
public Serializer<T> serializer() {
final Serializer<T> serializer = delegate.serializer();
return (topic, data) -> serializer.serialize(this.topic, data);
}

@Override
public Deserializer<T> deserializer() {
final Deserializer<T> deserializer = delegate.deserializer();

if (deserializer instanceof LoggingDeserializer<?>) {
final LoggingDeserializer<T> loggingDeserializer = (LoggingDeserializer<T>) deserializer;

return (topic, data) -> {
final DelayedResult<T> staticResult = loggingDeserializer.tryDeserialize(this.topic, data);
if (!staticResult.isError()) {
return staticResult.get();
}

// if both attempts error, then staticResult.get() will log the error to
// the processing log and throw - do not call the callback in this case
final DelayedResult<T> sourceResult = loggingDeserializer.tryDeserialize(topic, data);
if (sourceResult.isError()) {
return staticResult.get();
}

onFailure.onDeserializationFailure(topic, this.topic, data);
return sourceResult.get();
};
}

return (topic, data) -> {
try {
return deserializer.deserialize(this.topic, data);
} catch (final Exception e) {
final T object = deserializer.deserialize(topic, data);
onFailure.onDeserializationFailure(topic, this.topic, data);
return object;
}
};
}

@VisibleForTesting
public String getTopic() {
return topic;
}

@VisibleForTesting
public Callback getOnFailure() {
return onFailure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@
package io.confluent.ksql.logging.processing;

import static io.confluent.ksql.GenericRow.genericRow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.NullPointerTester;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.logging.processing.LoggingDeserializer.DelayedResult;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -99,6 +104,20 @@ public void shouldDeserializeWithDelegate() {
verify(delegate).deserialize("some topic", SOME_BYTES);
}

@Test
public void shouldTryDeserializeWithDelegate() {
// Given:
when(delegate.deserialize(any(), any())).thenReturn(SOME_ROW);

// When:
final DelayedResult<GenericRow> result = deserializer.tryDeserialize("some topic", SOME_BYTES);

// Then:
verify(delegate).deserialize("some topic", SOME_BYTES);
assertThat(result.isError(), is(false));
assertThat(result.get(), is(SOME_ROW));
}

@Test(expected = ArithmeticException.class)
public void shouldThrowIfDelegateThrows() {
// Given:
Expand Down Expand Up @@ -126,4 +145,21 @@ public void shouldLogOnException() {
// Then:
verify(processingLogger).error(new DeserializationError(e, Optional.of(SOME_BYTES), "t"));
}

@Test
public void shouldDelayLogOnException() {
// Given:
when(delegate.deserialize(any(), any()))
.thenThrow(new RuntimeException("outer",
new RuntimeException("inner", new RuntimeException("cause"))));

// When:
final DelayedResult<GenericRow> result = deserializer.tryDeserialize("t", SOME_BYTES);

// Then:
assertTrue(result.isError());
assertThrows(RuntimeException.class, result::get);
verify(processingLogger)
.error(new DeserializationError(result.getError(), Optional.of(SOME_BYTES), "t"));
}
}
Loading

0 comments on commit e69acb4

Please sign in to comment.