Skip to content

Commit

Permalink
some packets now processed async + queue has not executor to minimise…
Browse files Browse the repository at this point in the history
… remoting threads being held for too long, also some other tweaks
  • Loading branch information
purplefox committed Jan 11, 2010
1 parent e7278a2 commit 2bc6de8
Show file tree
Hide file tree
Showing 31 changed files with 453 additions and 340 deletions.
4 changes: 2 additions & 2 deletions docs/user-manual/en/configuration-index.xml
Expand Up @@ -14,7 +14,7 @@
<!-- -->
<!-- Red Hat, as the licensor of this document, waives the right to enforce, -->
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent -->
<!-- permitted by applicable law. -->
<!-- permitted by applicable law. a -->
<!-- ============================================================================= -->
<chapter id="configuration-index">
<title>Configuration Reference</title>
Expand Down Expand Up @@ -352,7 +352,7 @@
<entry>Should incoming packets on the server be handed off to a thread
from the thread pool for processing or should they be handled on the
remoting thread?</entry>
<entry>false</entry>
<entry>true</entry>
</row>
<row>
<entry><link linkend="transaction-config"
Expand Down
8 changes: 5 additions & 3 deletions docs/user-manual/en/connection-ttl.xml
Expand Up @@ -160,10 +160,12 @@ at org.acme.yourproject.YourClass (YourClass.java:666)
<title>Configuring Asynchronous Connection Execution</title>
<para>By default, packets received on the server side are executed on the remoting
thread.</para>
<para>It is possible instead to use a thread from a thread pool to handle the packents so
<para>It is possible instead to use a thread from a thread pool to handle some packets so
that the remoting thread is not tied up for too long. However, please note that
processing operations asynchronously on another thread adds a little more latency. To
enable asynchronous connection execution, set the parameter <literal
processing operations asynchronously on another thread adds a little more latency.
Please note that most short running operations are always handled on the remoting thread for performance reasons.

To enable asynchronous connection execution, set the parameter <literal
>async-connection-execution-enabled</literal> in <literal
>hornetq-configuration.xml</literal> to <literal>true</literal> (default value is
<literal>false</literal>).</para>
Expand Down
Expand Up @@ -72,7 +72,7 @@ public class ConfigurationImpl implements Configuration

public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;

public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = false;
public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;

public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";

Expand Down
9 changes: 2 additions & 7 deletions src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Expand Up @@ -111,8 +111,6 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding

private final Object notificationLock = new Object();

private final org.hornetq.utils.ExecutorFactory redistributorExecutorFactory;

private final HierarchicalRepository<AddressSettings> addressSettingsRepository;

private final HornetQServer server;
Expand All @@ -127,7 +125,6 @@ public PostOfficeImpl(final HornetQServer server,
final boolean enableWildCardRouting,
final int idCacheSize,
final boolean persistIDCache,
final ExecutorFactory orderedExecutorFactory,
final HierarchicalRepository<AddressSettings> addressSettingsRepository)

{
Expand Down Expand Up @@ -156,8 +153,6 @@ public PostOfficeImpl(final HornetQServer server,

this.persistIDCache = persistIDCache;

redistributorExecutorFactory = orderedExecutorFactory;

this.addressSettingsRepository = addressSettingsRepository;

this.server = server;
Expand Down Expand Up @@ -350,7 +345,7 @@ public void onNotification(final Notification notification)

if (redistributionDelay != -1)
{
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
queue.addRedistributor(redistributionDelay);
}
}
}
Expand Down Expand Up @@ -420,7 +415,7 @@ public void onNotification(final Notification notification)

if (redistributionDelay != -1)
{
queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
queue.addRedistributor(redistributionDelay);
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/org/hornetq/core/remoting/Packet.java
Expand Up @@ -80,4 +80,6 @@ public interface Packet
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();

boolean isAsyncExec();
}
3 changes: 2 additions & 1 deletion src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
Expand Up @@ -110,6 +110,7 @@
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
Expand Down Expand Up @@ -221,7 +222,7 @@ public Packet decode(final HornetQBuffer in)
}
case SESS_COMMIT:
{
packet = new PacketImpl(PacketImpl.SESS_COMMIT);
packet = new SessionCommitMessage();
break;
}
case SESS_ROLLBACK:
Expand Down
32 changes: 22 additions & 10 deletions src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
Expand Up @@ -334,12 +334,12 @@ public void removeAllChannels()
channels.clear();
}
}

public void flushConfirmations()
{
synchronized (transferLock)
{
for (Channel channel: channels.values())
for (Channel channel : channels.values())
{
channel.flushConfirmations();
}
Expand All @@ -349,18 +349,16 @@ public void flushConfirmations()
// Buffer Handler implementation
// ----------------------------------------------------

private volatile boolean executing;

public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);

if (executor == null || packet.getType() == PacketImpl.PING)
{
// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
else
if (packet.isAsyncExec() && executor != null)
{
executing = true;

executor.execute(new Runnable()
{
public void run()
Expand All @@ -373,10 +371,24 @@ public void run()
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}

executing = false;
}
});
}

else
{
//To prevent out of order execution if interleaving sync and async operations on same connection
while (executing)
{
Thread.yield();
}

// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}

dataReceived = true;
}

Expand Down
Expand Up @@ -262,6 +262,11 @@ public boolean isRequiresConfirmations()
{
return true;
}

public boolean isAsyncExec()
{
return false;
}

