GH-853: Don't propagate out "internal" headers
garyrussell committed Feb 27, 2020
1 parent 6091a1d commit 0100e61
Showing 4 changed files with 161 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -23,3 +23,4 @@ _site/
dump.rdb
.apt_generated
artifacts
.sts4-cache
BinderHeaderMapper.java
@@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +38,8 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.lang.Nullable;
@@ -49,7 +52,7 @@
* Custom header mapper for Apache Kafka. This is identical to the {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}
* from Spring Kafka. It is provided to address interoperability issues between Spring Cloud Stream 3.0.x
* and 2.x apps, where mime types passed as regular {@link MimeType} headers are not deserialized properly.
* Once those concerns are addressed in Spring Kafka, we will deprecate this class and remove it in a future binder release.
* It also suppresses certain internal headers that should never be propagated on output.
*
* Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped onto outbound messages.
* The exceptions are correlation and reply headers for request/reply
@@ -65,6 +68,16 @@
*/
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {

private static final String NEGATE = "!";

private static final String NEVER_ID = NEGATE + MessageHeaders.ID;

private static final String NEVER_TIMESTAMP = NEGATE + MessageHeaders.TIMESTAMP;

private static final String NEVER_DELIVERY_ATTEMPTS = NEGATE + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT;

private static final String NEVER_NATIVE_HEADERS_PRESENT = NEGATE + BinderHeaders.NATIVE_HEADERS_PRESENT;

private static final String JAVA_LANG_STRING = "java.lang.String";

private static final List<String> DEFAULT_TRUSTED_PACKAGES =
@@ -119,8 +132,10 @@ public BinderHeaderMapper() {
*/
public BinderHeaderMapper(ObjectMapper objectMapper) {
this(objectMapper,
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
NEVER_ID,
NEVER_TIMESTAMP,
NEVER_DELIVERY_ATTEMPTS,
NEVER_NATIVE_HEADERS_PRESENT,
"*");
}

@@ -384,6 +399,32 @@ protected boolean trusted(String requestedType) {
return true;
}

/**
* Add patterns for headers that should never be mapped.
* @param patterns the patterns.
* @return the patterns with the never-mapped header patterns prepended.
* @since 3.0.2
*/
public static String[] addNeverHeaderPatterns(List<String> patterns) {
List<String> patternsToUse = new LinkedList<>(patterns);
patternsToUse.add(0, NEVER_NATIVE_HEADERS_PRESENT);
patternsToUse.add(0, NEVER_DELIVERY_ATTEMPTS);
patternsToUse.add(0, NEVER_TIMESTAMP);
patternsToUse.add(0, NEVER_ID);
return patternsToUse.toArray(new String[0]);
}

/**
* Remove never headers.
* @param headers the headers from which to remove the never headers.
* @since 3.0.2
*/
public static void removeNeverHeaders(Headers headers) {
headers.remove(MessageHeaders.ID);
headers.remove(MessageHeaders.TIMESTAMP);
headers.remove(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
headers.remove(BinderHeaders.NATIVE_HEADERS_PRESENT);
}
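
A minimal usage sketch (not part of this commit) of how these helpers could back a custom header mapper bean; the configuration class, the bean name "customHeaderMapper" (which the binder would be pointed at through its headerMapperBeanName property), and the "myHeader*" pattern are illustrative assumptions.

import java.util.Arrays;

import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper; // package assumed
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class CustomHeaderMapperConfiguration {

	// Hypothetical bean; the binder would locate it via its headerMapperBeanName property.
	@Bean
	public KafkaHeaderMapper customHeaderMapper() {
		// Prepend the four negated "never" patterns (id, timestamp, delivery attempt,
		// native-headers-present) ahead of the user-supplied patterns, so those
		// internal headers stay excluded even with a broad "*" pattern.
		return new BinderHeaderMapper(
				BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList("myHeader*", "*")));
	}

}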

/**
* The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.
KafkaMessageChannelBinder.java
@@ -25,7 +25,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -151,6 +150,8 @@ public class KafkaMessageChannelBinder extends
implements
ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {

private static final String NEGATE = "!";

/**
* Kafka header for x-exception-fqcn.
*/
@@ -421,23 +422,32 @@ protected MessageHandler createProducerMessageHandler(
mapper = null;
}
else if (mapper == null) {
String[] headerPatterns = producerProperties.getExtension()
.getHeaderPatterns();
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
if (headerPatterns != null && headerPatterns.length > 0) {
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
}
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
mapper = new BinderHeaderMapper(
patterns.toArray(new String[patterns.size()]));
BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
}
else {
mapper = new BinderHeaderMapper();
}
}
else {
KafkaHeaderMapper userHeaderMapper = mapper;
mapper = new KafkaHeaderMapper() {

@Override
public void toHeaders(Headers source, Map<String, Object> target) {
userHeaderMapper.toHeaders(source, target);
}

@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
userHeaderMapper.fromHeaders(headers, target);
BinderHeaderMapper.removeNeverHeaders(target);
}
};

}
handler.setHeaderMapper(mapper);
return handler;
}
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +52,9 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
@@ -3508,6 +3511,98 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
}
}

@Test
public void testInternalHeadersNotPropagated() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.1", null, null);
}

@Test
public void testInternalHeadersNotPropagatedCustomHeader() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null);
}

@Test
public void testInternalHeadersNotPropagatedCustomMapper() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*"));
}

public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns,
KafkaHeaderMapper mapper) throws Exception {

KafkaTestBinder binder;
if (mapper == null) {
binder = getBinder();
}
else {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaderMapperBeanName("headerMapper");

KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
catch (Exception e) {
throw new RuntimeException(e);
}
binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
KafkaHeaderMapper.class, () -> mapper);
}
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().setHeaderPatterns(headerPatterns);

DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName(name + ".out");
Binding<MessageChannel> producerBinding = binder.bindProducer(name + ".1", output, producerProperties);

QueueChannel input = new QueueChannel();
input.setBeanName(name + ".in");
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send(MessageBuilder.withPayload("internalHeaderPropagation")
.setHeader(KafkaHeaders.TOPIC, name + ".0")
.setHeader("someHeader", "someValue")
.build());

Message<?> consumed = input.receive(10_000);
if (headerPatterns != null) {
consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build();
}
output.send(consumed);

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(name, "false",
embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer consumer = cf.createConsumer();
consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(10));
assertThat(records.count()).isEqualTo(1);
ConsumerRecord<?, ?> received = records.iterator().next();
assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes());
Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT);
assertThat(header).isNull();
header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
assertThat(header).isNull();
header = received.headers().lastHeader(MessageHeaders.ID);
assertThat(header).isNull();
header = received.headers().lastHeader(MessageHeaders.TIMESTAMP);
assertThat(header).isNull();
assertThat(received.headers().lastHeader("someHeader")).isNotNull();
if (headerPatterns != null) {
assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull();
}

producerBinding.unbind();
consumerBinding.unbind();
consumer.close();
}

private final class FailingInvocationCountingMessageHandler
implements MessageHandler {

