Skip to content

Commit

Permalink
HORNETQ-569 - Allow interceptors to intercept outgoing packets
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Oct 17, 2012
1 parent c99dea8 commit 2b1a59a
Show file tree
Hide file tree
Showing 26 changed files with 807 additions and 98 deletions.
59 changes: 41 additions & 18 deletions docs/user-manual/en/intercepting-operations.xml
Expand Up @@ -26,12 +26,14 @@
<chapter id="intercepting-operations">
<title>Intercepting Operations</title>
<para>HornetQ supports <emphasis>interceptors</emphasis> to intercept packets entering
the server. Any supplied interceptors would be called for any packet entering
the server, this allows custom code to be executed, e.g. for auditing packets,
filtering or other reasons. Interceptors can change the packets they intercept.</para>
and exiting the server. Incoming and outgoing interceptors are be called for any packet
entering or exiting the server respectively. This allows custom code to be executed,
e.g. for auditing packets, filtering or other reasons. Interceptors can change the
packets they intercept. This makes interceptors powerful, but also potentially
dangerous.</para>
<section>
<title>Implementing The Interceptors</title>
<para>A interceptor must implement the <literal>Interceptor interface</literal>:</para>
<para>An interceptor must implement the <literal>Interceptor interface</literal>:</para>
<programlisting>
package org.hornetq.api.core.interceptor;

Expand All @@ -47,31 +49,52 @@ public interface Interceptor
<para>if <literal>true</literal> is returned, the process continues normally</para>
</listitem>
<listitem>
<para>if <literal>false</literal> is returned, the process is aborted, no other
interceptors will be called and the packet will not be handled by the server at
all.</para>
<para>if <literal>false</literal> is returned, the process is aborted, no other interceptors
will be called and the packet will not be processed further by the server.</para>
</listitem>
</itemizedlist>
</section>
<section>
<title>Configuring The Interceptors</title>
<para>The interceptors are configured in <literal>hornetq-configuration.xml</literal>:</para>
<para>Both incoming and outgoing interceptors are configured in
<literal>hornetq-configuration.xml</literal>:</para>
<programlisting>
&lt;remoting-interceptors&gt;
&lt;class-name&gt;org.hornetq.jms.example.LoginInterceptor&lt;/class-name&gt;
&lt;class-name&gt;org.hornetq.jms.example.AdditionalPropertyInterceptor&lt;/class-name&gt;
&lt;/remoting-interceptors&gt;
</programlisting>
&lt;remoting-incoming-interceptors&gt;
&lt;class-name&gt;org.hornetq.jms.example.LoginInterceptor&lt;/class-name&gt;
&lt;class-name&gt;org.hornetq.jms.example.AdditionalPropertyInterceptor&lt;/class-name&gt;
&lt;/remoting-incoming-interceptors&gt;
</programlisting>
<programlisting>
&lt;remoting-outgoing-interceptors&gt;
&lt;class-name&gt;org.hornetq.jms.example.LogoutInterceptor&lt;/class-name&gt;
&lt;class-name&gt;org.hornetq.jms.example.AdditionalPropertyInterceptor&lt;/class-name&gt;
&lt;/remoting-outgoing-interceptors&gt;
</programlisting>
<para>The interceptors classes (and their dependencies) must be added to the server classpath
to be properly instantiated and called.</para>
</section>
<section>
<title>Interceptors on the Client Side</title>
<para>The interceptors can also be run on the client side to intercept packets
<emphasis>sent by the server</emphasis> by adding the interceptor to the <code>ClientSessionFactory</code>
with the <code>addInterceptor()</code> method.</para>
<para>The interceptors classes (and their dependencies) must be added to the client classpath
to be properly instantiated and called from the client side.</para>
<para>The interceptors can also be run on the client side to intercept packets either sent by the
client to the server or by the server to the client. This is done by adding the interceptor to
the <code>ServerLocator</code> with the <code>addIncomingInterceptor(Interceptor)</code> or
<code>addOutgoingInterceptor(Interceptor)</code> methods.</para>
<para>As noted above, if an interceptor returns <literal>false</literal> then the sending of the
packet is aborted which means that no other interceptors are be called and the packet is not
be processed further by the client. Typically this process happens transparently to the client
(i.e. it has no idea if a packet was aborted or not). However, in the case of an outgoing packet
that is sent in a <literal>blocking</literal> fashion a <literal>HornetQException</literal> will
be thrown to the caller. The exception is thrown because blocking sends provide reliability and
it is considered an error for them not to succeed. <literal>Blocking</literal> sends occurs when,
for example, an application invokes <literal>setBlockOnNonDurableSend(true)</literal> or
<literal>setBlockOnDurableSend(true)</literal> on its <literal>ServerLocator</literal> or if an
application is using a JMS connection factory retrieved from JNDI that has either
<literal>block-on-durable-send</literal> or <literal>block-on-non-durable-send</literal>
set to <literal>true</literal>. Blocking is also used for packets dealing with transactions (e.g.
commit, roll-back, etc.). The <literal>HornetQException</literal> thrown will contain the name
of the interceptor that returned false.</para>
<para>As on the server, the client interceptor classes (and their dependencies) must be added to the classpath
to be properly instantiated and invoked.</para>
</section>
<section>
<title>Example</title>
Expand Down
6 changes: 3 additions & 3 deletions examples/jms/interceptor/readme.html
Expand Up @@ -8,7 +8,7 @@
<body onload="prettyPrint()">
<h1>JMS Interceptor Example</h1>

