Skip to content

Commit

Permalink
INT-2971: AMPQ 'header-mapper' XSD polishing
Browse files Browse the repository at this point in the history
* Change expected type from the `HeaderMapper` to the `AmqpHeaderMapper`
* Add 'header-mapper' mutually exclusivity failing tests
* Change AMQP doc to say about `AmqpHeaderMapper` not `HeaderMapper`

JIRA: https://jira.springsource.org/browse/INT-2971
  • Loading branch information
artembilan committed Apr 1, 2013
1 parent 110957f commit d306c44
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 34 deletions.
Expand Up @@ -418,9 +418,14 @@ The order for this consumer when multiple consumers are registered thereby enabl
</xsd:attribute>
<xsd:attribute name="header-mapper" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The reference to the 'AmqpHeaderMapper' implementation bean to map AMQP Headers from the AMQP request into the MessageHeaders
and from the Spring Integration MessageHeaders to the AMQP Headers.
This is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.mapping.HeaderMapper" />
<tool:expected-type type="org.springframework.integration.amqp.support.AmqpHeaderMapper" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
Expand Down Expand Up @@ -478,9 +483,14 @@ property set to TRUE.
</xsd:attribute>
<xsd:attribute name="header-mapper" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The reference to the 'AmqpHeaderMapper' implementation bean to map AMQP Headers from the AMQP request into the MessageHeaders
and from the Spring Integration MessageHeaders to the AMQP Headers.
This is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.mapping.HeaderMapper" />
<tool:expected-type type="org.springframework.integration.amqp.support.AmqpHeaderMapper" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
Expand Down
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="amqpHeaderMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

<amqp:inbound-channel-adapter id="rabbitInbound" queue-names="test.queue"
mapped-request-headers="*"
header-mapper="amqpHeaderMapper"/>

</beans>
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,8 +22,11 @@
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.amqp.AmqpHeaders;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.channel.DirectChannel;
Expand All @@ -35,9 +38,11 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

/**
* @author Mark Fisher
* @author Artem Bilan
* @since 2.1
*/
@ContextConfiguration
Expand All @@ -63,12 +68,12 @@ public void verifyLifeCycle() {
assertEquals(Boolean.FALSE, TestUtils.getPropertyValue(adapter, "autoStartup"));
assertEquals(123, TestUtils.getPropertyValue(adapter, "phase"));
}

@Test
public void withHeaderMapperStandardAndCustomHeaders() {
AmqpInboundChannelAdapter adapter = context.getBean("withHeaderMapperStandardAndCustomHeaders", AmqpInboundChannelAdapter.class);
AbstractMessageListenerContainer mlc =

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
MessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener", MessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
Expand All @@ -90,12 +95,12 @@ public void withHeaderMapperStandardAndCustomHeaders() {
assertNotNull(siMessage.getHeaders().get(AmqpHeaders.APP_ID));
assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_TYPE));
}

@Test
public void withHeaderMapperOnlyCustomHeaders() {
AmqpInboundChannelAdapter adapter = context.getBean("withHeaderMapperOnlyCustomHeaders", AmqpInboundChannelAdapter.class);
AbstractMessageListenerContainer mlc =

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
MessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener", MessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
Expand All @@ -117,12 +122,12 @@ public void withHeaderMapperOnlyCustomHeaders() {
assertNull(siMessage.getHeaders().get(AmqpHeaders.APP_ID));
assertNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_TYPE));
}

@Test
public void withHeaderMapperNothingToMap() {
AmqpInboundChannelAdapter adapter = context.getBean("withHeaderMapperNothingToMap", AmqpInboundChannelAdapter.class);
AbstractMessageListenerContainer mlc =

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
MessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener", MessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
Expand All @@ -135,7 +140,7 @@ public void withHeaderMapperNothingToMap() {
amqpProperties.setHeader("bar", "bar");
Message amqpMessage = new Message("hello".getBytes(), amqpProperties);
listener.onMessage(amqpMessage);

QueueChannel requestChannel = context.getBean("requestChannel", QueueChannel.class);
org.springframework.integration.Message<?> siMessage = requestChannel.receive(0);
assertNull(siMessage.getHeaders().get("foo"));
Expand All @@ -145,12 +150,12 @@ public void withHeaderMapperNothingToMap() {
assertNull(siMessage.getHeaders().get(AmqpHeaders.APP_ID));
assertNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_TYPE));
}

@Test
public void withHeaderMapperDefaultMapping() {
AmqpInboundChannelAdapter adapter = context.getBean("withHeaderMapperDefaultMapping", AmqpInboundChannelAdapter.class);
AbstractMessageListenerContainer mlc =

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(adapter, "messageListenerContainer", AbstractMessageListenerContainer.class);
MessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener", MessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
Expand All @@ -172,4 +177,16 @@ public void withHeaderMapperDefaultMapping() {
assertNotNull(siMessage.getHeaders().get(AmqpHeaders.APP_ID));
assertNotNull(siMessage.getHeaders().get(AmqpHeaders.CONTENT_TYPE));
}

@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
try {
new ClassPathXmlApplicationContext("AmqpInboundChannelAdapterParserTests-headerMapper-fail-context.xml", this.getClass());
}
catch (BeanDefinitionParsingException e) {
assertTrue(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'"));
}
}

}
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="amqpHeaderMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

<amqp:inbound-gateway request-channel="requests" queue-names="test"
mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="bar*"
header-mapper="amqpHeaderMapper"/>

