Skip to content

Commit

Permalink
Merge pull request #113 from conglisc/master
Browse files Browse the repository at this point in the history
JMS Consistent Region Beta
  • Loading branch information
conglisc committed Jul 31, 2015
2 parents 48244fc + 82bcf75 commit 9eda955
Show file tree
Hide file tree
Showing 5 changed files with 680 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,30 @@ and sends it to the output stream. A single message is converted into a single t

# Behavior in a consistent region

The `JMSSource` operator is not supported in a consistent region.
The `JMSSource` operator can participate in a consistent region. The operator must be at the start of a consistent region.

The operator supports periodic and operator-driven consistent region policies. If the consistent region policy is set as operatorDriven, the triggerCount parameter must be specified. The operator initiates a checkpoint after number of tuples specified by the triggerCount parameter have been processed.
If the consistent region policy is set as periodic, the operator respects the period setting and establishes consistent states accordingly.

When a message queue is consumed by multiple message consumers, i.e. multiple `JMSSource` instances are used to read messages from a same queue, then deterministic routing is required. This requirement can be achieved through the messageSelector parameter. For example, if an SPL application has two JMSSource operator instances and a JMS property named "group" is present on messages that can take value of either 'g1' or 'g2', then each JMSSource operator instance can be assigned in the following manner:

MyPersonNamesStream1 = JMSSource()
{
param
connectionDocument :"/home/streamsuser/connections/JMSconnections.xml";
connection : "amqConn";
access : "amqAccess";
messageSelector : "group = 'g1'";
}

MyPersonNamesStream2 = JMSSource()
{
param
connectionDocument :"/home/streamsuser/connections/JMSconnections.xml";
connection : "amqConn";
access : "amqAccess";
messageSelector : "group = 'g2'";
}

# Exceptions