<p>This example shows you how to implement and configure a simple interceptor with HornetQ.</p>
<p>This example shows you how to implement and configure a simple incoming, server-side interceptor with HornetQ.</p>

<p>HornetQ allows an application to use an interceptor to hook into the messaging system. All that needs to do is to implement the
Interceptor interface, as defined below: </p>
Expand All @@ -25,9 +25,9 @@ <h1>JMS Interceptor Example</h1>
<code>
&lt;configuration&gt;
...
&lt;remoting-interceptors&gt;
&lt;remoting-incoming-interceptors&gt;
&lt;class-name&gt;org.hornetq.jms.example.SimpleInterceptor&lt;/class-name&gt;
&lt;/remoting-interceptors&gt;
&lt;/remoting-incoming-interceptors&gt;
...
&lt;/configuration&gt;
</code>
Expand Down
Expand Up @@ -11,9 +11,9 @@
<paging-directory>${build.directory}/server0/data/messaging/paging</paging-directory>


<remoting-interceptors>
<remoting-incoming-interceptors>
<class-name>org.hornetq.jms.example.SimpleInterceptor</class-name>
</remoting-interceptors>
</remoting-incoming-interceptors>

<!-- Connectors -->

Expand Down
Expand Up @@ -632,21 +632,65 @@ public interface ServerLocator
void setInitialMessagePacketSize(int size);

/**
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
* Adds an interceptor which will be executed <em>after packets are received from the server</em>. Invoking this
* method is the same as invoking <code>addIncomingInterceptor(Interceptor).</code>
*
* @param interceptor an Interceptor
*
* @deprecated As of HornetQ 2.3.0.Final, replaced by
* {@link #addIncomingInterceptor(Interceptor)} and
* {@link #addOutgoingInterceptor(Interceptor)}
*/
@Deprecated
void addInterceptor(Interceptor interceptor);

/**
* Removes an interceptor.
* Adds an interceptor which will be executed <em>after packets are received from the server</em>.
*
* @param interceptor an Interceptor
*/
void addIncomingInterceptor(Interceptor interceptor);

/**
* Adds an interceptor which will be executed <em>before packets are sent to the server</em>.
*
* @param interceptor an Interceptor
*/
void addOutgoingInterceptor(Interceptor interceptor);

/**
* Removes an interceptor. Invoking this method is the same as invoking
* <code>removeIncomingInterceptor(Interceptor).</code>
*
* @param interceptor interceptor to remove
*
* @return <code>true</code> if the interceptor is removed from this factory, <code>false</code> else
*
* @deprecated As of HornetQ 2.3.0.Final, replaced by
* {@link #removeIncomingInterceptor(Interceptor)} and
* {@link #removeOutgoingInterceptor(Interceptor)}
*/
@Deprecated
boolean removeInterceptor(Interceptor interceptor);

/**
* Removes an incoming interceptor.
*
* @param interceptor interceptor to remove
*
* @return <code>true</code> if the incoming interceptor is removed from this factory, <code>false</code> else
*/
boolean removeIncomingInterceptor(Interceptor interceptor);

/**
* Removes an outgoing interceptor.
*
* @param interceptor interceptor to remove
*
* @return <code>true</code> if the outgoing interceptor is removed from this factory, <code>false</code> else
*/
boolean removeOutgoingInterceptor(Interceptor interceptor);

/**
* Closes this factory and release all its resources
*/
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.hornetq.api.core.client.FailoverEventType;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.protocol.ClientPacketDecoder;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
Expand Down Expand Up @@ -155,7 +154,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
/** Used to wait for the creation of a session. */
private CountDownLatch inCreateSessionLatch;

private final List<Interceptor> interceptors;
private final List<Interceptor> incomingInterceptors;

private final List<Interceptor> outgoingInterceptors;

private volatile boolean stopPingingAfterOne;

