Skip to content

Commit

Permalink
HORNETQ-891
Browse files Browse the repository at this point in the history
    The fix enables a user to configure a stomp acceptor with a customized connection-ttl.
  • Loading branch information
gaohoward committed Apr 6, 2012
1 parent 8d1aa69 commit e494292
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 14 deletions.
25 changes: 20 additions & 5 deletions docs/user-manual/en/interoperability.xml
Expand Up @@ -79,11 +79,26 @@
the client is still alive or not. STOMP connections therefore default to a connection-ttl
value of 1 minute (see chapter on <link linkend="connection-ttl">connection-ttl</link> for
more information. This value can be overridden using connection-ttl-override. </para>
<note>
<para>Please note that the STOMP protocol does not contain any heartbeat frame. It is
therefore the user's responsibility to make sure data is sent within connection-ttl or the
server will assume the client is dead and clean up server side resources.</para>
</note>

<para>If you need a specific connection-ttl for your stomp connections without affecting the connection-ttl-override setting, you
can configure your stomp acceptor with the "connection-ttl" property, which is used to set the ttl for connections that are
created from that acceptor. For example:
</para>
<programlisting>
&lt;acceptor name="stomp-acceptor">
&lt;factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory&lt;/factory-class>
&lt;param key="protocol" value="stomp"/>
&lt;param key="port" value="61613"/>
&lt;param key="connection-ttl" value="20000"/>
&lt;/acceptor>
</programlisting>
<para>The above configuration will make sure that any stomp connection that is created from that acceptor will have its
connection-ttl set to 20 seconds.</para>

<note><para>Please note that the STOMP protocol version 1.0 does not contain any heartbeat frame. It is therefore the user's
responsibility to make sure data is sent within connection-ttl or the server will assume the client is dead and clean up server
side resources. With <literal>Stomp 1.1</literal> users can use heart-beats to maintain the life cycle of stomp
connections.</para></note>
</section>
<section>
<title>Stomp and JMS interoperabilty</title>
Expand Down
Expand Up @@ -125,7 +125,18 @@ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final
// Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
// will be timed out and closed!

long ttl = server.getConfiguration().getConnectionTTLOverride();
Long ttl = (Long)acceptorUsed.getConfiguration().get("connection-ttl");

if (ttl != null)
{
if (ttl > 0)
{
return new ConnectionEntry(conn, null, System.currentTimeMillis(), ttl);
}
return new ConnectionEntry(conn, null, System.currentTimeMillis(), 1 * 60 * 1000);
}

ttl = server.getConfiguration().getConnectionTTLOverride();

if (ttl != -1)
{
Expand Down
Expand Up @@ -135,6 +135,8 @@ public class TransportConstants

public static final Set<String> ALLOWABLE_ACCEPTOR_KEYS;

public static final String CONNECTION_TTL = "connection-ttl";

static
{
Set<String> allowableAcceptorKeys = new HashSet<String>();
Expand All @@ -159,7 +161,8 @@ public class TransportConstants
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);

allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);

ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);

Set<String> allowableConnectorKeys = new HashSet<String>();
Expand Down
111 changes: 111 additions & 0 deletions tests/src/org/hornetq/tests/integration/stomp/ExtraStompTest.java
@@ -0,0 +1,111 @@
package org.hornetq.tests.integration.stomp;

import java.net.SocketException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

import junit.framework.Assert;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.tests.unit.util.InVMContext;

public class ExtraStompTest extends StompTestBase
{
public ExtraStompTest()
{
autoCreateServer = false;
}

public void testConnectionTTL() throws Exception
{
try
{
server = createServerWithTTL(3000);
server.start();

setUpAfterServer();

String connect_frame = "CONNECT\n" + "login: brianm\n"
+ "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
sendFrame(connect_frame);

String f = receiveFrame(10000);
Assert.assertTrue(f.startsWith("CONNECTED"));
Assert.assertTrue(f.indexOf("response-id:1") >= 0);

String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL;
sendFrame(frame);

//sleep to let the connection die
Thread.sleep(8000);

frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 2" + Stomp.NULL;

try
{
sendFrame(frame);
fail("Message has been sent successfully after ttl expires! ttl configuration not working!");
}
catch (SocketException e)
{
//expected.
}

MessageConsumer consumer = session.createConsumer(queue);

TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);

message = (TextMessage)consumer.receive(2000);
Assert.assertNull(message);
}
finally
{
cleanUp();
server.stop();
}
}

protected JMSServerManager createServerWithTTL(long ttl) throws Exception
{
Configuration config = createBasicConfig();
config.setSecurityEnabled(false);
config.setPersistenceEnabled(false);

Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
params.put(TransportConstants.CONNECTION_TTL, ttl);
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);

JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations()
.add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
server.setContext(new InVMContext());
return server;
}

}
42 changes: 35 additions & 7 deletions tests/src/org/hornetq/tests/integration/stomp/StompTestBase.java
Expand Up @@ -79,8 +79,8 @@ public abstract class StompTestBase extends UnitTestCase
protected Topic topic;

protected JMSServerManager server;


protected boolean autoCreateServer = true;

// Implementation methods
// -------------------------------------------------------------------------
Expand All @@ -90,8 +90,25 @@ protected void setUp() throws Exception

forceGC();

server = createServer();
server.start();
if (autoCreateServer)
{
server = createServer();
server.start();
connectionFactory = createConnectionFactory();

stompSocket = createSocket();
inputBuffer = new ByteArrayOutputStream();

connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(getQueueName());
topic = session.createTopic(getTopicName());
connection.start();
}
}

protected void setUpAfterServer() throws Exception
{
connectionFactory = createConnectionFactory();

stompSocket = createSocket();
Expand Down Expand Up @@ -133,15 +150,26 @@ protected JMSServerManager createServer() throws Exception
}

protected void tearDown() throws Exception
{
if (autoCreateServer)
{
connection.close();
if (stompSocket != null)
{
stompSocket.close();
}
server.stop();
}
super.tearDown();
}

protected void cleanUp() throws Exception
{
connection.close();
if (stompSocket != null)
{
stompSocket.close();
}
server.stop();

super.tearDown();
}

protected void reconnect() throws Exception
Expand Down

0 comments on commit e494292

Please sign in to comment.