Expand Down Expand Up @@ -265,6 +288,24 @@ If this parameter is not specified, the operator uses the file that is in the de
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>triggerCount</name>
<description>
This optional parameter specifies how many messages are submitted before the JMSSource operator starts to drain the pipeline and establish a consistent state.
This parameter must be greater than zero and must be set if the JMSSource operator is the start operator of an operatorDriven consistent region.</description>
<optional>true</optional>
<type>int32</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>messageSelector</name>
<description>
This optional parameter is used as JMS Message Selector.
</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
</parameters>
<inputPorts>
</inputPorts>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
Expand Down Expand Up @@ -70,6 +71,22 @@ class JMSConnectionHelper {

// Time to wait before try to resend failed message
private final long messageRetryDelay;

private final boolean useClientAckMode;

// JMS message selector
private String messageSelector;

// Timestamp of session creation
private long sessionCreationTime;

public long getSessionCreationTime() {
return sessionCreationTime;
}

private void setSessionCreationTime(long sessionCreationTime) {
this.sessionCreationTime = sessionCreationTime;
}

// procedure to detrmine if there exists a valid connection or not
private boolean isConnectValid() {
Expand Down Expand Up @@ -107,6 +124,7 @@ synchronized Session getSession() {
// object
private synchronized void setSession(Session session) {
this.session = session;
this.setSessionCreationTime(System.currentTimeMillis());
}

// setter for connect
Expand All @@ -128,7 +146,7 @@ private Connection getConnect() {
JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy,
int reconnectionBound, double period, boolean isProducer,
int maxMessageRetry, long messageRetryDelay,
String deliveryMode, Metric nReconnectionAttempts, Logger logger) {
String deliveryMode, Metric nReconnectionAttempts, Logger logger, boolean useClientAckMode, String messageSelector) {
this.reconnectionPolicy = reconnectionPolicy;
this.reconnectionBound = reconnectionBound;
this.period = period;
Expand All @@ -138,16 +156,18 @@ private Connection getConnect() {
this.nReconnectionAttempts = nReconnectionAttempts;
this.maxMessageRetries = maxMessageRetry;
this.messageRetryDelay = messageRetryDelay;
this.useClientAckMode = useClientAckMode;
this.messageSelector = messageSelector;
}

// This constructor sets the parameters required to create a connection for
// JMSSink
JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy,
int reconnectionBound, double period, boolean isProducer,
int maxMessageRetry, long messageRetryDelay, String deliveryMode,
Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger) {
Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger, boolean useClientAckMode) {
this(reconnectionPolicy, reconnectionBound, period, isProducer,
maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger);
maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger, useClientAckMode, null);
this.nFailedInserts = nFailedInserts;

}
Expand Down Expand Up @@ -223,6 +243,9 @@ private synchronized void createConnection() throws ConnectionException,
break;
}

} catch (InvalidSelectorException e) {
throw new ConnectionException(
"Connection to JMS failed. Invalid message selector");
} catch (JMSException e) {
logger.log(LogLevel.ERROR, "RECONNECTION_EXCEPTION",
new Object[] { e.toString() });
Expand Down Expand Up @@ -272,7 +295,14 @@ private boolean connect(boolean isProducer) throws JMSException {

// Create session from connection; false means
// session is not transacted.
setSession(getConnect().createSession(false, Session.AUTO_ACKNOWLEDGE));

if(isProducer) {
setSession(getConnect().createSession(this.useClientAckMode, Session.AUTO_ACKNOWLEDGE));
}
else {
setSession(getConnect().createSession(false, (this.useClientAckMode) ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE));
}


if (isProducer == true) {
// Its JMSSink, So we will create a producer
Expand All @@ -294,7 +324,7 @@ private boolean connect(boolean isProducer) throws JMSException {

} else {
// Its JMSSource, So we will create a consumer
setConsumer(getSession().createConsumer(dest));
setConsumer(getSession().createConsumer(dest, messageSelector));
// start the connection
getConnect().start();
}
Expand Down Expand Up @@ -352,16 +382,26 @@ boolean sendMessage(Message message) throws ConnectionException,
}

// this subroutine receives messages from a message consumer
Message receiveMessage() throws ConnectionException, InterruptedException,
// This method supports either blocking or non-blocking receive
// if wait is false, then timeout value is ignored
Message receiveMessage(boolean wait, long timeout) throws ConnectionException, InterruptedException,
JMSException {
try {
// try to receive a message
synchronized (getSession()) {
return (getConsumer().receive());

if(wait) {
// try to receive a message via blocking method
synchronized (getSession()) {
return (getConsumer().receive(timeout));
}
}

else {
// try to receive a message with non blocking method
synchronized (getSession()) {
return (getConsumer().receiveNoWait());
}
}

}

catch (JMSException e) {
// If the JMSSource operator was interrupted in middle
if (e.toString().contains("java.lang.InterruptedException")) {
Expand All @@ -374,10 +414,39 @@ Message receiveMessage() throws ConnectionException, InterruptedException,
new Object[] { e.toString() });
logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT");
createConnection();
// retry to receive
// retry to receive again
if(wait) {
// try to receive a message via blocking method
synchronized (getSession()) {
return (getConsumer().receive(timeout));
}
}
else {
// try to receive a message with non blocking method
synchronized (getSession()) {
return (getConsumer().receiveNoWait());
}
}
}
}

// Recovers session causing unacknowledged message to be re-delivered
public void recoverSession() throws JMSException, ConnectionException, InterruptedException {

try {
synchronized (getSession()) {
getSession().recover();
}
} catch (JMSException e) {

logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT");
setConnect(null);
createConnection();

synchronized (getSession()) {
return (getConsumer().receive());
getSession().recover();
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;


//The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic.

public class JMSSink extends AbstractOperator implements StateHandler{
Expand Down Expand Up @@ -158,6 +159,9 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) {
// Variable to define if the connection attempted to the JMSProvider is the
// first one.
private boolean isInitialConnection = true;

// consistent region context
private ConsistentRegionContext consistentRegionContext;

// Mandatory parameter access
@Parameter(optional = false)
Expand Down Expand Up @@ -373,7 +377,7 @@ public synchronized void initialize(OperatorContext context)

JmsClasspathUtil.setupClassPaths(context);

context.registerStateHandler(this);
consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class);

/*
* Set appropriate variables if the optional error output port is
Expand Down Expand Up @@ -414,7 +418,7 @@ public synchronized void initialize(OperatorContext context)
jmsConnectionHelper = new JMSConnectionHelper(reconnectionPolicy,
reconnectionBound, period, true, maxMessageSendRetries,
messageSendRetryDelay, connectionDocumentParser.getDeliveryMode(),
nReconnectionAttempts, nFailedInserts, logger);
nReconnectionAttempts, nFailedInserts, logger, (consistentRegionContext != null));
jmsConnectionHelper.createAdministeredObjects(
connectionDocumentParser.getInitialContextFactory(),
connectionDocumentParser.getProviderURL(),
Expand Down Expand Up @@ -486,6 +490,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
jmsConnectionHelper.createInitialConnection();
isInitialConnection = false;
}

// Construct the JMS message based on the message type taking the
// attributes from the tuple.
Message message = mhandler.convertTupleToMessage(tuple,
Expand All @@ -499,6 +504,8 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
"Dropping this tuple since an exception occurred while sending.");
}
}



}

Expand Down Expand Up @@ -539,6 +546,11 @@ public void close() throws IOException {
@Override
public void checkpoint(Checkpoint checkpoint) throws Exception {
logger.log(LogLevel.INFO, "checkpoint " + checkpoint.getSequenceId());

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().commit();
}

}

@Override
Expand All @@ -549,11 +561,20 @@ public void drain() throws Exception {
@Override
public void reset(Checkpoint checkpoint) throws Exception {
logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId());

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().rollback();
}

}

@Override
public void resetToInitialState() throws Exception {
logger.log(LogLevel.INFO, "Reset to initial state");

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().rollback();
}
}

@Override
Expand Down
Loading

0 comments on commit 9eda955

Please sign in to comment.