Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
adapterBuilder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(adapterBuilder, element, "error-channel", "errorChannel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "event-types");
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "auto-startup");
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "phase");
return adapterBuilder.getBeanDefinition();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,19 @@ public TcpConnectionEvent(TcpConnectionSupport connection, Throwable t,
}

public EventType getType() {
return type;
return this.type;
}

public String getConnectionId() {
return ((TcpConnection) this.getSource()).getConnectionId();
}

public String getConnectionFactoryName() {
return connectionFactoryName;
return this.connectionFactoryName;
}

public Throwable getThrowable() {
return this.throwable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ public void setEventTypes(Class<? extends TcpConnectionEvent>[] eventTypes) {
}

public void onApplicationEvent(TcpConnectionEvent event) {
if (CollectionUtils.isEmpty(this.eventTypes)) {
this.sendMessage(messageFromEvent(event));
}
else {
for (Class<? extends TcpConnectionEvent> eventType : this.eventTypes) {
if (eventType.isAssignableFrom(event.getClass())) {
this.sendMessage(messageFromEvent(event));
break;
if (this.isRunning()) {
if (CollectionUtils.isEmpty(this.eventTypes)) {
this.sendMessage(messageFromEvent(event));
}
else {
for (Class<? extends TcpConnectionEvent> eventType : this.eventTypes) {
if (eventType.isAssignableFrom(event.getClass())) {
this.sendMessage(messageFromEvent(event));
break;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,48 +620,51 @@ setCustomHeaders(). Default is TcpMessageMapper.
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:attribute name="id" type="xsd:string" use="optional" />
<xsd:attribute name="event-types" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Comma delimited list of event types (classes that extend TcpConnectionEvent) that this adapter
should send to the message channel. By default, all event types will be sent [OPTIONAL].
Note, it is NOT possible to filter by subtype, just class - for example, the standard TcpConnectionEvent
class has 3 subtypes (OPEN, CLOSE, EXCEPTION). This feature is intended to allow the adapter to
be used, say, to obtain just subclasses of TcpConnectionEvent (perhaps generated by a
TcpConnectionInterceptor, perhaps to signal handshaking of some kind).
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The channel to which Messages generated from Application Context events will be sent.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="error-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
If a (synchronous) downstream exception is thrown and an error-channel is specified,
a MessagingException will be sent to this channel. Otherwise, any such exception
will be propagated to the caller.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="auto-startup" type="xsd:string" default="true" />
<xsd:complexContent>
<xsd:extension base="smartLifeCycleType">
<xsd:attribute name="id" type="xsd:string" use="optional" />
<xsd:attribute name="event-types" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Comma delimited list of event types (classes that extend TcpConnectionEvent) that this adapter
should send to the message channel. By default, all event types will be sent [OPTIONAL].
Note, it is NOT possible to filter by subtype, just class - for example, the standard TcpConnectionEvent
class has 3 subtypes (OPEN, CLOSE, EXCEPTION). This feature is intended to allow the adapter to
be used, say, to obtain just subclasses of TcpConnectionEvent (perhaps generated by a
TcpConnectionInterceptor, perhaps to signal handshaking of some kind).
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
The channel to which Messages generated from Application Context events will be sent.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="error-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
<xsd:documentation>
If a (synchronous) downstream exception is thrown and an error-channel is specified,
a MessagingException will be sent to this channel. Otherwise, any such exception
will be propagated to the caller.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,13 @@
</constructor-arg>
</bean>

<ip:tcp-connection-event-inbound-channel-adapter id="eventAdapter" channel="nullChannel"
<ip:tcp-connection-event-inbound-channel-adapter id="eventAdapter" channel="eventChannel"
auto-startup="false" phase="23" error-channel="eventErrors"
event-types="org.springframework.integration.ip.config.ParserUnitTests$EventSubclass1, org.springframework.integration.ip.config.ParserUnitTests$EventSubclass2"/>

<int:channel id="eventChannel">
<int:queue />
</int:channel>

<int:channel id="eventErrors" />
</beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

import java.util.Iterator;
import java.util.Set;
Expand All @@ -41,6 +42,7 @@
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.EventDrivenConsumer;
Expand All @@ -54,6 +56,7 @@
import org.springframework.integration.ip.tcp.connection.DefaultTcpNioSSLConnectionSupport;
import org.springframework.integration.ip.tcp.connection.DefaultTcpSSLContextSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEvent.TcpConnectionEventType;
import org.springframework.integration.ip.tcp.connection.TcpConnectionEventListeningMessageProducer;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpMessageMapper;
Expand Down Expand Up @@ -262,6 +265,9 @@ public class ParserUnitTests {
@Autowired
TcpConnectionEventListeningMessageProducer eventAdapter;

@Autowired
QueueChannel eventChannel;

private static volatile int adviceCalled;

@Test
Expand Down Expand Up @@ -649,12 +655,28 @@ public void testSecureServer() {
assertSame(socketSupport, dfa.getPropertyValue("tcpSocketSupport"));
}

@SuppressWarnings("unchecked")
@Test
public void testEventAdapter() {
Set<?> eventTypes = TestUtils.getPropertyValue(this.eventAdapter, "eventTypes", Set.class);
assertEquals(2, eventTypes.size());
assertTrue(eventTypes.contains(EventSubclass1.class));
assertTrue(eventTypes.contains(EventSubclass2.class));
assertFalse(TestUtils.getPropertyValue(this.eventAdapter, "autoStartup", Boolean.class));
assertEquals(23, TestUtils.getPropertyValue(this.eventAdapter, "phase"));
assertEquals("eventErrors", TestUtils.getPropertyValue(this.eventAdapter, "errorChannel",
DirectChannel.class).getComponentName());

TcpConnectionSupport connection = mock(TcpConnectionSupport.class);
TcpConnectionEvent event = new TcpConnectionEvent(connection, TcpConnectionEventType.OPEN, "foo");
this.eventAdapter.setEventTypes(new Class[] {TcpConnectionEvent.class});
this.eventAdapter.onApplicationEvent(event);
assertNull(this.eventChannel.receive(0));
this.eventAdapter.start();
this.eventAdapter.onApplicationEvent(event);
Message<?> eventMessage = this.eventChannel.receive(0);
assertNotNull(eventMessage);
assertSame(event, eventMessage.getPayload());
}

public static class FooAdvice extends AbstractRequestHandlerAdvice {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand All @@ -42,18 +43,19 @@ public class ConnectionEventTests {
@Test
public void test() throws Exception {
Socket socket = mock(Socket.class);
final AtomicReference<ApplicationEvent> theEvent = new AtomicReference<ApplicationEvent>();
final AtomicReference<TcpConnectionEvent> theEvent = new AtomicReference<TcpConnectionEvent>();
TcpNetConnection conn = new TcpNetConnection(socket, false, false, new ApplicationEventPublisher() {

public void publishEvent(ApplicationEvent event) {
theEvent.set(event);
theEvent.set((TcpConnectionEvent) event);
}
}, "foo");
assertNotNull(theEvent.get());
assertEquals("TcpConnectionEvent [type=OPEN, factory=foo, connectionId=" + conn.getConnectionId() + "]", theEvent.get().toString());
@SuppressWarnings("unchecked")
Serializer<Object> serializer = mock(Serializer.class);
doThrow(new RuntimeException("foo")).when(serializer).serialize(Mockito.any(Object.class), Mockito.any(OutputStream.class));
RuntimeException toBeThrown = new RuntimeException("foo");
doThrow(toBeThrown).when(serializer).serialize(Mockito.any(Object.class), Mockito.any(OutputStream.class));
conn.setMapper(new TcpMessageMapper());
conn.setSerializer(serializer);
try {
Expand All @@ -64,6 +66,8 @@ public void publishEvent(ApplicationEvent event) {
assertNotNull(theEvent.get());
assertEquals("TcpConnectionEvent [type=EXCEPTION, factory=foo, connectionId=" + conn.getConnectionId() +
", Exception=java.lang.RuntimeException: foo]", theEvent.get().toString());
assertNotNull(theEvent.get().getThrowable());
assertSame(toBeThrown, theEvent.get().getThrowable());
conn.close();
assertNotNull(theEvent.get());
assertEquals("TcpConnectionEvent [type=CLOSE, factory=foo, connectionId=" + conn.getConnectionId() + "]", theEvent.get().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void testNoFilter() {
QueueChannel outputChannel = new QueueChannel();
eventProducer.setOutputChannel(outputChannel);
eventProducer.afterPropertiesSet();
eventProducer.start();
TcpConnectionSupport connection = Mockito.mock(TcpConnectionSupport.class);
TcpConnectionEvent event1 = new TcpConnectionEvent(connection, TcpConnectionEventType.OPEN, "foo");
eventProducer.onApplicationEvent(event1);
Expand Down Expand Up @@ -66,6 +67,7 @@ public void testFilter() {
eventProducer.setOutputChannel(outputChannel);
eventProducer.setEventTypes(new Class[] {FooEvent.class, BarEvent.class});
eventProducer.afterPropertiesSet();
eventProducer.start();
TcpConnectionSupport connection = Mockito.mock(TcpConnectionSupport.class);
TcpConnectionEvent event1 = new TcpConnectionEvent(connection, TcpConnectionEventType.OPEN, "foo");
eventProducer.onApplicationEvent(event1);
Expand Down
28 changes: 28 additions & 0 deletions src/reference/docbook/ip.xml
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,34 @@
Configuring a connection interceptor factory chain.
</para>
</section>
<section id="tcp-events">
<title>TCP Connection Events</title>
<para>
Beginning with version 3.0, changes to <interfacename>TcpConnection</interfacename>s
are reported by <classname>TcpConnectionEvent</classname>s. <classname>TcpConnectionEvent</classname>
is a subclass of <classname>ApplicationEvent</classname> and thus can
be received by any <interfacename>ApplicationListener</interfacename> defined in
the <interfacename>ApplicationContext</interfacename>.
</para>
<para>
For convenience, a <code>&lt;int-ip:tcp-connection-event-inbound-channel-adapter/&gt;</code>
is provided. This adapter will receive all <classname>TcpConnectionEvent</classname>s (by
default), and send them to its <code>channel</code>. The adapter accepts an <code>event-type</code>
attribute, which is a list of class names for events that should be sent. This can be used
if an application subclasses <classname>TcpConnectionEvent</classname> for some reason, and wishes
to only receive those events. Omitting this attribute will mean that all
<classname>TcpConnectionEvent</classname>s will be sent.
</para>
<para>
<classname>TcpConnectionEvents</classname> contain:
<itemizedlist>
<listitem>Event type (OPEN, CLOSE, EXCEPTION)</listitem>.
<listitem>Connection Id (which can be used in a message header to send data to the connection)</listitem>
<listitem>Connection Factory Name (the bean name of the connection factory the connection belongs to)</listitem>
<listitem>The <classname>Throwable</classname> (for EXCEPTION event types only)</listitem>
</itemizedlist>
</para>
</section>
<section id="tcp-adapters">
<title>TCP Adapters</title>
<para>
Expand Down
8 changes: 3 additions & 5 deletions src/reference/docbook/whats-new.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
been renamed to <classname>TcpConnectionInterceptorSupport</classname>.
</para>
<para>
In addition, a new <code>&lt;int-ip:tcp-connection-event-channel-adapter/&gt;</code>
is provided; by default, this adapter sends all <classname>TcpConnectionEvents</classname>
In addition, a new <code>&lt;int-ip:tcp-connection-event-inbound-channel-adapter/&gt;</code>
is provided; by default, this adapter sends all <classname>TcpConnectionEvent</classname>s
to a <interfacename>Channel</interfacename>.
</para>
<para>
Expand All @@ -47,9 +47,7 @@
to explicitly close a connection using its ID.
</para>
<para>
Further documentation for these features will follow; for now, refer to the
schema documentation for information about the new adapter, and JavaDocs for
it as well as the other features.
For more information see <xref linkend="tcp-events"/>.
</para>
</section>
</section>
Expand Down