@Override
public String toString()
Expand Down
Expand Up @@ -72,6 +72,11 @@ public void decodeRest(final HornetQBuffer buffer)
{
considerLastMessageAsDelivered = buffer.readBoolean();
}

public boolean isAsyncExec()
{
return true;
}

// Static --------------------------------------------------------

Expand Down
Expand Up @@ -44,6 +44,12 @@ public boolean equals(final Object other)

return super.equals(other);
}

@Override
public boolean isAsyncExec()
{
return true;
}

// Package protected ---------------------------------------------

Expand Down
Expand Up @@ -61,6 +61,12 @@ public boolean isOnePhase()
return onePhase;
}

@Override
public boolean isAsyncExec()
{
return true;
}

@Override
public void encodeRest(final HornetQBuffer buffer)
{
Expand Down
Expand Up @@ -64,6 +64,11 @@ public void decodeRest(final HornetQBuffer buffer)
{
xid = XidCodecSupport.decodeXid(buffer);
}

public boolean isAsyncExec()
{
return true;
}

@Override
public boolean equals(final Object other)
Expand Down
Expand Up @@ -52,6 +52,7 @@ public Xid getXid()
{
return xid;
}


@Override
public void encodeRest(final HornetQBuffer buffer)
Expand All @@ -64,6 +65,11 @@ public void decodeRest(final HornetQBuffer buffer)
{
xid = XidCodecSupport.decodeXid(buffer);
}

public boolean isAsyncExec()
{
return true;
}

@Override
public boolean equals(final Object other)
Expand Down
6 changes: 4 additions & 2 deletions src/main/org/hornetq/core/server/Queue.java
Expand Up @@ -63,7 +63,7 @@ public interface Queue extends Bindable

void cancel(MessageReference reference) throws Exception;

void deliverAsync(Executor executor);
void deliverAsync();

int getMessageCount();

Expand Down Expand Up @@ -116,7 +116,7 @@ public interface Queue extends Bindable

int moveReferences(Filter filter, SimpleString toAddress) throws Exception;

void addRedistributor(long delay, Executor executor);
void addRedistributor(long delay);

void cancelRedistributor() throws Exception;

Expand Down Expand Up @@ -152,6 +152,8 @@ public interface Queue extends Bindable
* @return true if paused, false otherwise.
*/
boolean isPaused();

Executor getExecutor();


}
2 changes: 0 additions & 2 deletions src/main/org/hornetq/core/server/ServerSession.java
Expand Up @@ -68,8 +68,6 @@ public interface ServerSession

void close() throws Exception;

void promptDelivery(Queue queue);

void handleAcknowledge(final SessionAcknowledgeMessage packet);

void handleExpired(final SessionExpiredMessage packet);
Expand Down
6 changes: 3 additions & 3 deletions src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Expand Up @@ -540,7 +540,7 @@ private void fail(final boolean beforeReconnect)

if (queue != null)
{
queue.deliverAsync(executor);
queue.deliverAsync();
}
}
}
Expand Down Expand Up @@ -683,7 +683,7 @@ private synchronized boolean createObjects()

queue.addConsumer(BridgeImpl.this);

queue.deliverAsync(executor);
queue.deliverAsync();

BridgeImpl.log.info("Bridge " + name + " is connected to its destination");

Expand Down Expand Up @@ -762,7 +762,7 @@ public void run()

if (queue != null)
{
queue.deliverAsync(executor);
queue.deliverAsync();
}
}
catch (Exception e)
Expand Down
Expand Up @@ -189,7 +189,7 @@ public void run()
{
active = true;

queue.deliverAsync(executor);
queue.deliverAsync();
}
}
}
Expand Down
13 changes: 4 additions & 9 deletions src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Expand Up @@ -633,8 +633,6 @@ public CreateSessionResponseMessage createSession(final String name,

Channel channel = connection.getChannel(channelID, sendWindowSize);

Executor sessionExecutor = executorFactory.getExecutor();

final ServerSessionImpl session = new ServerSessionImpl(name,
username,
password,
Expand All @@ -649,18 +647,15 @@ public CreateSessionResponseMessage createSession(final String name,
postOffice,
resourceManager,
securityStore,
sessionExecutor,
channel,
managementService,
this,
configuration.getManagementAddress());

sessions.put(name, session);

// The executor on the OperationContext here has to be the same as the session, or we would have ordering issues
// on messages
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
storageManager.newContext(sessionExecutor),
storageManager.newContext(executorFactory.getExecutor()),
storageManager);

session.setHandler(handler);
Expand Down Expand Up @@ -1030,8 +1025,9 @@ private void initialisePart2() throws Exception

if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser()) && ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
{
log.warn("It has been detected that the cluster admin user and password which are used to " + "replicate management operation from one node to the other have not been changed from the installation default. "
+ "Please see the HornetQ user guide for instructions on how to do this.");
log.warn("Security risk! It has been detected that the cluster admin user and password "
+ "have not been changed from the installation default. "
+ "Please see the HornetQ user guide, cluster chapter, for instructions on how to do this.");
}

securityStore = new SecurityStoreImpl(securityRepository,
Expand Down Expand Up @@ -1059,7 +1055,6 @@ private void initialisePart2() throws Exception
configuration.isWildcardRoutingEnabled(),
configuration.getIDCacheSize(),
configuration.isPersistIDCache(),
executorFactory,
addressSettingsRepository);

messagingServerControl = managementService.registerServer(postOffice,
Expand Down

0 comments on commit 2bc6de8

Please sign in to comment.