Skip to content

Commit

Permalink
MINOR: improve logging for FK-join (#14105)
Browse files Browse the repository at this point in the history
Reviewers: Colt McNealy <colt@littlehorse.io>, Walker Carlson <wcarlson@confluent.io>
  • Loading branch information
mjsax committed Aug 5, 2023
1 parent b3db905 commit faf3635
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Supplier;

Expand All @@ -43,6 +45,7 @@
* @param <VR> Type of joined result of primary and foreign values
*/
public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
Expand Down Expand Up @@ -107,6 +110,8 @@ public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {
result = joiner.apply(currentValueWithTimestamp == null ? null : currentValueWithTimestamp.value(), record.value().getForeignValue());
}
context().forward(record.withValue(result));
} else {
LOG.trace("Dropping FK-join response due to hash mismatch. Expected {}. Actual {}", messageHash, currentHash);
}
}
};
Expand Down

0 comments on commit faf3635

Please sign in to comment.