</beans>
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,9 @@
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessagingException;
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
Expand All @@ -47,9 +49,11 @@
import static junit.framework.Assert.assertEquals;

import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

/**
* @author Mark Fisher
* @author Artem Bilan
* @since 2.1
*/
@ContextConfiguration
Expand Down Expand Up @@ -77,27 +81,27 @@ public void verifyLifeCycle() {
assertEquals(Boolean.FALSE, TestUtils.getPropertyValue(gateway, "autoStartup"));
assertEquals(123, TestUtils.getPropertyValue(gateway, "phase"));
}

@SuppressWarnings("rawtypes")
@Test
public void verifyUsageWithHeaderMapper() throws Exception{
DirectChannel requestChannel = context.getBean("requestChannel", DirectChannel.class);
requestChannel.subscribe(new MessageHandler() {
requestChannel.subscribe(new MessageHandler() {
public void handleMessage(org.springframework.integration.Message<?> siMessage)
throws MessagingException {
org.springframework.integration.Message<?> replyMessage = MessageBuilder.fromMessage(siMessage).setHeader("bar", "bar").build();
MessageChannel replyChannel = (MessageChannel) siMessage.getHeaders().getReplyChannel();
replyChannel.send(replyMessage);
}
});

final AmqpInboundGateway gateway = context.getBean("withHeaderMapper", AmqpInboundGateway.class);

Field amqpTemplateField = ReflectionUtils.findField(AmqpInboundGateway.class, "amqpTemplate");
amqpTemplateField.setAccessible(true);
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(gateway, "amqpTemplate", RabbitTemplate.class);
amqpTemplate = Mockito.spy(amqpTemplate);

Mockito.doAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
Expand All @@ -109,8 +113,8 @@ public Object answer(InvocationOnMock invocation) {
.when(amqpTemplate).send(Mockito.any(String.class), Mockito.any(String.class),
Mockito.any(Message.class), Mockito.any(CorrelationData.class));
ReflectionUtils.setField(amqpTemplateField, gateway, amqpTemplate);
AbstractMessageListenerContainer mlc =

AbstractMessageListenerContainer mlc =
TestUtils.getPropertyValue(gateway, "messageListenerContainer", AbstractMessageListenerContainer.class);
MessageListener listener = TestUtils.getPropertyValue(mlc, "messageListener", MessageListener.class);
MessageProperties amqpProperties = new MessageProperties();
Expand All @@ -124,11 +128,22 @@ public Object answer(InvocationOnMock invocation) {
amqpProperties.setHeader("bar", "bar");
Message amqpMessage = new Message("hello".getBytes(), amqpProperties);
listener.onMessage(amqpMessage);

Mockito.verify(amqpTemplate, Mockito.times(1)).send(Mockito.any(String.class), Mockito.any(String.class),
Mockito.any(Message.class), Mockito.any(CorrelationData.class));
}

@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
try {
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml", this.getClass());
}
catch (BeanDefinitionParsingException e) {
assertTrue(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'"));
}
}

private static class TestConverter extends SimpleMessageConverter {}

}
Expand Up @@ -25,6 +25,14 @@
exchange-name="outboundchanneladapter.test.1"
mapped-request-headers="foo*"/>

<bean id="customHeaderMapper" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.integration.amqp.support.AmqpHeaderMapper"/>
</bean>

<amqp:outbound-channel-adapter id="withCustomHeaderMapper"
exchange-name="test.exchange"
header-mapper="customHeaderMapper"/>

<int:channel id="requestChannel"/>

<int:chain id="chainWithRabbitOutbound" input-channel="amqpOutboundChannelAdapterWithinChain">
Expand Down
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="amqpHeaderMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

<amqp:outbound-channel-adapter id="rabbitOutbound" exchange-name="test.queue"
mapped-request-headers="foo*"
header-mapper="amqpHeaderMapper"/>

</beans>
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,13 +45,15 @@
import org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.amqp.AmqpHeaders;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.context.NamedComponent;
Expand Down Expand Up @@ -85,6 +87,10 @@ public class AmqpOutboundChannelAdapterParserTests {
@Autowired
private ApplicationContext context;

@Autowired
@Qualifier("withCustomHeaderMapper.handler")
private MessageHandler amqpMessageHandlerWithCustomHeaderMapper;

@Test
public void verifyIdAsChannel() {
Object channel = context.getBean("rabbitOutbound");
Expand Down Expand Up @@ -288,6 +294,24 @@ public void testInt2773WithOverrideToDefaultAmqpTemplateExchangeAndRoutingLey()
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
}

@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
try {
new ClassPathXmlApplicationContext("AmqpOutboundChannelAdapterParserTests-headerMapper-fail-context.xml", this.getClass());
}
catch (BeanDefinitionParsingException e) {
assertTrue(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'"));
}
}

@Test
public void testInt2971AmqpOutboundChannelAdapterWithCustomHeaderMapper() {
AmqpHeaderMapper headerMapper = TestUtils.getPropertyValue(this.amqpMessageHandlerWithCustomHeaderMapper, "headerMapper", AmqpHeaderMapper.class);
assertSame(this.context.getBean("customHeaderMapper"), headerMapper);
}


public static class FooAdvice extends AbstractRequestHandlerAdvice {

@Override
Expand Down
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="amqpHeaderMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

<amqp:outbound-gateway request-channel="toRabbit0"
mapped-request-headers="foo*, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="bar*"
header-mapper="amqpHeaderMapper"/>


</beans>

0 comments on commit d306c44

Please sign in to comment.