Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: always use the changelog subject in table state stores #5837

Merged
merged 1 commit into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.serialization.Deserializer;
Expand All @@ -41,16 +42,78 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {

@Override
public T deserialize(final String topic, final byte[] bytes) {
return tryDeserialize(topic, bytes).get();
}

/**
* Similar to {@link #deserialize(String, byte[])}, but allows the erroneous case
* to delay the log-and-throw behavior until {@link DelayedResult#get()} is called.
*
* <p>This can be used in the scenarios when an error is expected, such as if a retry
* will likely solve the problem, to avoid spamming the processing logger with messages
* that are not helpful to the end user.</p>
*/
public DelayedResult<T> tryDeserialize(final String topic, final byte[] bytes) {
try {
return delegate.deserialize(topic, bytes);
return new DelayedResult<T>(delegate.deserialize(topic, bytes));
} catch (final RuntimeException e) {
processingLogger.error(new DeserializationError(e, Optional.ofNullable(bytes), topic));
throw e;
return new DelayedResult<T>(
e,
new DeserializationError(e, Optional.ofNullable(bytes), topic),
processingLogger
);
}
}

@Override
public void close() {
delegate.close();
}

public static class DelayedResult<T> {

private final T result;
private final RuntimeException error;
private final ProcessingLogger processingLogger;
private final DeserializationError deserializationError;

public DelayedResult(
final RuntimeException error,
final DeserializationError deserializationError,
final ProcessingLogger processingLogger
) {
this.result = null;
this.error = error;
this.deserializationError = requireNonNull(deserializationError, "deserializationError");
this.processingLogger = requireNonNull(processingLogger, "processingLogger");
}

public DelayedResult(final T result) {
this.result = result;
this.error = null;
this.deserializationError = null;
this.processingLogger = null;
}

public boolean isError() {
return error != null;
}

@VisibleForTesting
RuntimeException getError() {
return error;
}

@SuppressWarnings("ConstantConditions")
public T get() {
if (isError()) {
processingLogger.error(deserializationError);
throw error;
}

return result;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.serde;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.logging.processing.LoggingDeserializer;
import io.confluent.ksql.logging.processing.LoggingDeserializer.DelayedResult;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* The {@code StaticTopicSerde} hard codes the topic name that is passed
* to the delegate Serde, regardless to what the caller passes in as the
* topic. The only exception is that if a deserialization attempt fails,
* the deserializer will attempt one more time using the topic that was
* passed to the Serde (instead of the hard coded value). In this situation,
* the {@code onFailure} callback is called so that the user of this class
* can remedy the issue (i.e. register an extra schema under the hard coded
* topic). The callback will not be called if both serialization attempts fail.
*
* <p>This class is intended as a workaround for the issues described in
* both KAFKA-10179 and KSQL-5673; specifically, it allows a materialized
* state store to use a different topic name than that which Kafka Streams
* passes in to the Serde.</p>
*
* <p><b>Think carefully before reusing this class! It's inteded use case is
* very narrow.</b></p>
*/
public final class StaticTopicSerde<T> implements Serde<T> {

public interface Callback {

/**
* This method is called when the {@link Serde#deserializer()}'s produced by
* this class' {@link Deserializer#deserialize(String, byte[])} method fails
* using the static topic but succeeds using the source topic.
*
* @param sourceTopic the original topic that was passed in to the deserializer
* @param staticTopic the hard coded topic that was passed into the {@code StaticTopicSerde}
* @param data the data that failed deserialization
*/
void onDeserializationFailure(String sourceTopic, String staticTopic, byte[] data);
}

private final Serde<T> delegate;
private final String topic;
private final Callback onFailure;

/**
* @param topic the topic to hardcode
* @param serde the delegate serde
* @param onFailure a callback to call on failure
*
* @return a serde which delegates to {@code serde} but passes along {@code topic}
* in place of whatever the actual topic is
*/
public static <S> Serde<S> wrap(
final String topic,
final Serde<S> serde,
final Callback onFailure
) {
return new StaticTopicSerde<>(topic, serde, onFailure);
}

private StaticTopicSerde(
final String topic,
final Serde<T> delegate,
final Callback onFailure
) {
this.topic = Objects.requireNonNull(topic, "topic");
this.delegate = Objects.requireNonNull(delegate, "delegate");
this.onFailure = Objects.requireNonNull(onFailure, "onFailure");
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
delegate.configure(configs, isKey);
}

@Override
public void close() {
delegate.close();
}

@Override
public Serializer<T> serializer() {
final Serializer<T> serializer = delegate.serializer();
return (topic, data) -> serializer.serialize(this.topic, data);
}

@Override
public Deserializer<T> deserializer() {
final Deserializer<T> deserializer = delegate.deserializer();

if (deserializer instanceof LoggingDeserializer<?>) {
final LoggingDeserializer<T> loggingDeserializer = (LoggingDeserializer<T>) deserializer;

return (topic, data) -> {
final DelayedResult<T> staticResult = loggingDeserializer.tryDeserialize(this.topic, data);
if (!staticResult.isError()) {
return staticResult.get();
}

// if both attempts error, then staticResult.get() will log the error to
// the processing log and throw - do not call the callback in this case
final DelayedResult<T> sourceResult = loggingDeserializer.tryDeserialize(topic, data);
if (sourceResult.isError()) {
return staticResult.get();
}

onFailure.onDeserializationFailure(topic, this.topic, data);
return sourceResult.get();
};
}

return (topic, data) -> {
try {
return deserializer.deserialize(this.topic, data);
} catch (final Exception e) {
final T object = deserializer.deserialize(topic, data);
onFailure.onDeserializationFailure(topic, this.topic, data);
return object;
}
};
}

@VisibleForTesting
public String getTopic() {
return topic;
}

@VisibleForTesting
public Callback getOnFailure() {
return onFailure;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,25 @@
package io.confluent.ksql.logging.processing;

import static io.confluent.ksql.GenericRow.genericRow;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableMap;
import com.google.common.testing.NullPointerTester;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.logging.processing.LoggingDeserializer.DelayedResult;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -99,6 +104,20 @@ public void shouldDeserializeWithDelegate() {
verify(delegate).deserialize("some topic", SOME_BYTES);
}

@Test
public void shouldTryDeserializeWithDelegate() {
// Given:
when(delegate.deserialize(any(), any())).thenReturn(SOME_ROW);

// When:
final DelayedResult<GenericRow> result = deserializer.tryDeserialize("some topic", SOME_BYTES);

// Then:
verify(delegate).deserialize("some topic", SOME_BYTES);
assertThat(result.isError(), is(false));
assertThat(result.get(), is(SOME_ROW));
}

@Test(expected = ArithmeticException.class)
public void shouldThrowIfDelegateThrows() {
// Given:
Expand Down Expand Up @@ -126,4 +145,21 @@ public void shouldLogOnException() {
// Then:
verify(processingLogger).error(new DeserializationError(e, Optional.of(SOME_BYTES), "t"));
}

@Test
public void shouldDelayLogOnException() {
// Given:
when(delegate.deserialize(any(), any()))
.thenThrow(new RuntimeException("outer",
new RuntimeException("inner", new RuntimeException("cause"))));

// When:
final DelayedResult<GenericRow> result = deserializer.tryDeserialize("t", SOME_BYTES);

// Then:
assertTrue(result.isError());
assertThrows(RuntimeException.class, result::get);
verify(processingLogger)
.error(new DeserializationError(result.getError(), Optional.of(SOME_BYTES), "t"));
}
}
Loading