Expand Down Expand Up @@ -187,7 +188,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors,
PacketDecoder packetDecoder)
{

Expand Down Expand Up @@ -225,7 +227,9 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

closeExecutor = orderedExecutorFactory.getExecutor();

this.interceptors = interceptors;
this.incomingInterceptors = incomingInterceptors;

this.outgoingInterceptors = outgoingInterceptors;

this.packetDecoder = packetDecoder;
}
Expand Down Expand Up @@ -1324,7 +1328,7 @@ public CoreRemotingConnection getConnection()
return connection;
}

connection = new RemotingConnectionImpl(packetDecoder, tc, callTimeout, callFailoverTimeout, interceptors);
connection = new RemotingConnectionImpl(packetDecoder, tc, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors);

connection.addFailureListener(new DelegatingFailureListener(connection.getID()));

Expand Down
Expand Up @@ -184,7 +184,9 @@ private enum STATE
private transient STATE state;
private transient CountDownLatch latch;

private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private final List<Interceptor> incomingInterceptors = new CopyOnWriteArrayList<Interceptor>();

private final List<Interceptor> outgoingInterceptors = new CopyOnWriteArrayList<Interceptor>();

private static ExecutorService globalThreadPool;

Expand Down Expand Up @@ -667,7 +669,8 @@ public ClientSessionFactory createSessionFactory(final TransportConfiguration tr
reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors,
incomingInterceptors,
outgoingInterceptors,
packetDecoder);

addToConnecting(factory);
Expand Down Expand Up @@ -710,7 +713,8 @@ public ClientSessionFactory createSessionFactory(final TransportConfiguration tr
reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors,
incomingInterceptors,
outgoingInterceptors,
packetDecoder);

addToConnecting(factory);
Expand Down Expand Up @@ -802,7 +806,8 @@ public ClientSessionFactory createSessionFactory() throws HornetQException
reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors,
incomingInterceptors,
outgoingInterceptors,
packetDecoder);
try
{
Expand Down Expand Up @@ -1187,12 +1192,32 @@ public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration()

public void addInterceptor(final Interceptor interceptor)
{
interceptors.add(interceptor);
addIncomingInterceptor(interceptor);
}

public void addIncomingInterceptor(final Interceptor interceptor)
{
incomingInterceptors.add(interceptor);
}

public void addOutgoingInterceptor(final Interceptor interceptor)
{
outgoingInterceptors.add(interceptor);
}

public boolean removeInterceptor(final Interceptor interceptor)
{
return interceptors.remove(interceptor);
return removeIncomingInterceptor(interceptor);
}

public boolean removeIncomingInterceptor(final Interceptor interceptor)
{
return incomingInterceptors.remove(interceptor);
}

public boolean removeOutgoingInterceptor(final Interceptor interceptor)
{
return outgoingInterceptors.remove(interceptor);
}

public int getInitialMessagePacketSize()
Expand Down Expand Up @@ -1782,7 +1807,8 @@ private synchronized void createConnectors()
reconnectAttempts,
threadPool,
scheduledThreadPool,
interceptors,
incomingInterceptors,
outgoingInterceptors,
packetDecoder);

factory.disableFinalizeCheck();
Expand Down
Expand Up @@ -40,12 +40,31 @@ public interface HornetQServerControl
boolean isStarted();

/**
* Returns the list of interceptors used by this server.
* Returns the list of interceptors used by this server. Invoking this method is the same as invoking
* <code>getIncomingInterceptorClassNames().</code>
*
* @see Interceptor
* @deprecated As of HornetQ 2.3.0.Final, replaced by
* {@link #getIncomingInterceptorClassNames()} and
* {@link #getOutgoingInterceptorClassNames()}
*/
@Deprecated
String[] getInterceptorClassNames();

/**
* Returns the list of interceptors used by this server for incoming messages.
*
* @see Interceptor
*/
String[] getIncomingInterceptorClassNames();

/**
* Returns the list of interceptors used by this server for outgoing messages.
*
* @see Interceptor
*/
String[] getOutgoingInterceptorClassNames();

/**
* Returns whether this server is clustered.
*/
Expand Down
Expand Up @@ -44,22 +44,25 @@ public interface Channel
* sends a packet on this channel.
*
* @param packet the packet to send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was successful
*/
void send(Packet packet);
boolean send(Packet packet);

/**
* sends a packet on this channel using batching algorithm if appropriate
*
* @param packet the packet to send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was successful
*/
void sendBatched(Packet packet);
boolean sendBatched(Packet packet);

/**
* sends a packet on this channel and then blocks until it has been written to the connection.
*
* @param packet the packet to send
* @return false if the packet was rejected by an outgoing interceptor; true if the send was successful
*/
void sendAndFlush(Packet packet);
boolean sendAndFlush(Packet packet);

/**
* sends a packet on this channel and then blocks until a response is received or a timeout occurs.
Expand Down

0 comments on commit 2b1a59a

Please sign in to comment.