diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml index 4ce9d4c..7de491c 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml @@ -21,7 +21,30 @@ and sends it to the output stream. A single message is converted into a single t # Behavior in a consistent region -The `JMSSource` operator is not supported in a consistent region. +The `JMSSource` operator can participate in a consistent region. The operator must be at the start of a consistent region. + +The operator supports periodic and operator-driven consistent region policies. If the consistent region policy is set as operatorDriven, the triggerCount parameter must be specified. The operator initiates a checkpoint after number of tuples specified by the triggerCount parameter have been processed. +If the consistent region policy is set as periodic, the operator respects the period setting and establishes consistent states accordingly. + +When a message queue is consumed by multiple message consumers, i.e. multiple `JMSSource` instances are used to read messages from a same queue, then deterministic routing is required. This requirement can be achieved through the messageSelector parameter. For example, if an SPL application has two JMSSource operator instances and a JMS property named "group" is present on messages that can take value of either 'g1' or 'g2', then each JMSSource operator instance can be assigned in the following manner: + +MyPersonNamesStream1 = JMSSource() + { + param + connectionDocument :"/home/streamsuser/connections/JMSconnections.xml"; + connection : "amqConn"; + access : "amqAccess"; + messageSelector : "group = 'g1'"; + } + +MyPersonNamesStream2 = JMSSource() + { + param + connectionDocument :"/home/streamsuser/connections/JMSconnections.xml"; + connection : "amqConn"; + access : "amqAccess"; + messageSelector : "group = 'g2'"; + } # Exceptions @@ -265,6 +288,24 @@ If this parameter is not specified, the operator uses the file that is in the de rstring 1 + + triggerCount + +This optional parameter specifies how many messages are submitted before the JMSSource operator starts to drain the pipeline and establish a consistent state. +This parameter must be greater than zero and must be set if the JMSSource operator is the start operator of an operatorDriven consistent region. + true + int32 + 1 + + + messageSelector + + This optional parameter is used as JMS Message Selector. + + true + rstring + 1 + diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index 698b2b0..e9642cb 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -12,6 +12,7 @@ import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -70,6 +71,22 @@ class JMSConnectionHelper { // Time to wait before try to resend failed message private final long messageRetryDelay; + + private final boolean useClientAckMode; + + // JMS message selector + private String messageSelector; + + // Timestamp of session creation + private long sessionCreationTime; + + public long getSessionCreationTime() { + return sessionCreationTime; + } + + private void setSessionCreationTime(long sessionCreationTime) { + this.sessionCreationTime = sessionCreationTime; + } // procedure to detrmine if there exists a valid connection or not private boolean isConnectValid() { @@ -107,6 +124,7 @@ synchronized Session getSession() { // object private synchronized void setSession(Session session) { this.session = session; + this.setSessionCreationTime(System.currentTimeMillis()); } // setter for connect @@ -128,7 +146,7 @@ private Connection getConnect() { JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy, int reconnectionBound, double period, boolean isProducer, int maxMessageRetry, long messageRetryDelay, - String deliveryMode, Metric nReconnectionAttempts, Logger logger) { + String deliveryMode, Metric nReconnectionAttempts, Logger logger, boolean useClientAckMode, String messageSelector) { this.reconnectionPolicy = reconnectionPolicy; this.reconnectionBound = reconnectionBound; this.period = period; @@ -138,6 +156,8 @@ private Connection getConnect() { this.nReconnectionAttempts = nReconnectionAttempts; this.maxMessageRetries = maxMessageRetry; this.messageRetryDelay = messageRetryDelay; + this.useClientAckMode = useClientAckMode; + this.messageSelector = messageSelector; } // This constructor sets the parameters required to create a connection for @@ -145,9 +165,9 @@ private Connection getConnect() { JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy, int reconnectionBound, double period, boolean isProducer, int maxMessageRetry, long messageRetryDelay, String deliveryMode, - Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger) { + Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger, boolean useClientAckMode) { this(reconnectionPolicy, reconnectionBound, period, isProducer, - maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger); + maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger, useClientAckMode, null); this.nFailedInserts = nFailedInserts; } @@ -223,6 +243,9 @@ private synchronized void createConnection() throws ConnectionException, break; } + } catch (InvalidSelectorException e) { + throw new ConnectionException( + "Connection to JMS failed. Invalid message selector"); } catch (JMSException e) { logger.log(LogLevel.ERROR, "RECONNECTION_EXCEPTION", new Object[] { e.toString() }); @@ -272,7 +295,14 @@ private boolean connect(boolean isProducer) throws JMSException { // Create session from connection; false means // session is not transacted. - setSession(getConnect().createSession(false, Session.AUTO_ACKNOWLEDGE)); + + if(isProducer) { + setSession(getConnect().createSession(this.useClientAckMode, Session.AUTO_ACKNOWLEDGE)); + } + else { + setSession(getConnect().createSession(false, (this.useClientAckMode) ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE)); + } + if (isProducer == true) { // Its JMSSink, So we will create a producer @@ -294,7 +324,7 @@ private boolean connect(boolean isProducer) throws JMSException { } else { // Its JMSSource, So we will create a consumer - setConsumer(getSession().createConsumer(dest)); + setConsumer(getSession().createConsumer(dest, messageSelector)); // start the connection getConnect().start(); } @@ -352,16 +382,26 @@ boolean sendMessage(Message message) throws ConnectionException, } // this subroutine receives messages from a message consumer - Message receiveMessage() throws ConnectionException, InterruptedException, + // This method supports either blocking or non-blocking receive + // if wait is false, then timeout value is ignored + Message receiveMessage(boolean wait, long timeout) throws ConnectionException, InterruptedException, JMSException { try { - // try to receive a message - synchronized (getSession()) { - return (getConsumer().receive()); + + if(wait) { + // try to receive a message via blocking method + synchronized (getSession()) { + return (getConsumer().receive(timeout)); + } } - + else { + // try to receive a message with non blocking method + synchronized (getSession()) { + return (getConsumer().receiveNoWait()); + } + } + } - catch (JMSException e) { // If the JMSSource operator was interrupted in middle if (e.toString().contains("java.lang.InterruptedException")) { @@ -374,10 +414,39 @@ Message receiveMessage() throws ConnectionException, InterruptedException, new Object[] { e.toString() }); logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); createConnection(); - // retry to receive + // retry to receive again + if(wait) { + // try to receive a message via blocking method + synchronized (getSession()) { + return (getConsumer().receive(timeout)); + } + } + else { + // try to receive a message with non blocking method + synchronized (getSession()) { + return (getConsumer().receiveNoWait()); + } + } + } + } + + // Recovers session causing unacknowledged message to be re-delivered + public void recoverSession() throws JMSException, ConnectionException, InterruptedException { + + try { + synchronized (getSession()) { + getSession().recover(); + } + } catch (JMSException e) { + + logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); + setConnect(null); + createConnection(); + synchronized (getSession()) { - return (getConsumer().receive()); + getSession().recover(); } + } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 7cf322e..bcb8a5f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -36,6 +36,7 @@ import com.ibm.streams.operator.state.ConsistentRegionContext; import com.ibm.streams.operator.state.StateHandler; + //The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic. public class JMSSink extends AbstractOperator implements StateHandler{ @@ -158,6 +159,9 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { // Variable to define if the connection attempted to the JMSProvider is the // first one. private boolean isInitialConnection = true; + + // consistent region context + private ConsistentRegionContext consistentRegionContext; // Mandatory parameter access @Parameter(optional = false) @@ -373,7 +377,7 @@ public synchronized void initialize(OperatorContext context) JmsClasspathUtil.setupClassPaths(context); - context.registerStateHandler(this); + consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); /* * Set appropriate variables if the optional error output port is @@ -414,7 +418,7 @@ public synchronized void initialize(OperatorContext context) jmsConnectionHelper = new JMSConnectionHelper(reconnectionPolicy, reconnectionBound, period, true, maxMessageSendRetries, messageSendRetryDelay, connectionDocumentParser.getDeliveryMode(), - nReconnectionAttempts, nFailedInserts, logger); + nReconnectionAttempts, nFailedInserts, logger, (consistentRegionContext != null)); jmsConnectionHelper.createAdministeredObjects( connectionDocumentParser.getInitialContextFactory(), connectionDocumentParser.getProviderURL(), @@ -486,6 +490,7 @@ public void process(StreamingInput stream, Tuple tuple) jmsConnectionHelper.createInitialConnection(); isInitialConnection = false; } + // Construct the JMS message based on the message type taking the // attributes from the tuple. Message message = mhandler.convertTupleToMessage(tuple, @@ -499,6 +504,8 @@ public void process(StreamingInput stream, Tuple tuple) "Dropping this tuple since an exception occurred while sending."); } } + + } @@ -539,6 +546,11 @@ public void close() throws IOException { @Override public void checkpoint(Checkpoint checkpoint) throws Exception { logger.log(LogLevel.INFO, "checkpoint " + checkpoint.getSequenceId()); + + if(jmsConnectionHelper.getSession() != null) { + jmsConnectionHelper.getSession().commit(); + } + } @Override @@ -549,11 +561,20 @@ public void drain() throws Exception { @Override public void reset(Checkpoint checkpoint) throws Exception { logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId()); + + if(jmsConnectionHelper.getSession() != null) { + jmsConnectionHelper.getSession().rollback(); + } + } @Override public void resetToInitialState() throws Exception { logger.log(LogLevel.INFO, "Reset to initial state"); + + if(jmsConnectionHelper.getSession() != null) { + jmsConnectionHelper.getSession().rollback(); + } } @Override diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index bc936c1..3a7f90f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -6,9 +6,14 @@ import java.io.File; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; import java.util.logging.Logger; +import javax.jms.JMSException; import javax.jms.Message; import javax.naming.NamingException; import javax.xml.parsers.ParserConfigurationException; @@ -28,10 +33,12 @@ import com.ibm.streams.operator.model.CustomMetric; import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.samples.patterns.ProcessTupleProducer; +import com.ibm.streams.operator.state.Checkpoint; import com.ibm.streams.operator.state.ConsistentRegionContext; +import com.ibm.streams.operator.state.StateHandler; //The JMSSource operator converts a message JMS queue or topic to stream -public class JMSSource extends ProcessTupleProducer { +public class JMSSource extends ProcessTupleProducer implements StateHandler{ private static final String CLASS_NAME = "com.ibm.streamsx.messaging.jms.JMSSource"; @@ -71,6 +78,9 @@ public class JMSSource extends ProcessTupleProducer { // MQ // set to true for Apache Active MQ. private boolean isAMQ; + + // consistent region context + private ConsistentRegionContext consistentRegionContext; // Variables to hold performance metrices for JMSSource @@ -82,6 +92,9 @@ public class JMSSource extends ProcessTupleProducer { Metric nMessagesRead; Metric nMessagesDropped; Metric nReconnectionAttempts; + + // when in consistent region, this parameter is used to indicate max time the receive method should block + public static final long RECEIVE_TIMEOUT = 500l; // initialize the metrices. @CustomMetric(kind = Metric.Kind.COUNTER) @@ -146,6 +159,36 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { // Declaring the JMSMEssagehandler, private JMSMessageHandlerImpl messageHandlerImpl; + + // Specify after how many messages are received, the operator should establish consistent region + private int triggerCount = 0; + + // instance of JMSSourceCRState to hold variables required for consistent region + private JMSSourceCRState crState = null; + + private String messageSelector = null; + + private boolean initalConnectionEstablished = false; + + private Object resetLock = new Object(); + + public String getMessageSelector() { + return messageSelector; + } + + @Parameter(optional = true) + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public int getTriggerCount() { + return triggerCount; + } + + @Parameter(optional = true) + public void setTriggerCount(int triggerCount) { + this.triggerCount = triggerCount; + } // Mandatory parameter access @Parameter(optional = false) @@ -190,6 +233,81 @@ public void setConnectionDocument(String connectionDocument) { this.connectionDocument = connectionDocument; } + // Class to hold various variables for consistent region + private class JMSSourceCRState { + + // Last fully processed message + private Message lastMsgSent; + + // Counters for counting number of received messages + private int msgCounter; + + // Flag to indicate a checkpoint has been made. + private boolean isCheckpointPerformed; + + private List msgIDWIthSameTS; + + JMSSourceCRState() { + lastMsgSent = null; + msgCounter = 0; + isCheckpointPerformed = false; + msgIDWIthSameTS = new ArrayList(); + } + + public List getMsgIDWIthSameTS() { + return msgIDWIthSameTS; + } + + public boolean isCheckpointPerformed() { + return isCheckpointPerformed; + } + + public void setCheckpointPerformed(boolean isCheckpointPerformed) { + this.isCheckpointPerformed = isCheckpointPerformed; + } + + public void increaseMsgCounterByOne() { + this.msgCounter++; + } + + public Message getLastMsgSent() { + return lastMsgSent; + } + + public void setLastMsgSent(Message lastMsgSent) { + + try { + if(this.lastMsgSent != null && this.lastMsgSent.getJMSTimestamp() != lastMsgSent.getJMSTimestamp()) { + this.msgIDWIthSameTS.clear(); + } + + this.msgIDWIthSameTS.add(lastMsgSent.getJMSMessageID()); + } catch (JMSException e) { + + } + this.lastMsgSent = lastMsgSent; + + } + + public int getMsgCounter() { + return msgCounter; + } + + public void acknowledgeMsg() throws JMSException { + if(lastMsgSent != null) { + lastMsgSent.acknowledge(); + } + } + + public void reset() { + lastMsgSent = null; + msgCounter = 0; + isCheckpointPerformed = false; + msgIDWIthSameTS = new ArrayList(); + } + + } + public String getConnectionDocument() { if (connectionDocument == null) @@ -211,11 +329,13 @@ public String getConnectionDocument() { @ContextCheck(compile = true) public static void checkInConsistentRegion(OperatorContextChecker checker) { ConsistentRegionContext consistentRegionContext = checker.getOperatorContext().getOptionalContext(ConsistentRegionContext.class); + OperatorContext context = checker.getOperatorContext(); - if(consistentRegionContext != null) { - checker.setInvalidContext("The following operator cannot be in a consistent region: JMSSource", new String[] {}); + if(consistentRegionContext != null && consistentRegionContext.isTriggerOperator() && !context.getParameterNames().contains("triggerCount")) { + checker.setInvalidContext("triggerCount parameter must be set when consistent region is configured to operator driven", new String[] {}); } } + /* * The method checkErrorOutputPort validates that the stream on error output * port contains the mandatory attribute of type rstring which will contain @@ -292,6 +412,16 @@ public static void checkParametersRuntime(OperatorContextChecker checker) { } } + if(context.getParameterNames().contains("triggerCount")) { + if(Integer.valueOf(context.getParameterValues("triggerCount").get(0)) < 1) { + logger.log(LogLevel.ERROR, "triggerCount should be greater than zero"); + checker.setInvalidContext( + "triggerCount value {0} should be greater than zero ", + new String[] { context.getParameterValues("triggerCount") + .get(0).trim() }); + } + } + } // add check for reconnectionPolicy is present if either period or @@ -311,7 +441,13 @@ public synchronized void initialize(OperatorContext context) super.initialize(context); + consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); + JmsClasspathUtil.setupClassPaths(context); + + if(consistentRegionContext != null) { + crState = new JMSSourceCRState(); + } // create connection document parser object (which is responsible for // parsing the connection document) @@ -351,7 +487,7 @@ public synchronized void initialize(OperatorContext context) jmsConnectionHelper = new JMSConnectionHelper(reconnectionPolicy, reconnectionBound, period, false, 0, 0, connectionDocumentParser.getDeliveryMode(), - nReconnectionAttempts, logger); + nReconnectionAttempts, logger, (consistentRegionContext != null), messageSelector); jmsConnectionHelper.createAdministeredObjects( connectionDocumentParser.getInitialContextFactory(), connectionDocumentParser.getProviderURL(), @@ -391,21 +527,84 @@ public synchronized void initialize(OperatorContext context) @Override protected void process() throws UnsupportedEncodingException, InterruptedException, ConnectionException, Exception { + + boolean isInConsistentRegion = consistentRegionContext != null; + boolean isTriggerOperator = isInConsistentRegion && consistentRegionContext.isTriggerOperator(); + // create the initial connection. - jmsConnectionHelper.createInitialConnection(); + try { + jmsConnectionHelper.createInitialConnection(); + if(isInConsistentRegion) { + notifyResetLock(true); + } + } catch (Exception e1) { + + if(isInConsistentRegion) { + notifyResetLock(false); + } + // Initial connection fails to be created. + // throw the exception. + throw e1; + } + + long timeout = isInConsistentRegion ? JMSSource.RECEIVE_TIMEOUT : 0; + long sessionCreationTime = 0; + while (!Thread.interrupted()) { // read a message from the consumer + try { - Message m = jmsConnectionHelper.receiveMessage(); + + if(isInConsistentRegion) { + consistentRegionContext.acquirePermit(); + + // A checkpoint has been made, thus acknowledging the last sent message + if(crState.isCheckpointPerformed()) { + + try { + crState.acknowledgeMsg(); + } catch (Exception e) { + consistentRegionContext.reset(); + } finally { + crState.reset(); + } + + } + } + + Message m = jmsConnectionHelper.receiveMessage(true, timeout); + + if(m == null) { + continue; + } + + if(isInConsistentRegion) { + // following section takes care of possible duplicate messages + // i.e connection re-created due to failure causing unacknowledged message to be delivered again + // we don't want to process duplicate messages again. + if(crState.getLastMsgSent() == null) { + sessionCreationTime = jmsConnectionHelper.getSessionCreationTime(); + } + else { + // if session has been re-created and message is duplicate,ignore + if(jmsConnectionHelper.getSessionCreationTime() > sessionCreationTime && + isDuplicateMsg(m, crState.getLastMsgSent().getJMSTimestamp(), crState.getMsgIDWIthSameTS())) { + logger.log(LogLevel.INFO, "Ignored duplicated message: " + m.getJMSMessageID()); + continue; + } + } + } + // nMessagesRead indicates the number of messages which we have // read from the JMS Provider successfully nMessagesRead.incrementValue(1); OutputTuple dataTuple = dataOutputPort.newTuple(); + // convert the message to the output Tuple using the appropriate // message handler - MessageAction returnVal = messageHandlerImpl .convertMessageToTuple(m, dataTuple); + // take an action based on the return type switch (returnVal) { // the message type is incorrect @@ -457,9 +656,29 @@ protected void process() throws UnsupportedEncodingException, dataOutputPort.submit(dataTuple); break; } - + + // set last processed message + if(isInConsistentRegion) { + crState.setLastMsgSent(m); + } + + // If the consistent region is driven by operator, then + // 1. increase message counter + // 2. Call make consistent region if message counter reached the triggerCounter specified by user + if(isTriggerOperator) { + crState.increaseMsgCounterByOne(); + + if(crState.getMsgCounter() == getTriggerCount()){ + consistentRegionContext.makeConsistent(); + } + } + } catch (Exception Ex) { + } finally { + if(consistentRegionContext != null) { + consistentRegionContext.releasePermit(); + } } } } @@ -477,6 +696,7 @@ private void sendOutputErrorMsg(String errorMessage) throws Exception { @Override public void shutdown() throws Exception { // close the connection. + if (isAMQ) { super.shutdown(); jmsConnectionHelper.closeConnection(); @@ -486,4 +706,143 @@ public void shutdown() throws Exception { } } + + private boolean isInitalConnectionEstablished() throws InterruptedException { + + synchronized(resetLock) { + if(initalConnectionEstablished) { + return true; + } + + resetLock.wait(); + return initalConnectionEstablished; + } + } + + private void notifyResetLock(boolean result) { + if(consistentRegionContext != null) { + synchronized(resetLock) { + initalConnectionEstablished = result; + resetLock.notifyAll(); + } + } + } + + @Override + public void close() throws IOException { + + } + + @Override + public void checkpoint(Checkpoint checkpoint) throws Exception { + logger.log(LogLevel.INFO, "Checkpoint... "); + + crState.setCheckpointPerformed(true); + + ObjectOutputStream stream = checkpoint.getOutputStream(); + + stream.writeBoolean(crState.getLastMsgSent() != null); + + if(crState.getLastMsgSent() != null) { + stream.writeLong(crState.getLastMsgSent().getJMSTimestamp()); + stream.writeObject(crState.getMsgIDWIthSameTS()); + } + + } + + @Override + public void drain() throws Exception { + logger.log(LogLevel.INFO, "Drain... "); + + } + + @SuppressWarnings("unchecked") + @Override + public void reset(Checkpoint checkpoint) throws Exception { + logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId()); + + if(!isInitalConnectionEstablished()) { + throw new ConnectionException("Connection to JMS failed."); + } + + // Reset consistent region variables and recover JMS session to make re-delivery of + // unacknowledged message + jmsConnectionHelper.recoverSession(); + + ObjectInputStream stream = checkpoint.getInputStream(); + boolean hasMsg = stream.readBoolean(); + + if(hasMsg) { + long lastSentMsgTS = stream.readLong(); + List lastSentMsgIDs = (List) stream.readObject(); + + deduplicateMsg(lastSentMsgTS, lastSentMsgIDs); + } + + crState.reset(); + + } + + private boolean isDuplicateMsg(Message msg, long lastSentMsgTs, List lastSentMsgIDs) throws JMSException { + boolean res = false; + + if(msg.getJMSTimestamp() < lastSentMsgTs) { + res = true; + } + else if(msg.getJMSTimestamp() == lastSentMsgTs) { + + if(lastSentMsgIDs.contains(msg.getJMSMessageID())) { + res = true; + } + + } + + return res; + + } + + private void deduplicateMsg(long lastSentMsgTs, List lastSentMsgIDs) throws JMSException, ConnectionException, InterruptedException { + logger.log(LogLevel.INFO, "Deduplicate messages..."); + + boolean stop = false; + + while(!stop) { + + Message msg = jmsConnectionHelper.receiveMessage(false, 0); + + if(msg == null) { + return; + } + + if(isDuplicateMsg(msg, lastSentMsgTs, lastSentMsgIDs)) { + msg.acknowledge(); + logger.log(LogLevel.INFO, "Ignored duplicated message: " + msg.getJMSMessageID()); + } + else { + jmsConnectionHelper.recoverSession(); + stop = true; + } + + } + + } + + @Override + public void resetToInitialState() throws Exception { + logger.log(LogLevel.INFO, "Resetting to Initial..."); + + if(!isInitalConnectionEstablished()) { + throw new ConnectionException("Connection to JMS failed."); + } + + jmsConnectionHelper.recoverSession(); + crState.reset(); + + } + + @Override + public void retireCheckpoint(long id) throws Exception { + logger.log(LogLevel.INFO, "Retire checkpoint " + id); + + } } diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 56cf817..4c2857b 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -65,7 +65,7 @@ or the SPL compiler to be aware of the location of the toolkit. because the `JMSSink` and `JMSSource` operators require the libraries that are installed with IBM InfoSphere Streams. However, the ActiveMQ instance to which a message is sent can be a different machine. You must set the **STREAMS_MESSAGING_AMQ_HOME** environment variable to the location where Apache ActiveMQ is installed. For example: - export STREAMS_MESSAGING_AMQ_HOME="/home/streamsuser/ApacheActiveMQ" + export STREAMS_MESSAGING_AMQ_HOME="/home/streamsuser/ApacheActiveMQ" * WebSphere MQ Server * The `JMSSource`, `JMSSink`, `XMSSource` and `XMSSink` operators support WebSphere MQ v7.5 and v8.0. * The `XMSSource` and `XMSSink` operators require that the IBM Message Service Client (XMS) for C/C++ is installed. @@ -77,7 +77,7 @@ or the SPL compiler to be aware of the location of the toolkit. * The WebSphere MQ Client libraries must be installed on the hosts where the JMS operators can be run. You must set the **STREAMS_MESSAGING_WMQ_HOME** environment variable to the location where WebSphere MQ is installed. For example: - export STREAMS_MESSAGING_WMQ_HOME="/opt/mqm" + export STREAMS_MESSAGING_WMQ_HOME="/opt/mqm" # About this task @@ -122,7 +122,7 @@ Alternatively, you can fully qualify the operators that are provided by toolkit A connection specifications document is an XML document that describes how operators in the Messaging Toolkit connect to and access messaging systems. -Each document contains a collection of <connection_specification> elements and <access_specification> elements. +Each document contains a collection of <connection_specification> elements and <access_specification> elements. The relationship between connection specifications and access specifications is many-to-many. Operators can connect to the same messaging system (one connection specification) and access several different queues @@ -144,8 +144,8 @@ The operators have run-time checks to validate configuration; if the configuration is incorrect these checks might result in run time failures, which are captured in the processing element logs. For `MQTTSink` and `MQTTSource` operators, the connection specifications document that is named by **connectionDocument** parameter -contains only a <connection_specification> element. -It does not contain an <access_specification> element. +contains only a <connection_specification> element. +It does not contain an <access_specification> element. The **connectionDocument** parameter is not mandatory. You can specify the connection information as parameters to the operator. @@ -154,54 +154,54 @@ that is named by **connectionDocument** parameter at run time. If this parameter the operator looks for a `etc/connections.xml` file, where the path is relative to the application root directory directory. A valid connection specifications document consists of a connections root element -which contains one <connection_specifications> element and one <access_specifications> element. +which contains one <connection_specifications> element and one <access_specifications> element. These elements serve as containers for the connection specifications and access specifications. Here is an abridged example of a complete connection specifications document, -with all <connection_specification> and <access_specification> elements omitted: +with all <connection_specification> and <access_specification> elements omitted: - <?xml version="1.0" encoding="UTF-8"?> - <st:connections xmlns:st="http://www.ibm.com/xmlns/prod/streams/adapters" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <connection_specifications> + <?xml version="1.0" encoding="UTF-8"?> + <st:connections xmlns:st="http://www.ibm.com/xmlns/prod/streams/adapters" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <connection_specifications> ... - </connection_specifications> - <access_specifications> + </connection_specifications> + <access_specifications> ... - </access_specifications> - </st:connections> + </access_specifications> + </st:connections> ++ Connection_specification element -A <connection_specifications> element is a sequence of zero or more <connection_specification> elements. +A <connection_specifications> element is a sequence of zero or more <connection_specification> elements. -Each <connection_specification> element has a single attribute, name. +Each <connection_specification> element has a single attribute, name. This name value is used in the **connection** parameter of Messaging Toolkit operators. -A <connection_specification> element must have exactly one element. +A <connection_specification> element must have exactly one element. For the Messaging Toolkit, this element is an XMS, JMS, or a MQTT element. -Use the following <connection_specification> elements for each of the Messaging Toolkit operators: +Use the following <connection_specification> elements for each of the Messaging Toolkit operators: * XMS: `XMSSink` and `XMSSource` operators * JMS: `JMSSink` and `JMSSource` operators * MQTT: `MQTTSink` and `MQTTSource` operators +++ JMS element -The <JMS> element specifies the information that is needed by the `JMSSink` and `JMSSource` operators +The <JMS> element specifies the information that is needed by the `JMSSink` and `JMSSource` operators to establish a connection to messaging system. -The following example shows connection specification that contains a <JMS> element: +The following example shows connection specification that contains a <JMS> element: - <connection_specifications> - <connection_specification name="amqConn"> - <JMS initial_context="org.apache.activemq.jndi.ActiveMQInitialContextFactory" - provider_url = "tcp://machinename.com:61616" - connection_factory = "ConnectionFactory"/> - user="user1" - password="password1" - </connection_specification> - </connection_specifications> + <connection_specifications> + <connection_specification name="amqConn"> + <JMS initial_context="org.apache.activemq.jndi.ActiveMQInitialContextFactory" + provider_url = "tcp://machinename.com:61616" + connection_factory = "ConnectionFactory"/> + user="user1" + password="password1" + </connection_specification> + </connection_specifications> -The <JMS> element has the following attributes: +The <JMS> element has the following attributes: * connection_factory * This mandatory attribute value is the name of the ConnectionFactory administered object. This administered object is within the directory service context that is specified by the initial_context attribute, @@ -234,18 +234,18 @@ The <JMS> element has the following attributes: +++ MQTT element -The <MQTT> element specifies the information that is needed by the `MQTTSink` and `MQTTSource` operators +The <MQTT> element specifies the information that is needed by the `MQTTSink` and `MQTTSource` operators to establish a connection with MQTT-based messaging systems. -The following example shows a <connection_specification> element that contains an <MQTT> element: +The following example shows a <connection_specification> element that contains an <MQTT> element: - <connection_specifications> - <connection_specification name="mqttconn"> - <MQTT serverURI="ssl://hostname:1883" trustStore="../../etc/trustFile.pem"/> - </connection_specification> - </connection_specifications> + <connection_specifications> + <connection_specification name="mqttconn"> + <MQTT serverURI="ssl://hostname:1883" trustStore="../../etc/trustFile.pem"/> + </connection_specification> + </connection_specifications> -The <MQTT> element has the following attributes: +The <MQTT> element has the following attributes: * keyStorePassword * This optional attribute specifies the password for the keystore file. This attribute is specified when the keyStore attribute is specified and the keystore file it is protected by a password. @@ -277,22 +277,22 @@ The <MQTT> element has the following attributes: +++ XMS element -The <XMS> element specifies the information that is needed by the `XMSSink` and `XMSSource` operators +The <XMS> element specifies the information that is needed by the `XMSSink` and `XMSSource` operators to establish a connection to messaging system. -The <XMS> element does not actually contain the information; +The <XMS> element does not actually contain the information; rather, it references an XMS or JMS administered object, which is held in an external directory service. The information about how to make the connection is held in this administered object. -Here is an example connection specification that contains an <XMS> element: +Here is an example connection specification that contains an <XMS> element: - <connection_specification name="wbe"> - <XMS initial_context="file:///opt/InitialContext/" - connection_factory="TestConFac" - user="user1" - password="password1" /> - </connection_specification> + <connection_specification name="wbe"> + <XMS initial_context="file:///opt/InitialContext/" + connection_factory="TestConFac" + user="user1" + password="password1" /> + </connection_specification> -The <XMS> element has four attributes: +The <XMS> element has four attributes: * initial_context * This mandatory attribute value is a URL that points to the directory service that contains the administered objects. It uses the reference JNDI implementation, which stores objects in the file system. @@ -319,57 +319,57 @@ The <XMS> element has four attributes: ++ Access_specification element -A sequence of one or more <access_specification> elements make up an <access_specifications> element. -In the Messaging Toolkit, the <access_specification> element includes a <destination> element. +A sequence of one or more <access_specification> elements make up an <access_specifications> element. +In the Messaging Toolkit, the <access_specification> element includes a <destination> element. -Each <access_specification> element has a single attribute, name. +Each <access_specification> element has a single attribute, name. The name value can be specified in the access parameter of a Messaging Toolkit operator. -**Note**: The `MQTTSource` and `MQTTSink` operators do not support the <access_specification> element. +**Note**: The `MQTTSource` and `MQTTSink` operators do not support the <access_specification> element. -In the Messaging Toolkit, an <access_specification> element has: -* A <destination> element; -* One or more <uses_connection> elements; -* No more than one <native_schema> element. +In the Messaging Toolkit, an <access_specification> element has: +* A <destination> element; +* One or more <uses_connection> elements; +* No more than one <native_schema> element. +++ Destination element -The <destination> element specifies the queue or topic that is used by the Messaging Toolkit operators. +The <destination> element specifies the queue or topic that is used by the Messaging Toolkit operators. The element also specifies some parameters that control the behavior of the operator. -For WebSphere MQ, the <destination> element does not actually contain this information, +For WebSphere MQ, the <destination> element does not actually contain this information, instead it specifies the information indirectly by referencing an XMS or JMS administered object that is held in an external directory service. -For Apache ActiveMQ, the <destination> element can directly refer to a queue or a topic. - -The following example shows an access specification that contains a <destination> element for the WebSphere MQ messaging system: - - <access_specifications> - <access_specification name="Outtrading22"> - <destination identifier="TestDestination2" delivery_mode="persistent" - message_class="stream"/> - <uses_connection connection="wbe2" /> - <native_schema> - <attribute name="cntr" type="Long"/> - </native_schema> - </access_specification> - </access_specifications> - -The following example shows an access specification that contains a <destination> element +For Apache ActiveMQ, the <destination> element can directly refer to a queue or a topic. + +The following example shows an access specification that contains a <destination> element for the WebSphere MQ messaging system: + + <access_specifications> + <access_specification name="Outtrading22"> + <destination identifier="TestDestination2" delivery_mode="persistent" + message_class="stream"/> + <uses_connection connection="wbe2" /> + <native_schema> + <attribute name="cntr" type="Long"/> + </native_schema> + </access_specification> + </access_specifications> + +The following example shows an access specification that contains a <destination> element that directly refers to an ActiveMQ queue: - <access_specifications> - <access_specification name="amqAccess"> - <destination identifier="dynamicQueues/MapQueue" - delivery_mode="persistent" message_class="stream" /> - <uses_connection connection="amqConn"/> - <native_schema> - <attribute name="cntr" type="Long" /> - </native_schema> - </access_specification> - </access_specifications> - -The <destination> element has three attributes: + <access_specifications> + <access_specification name="amqAccess"> + <destination identifier="dynamicQueues/MapQueue" + delivery_mode="persistent" message_class="stream" /> + <uses_connection connection="amqConn"/> + <native_schema> + <attribute name="cntr" type="Long" /> + </native_schema> + </access_specification> + </access_specifications> + +The <destination> element has three attributes: * identifier * This attribute value is the name of the destination administered object that is used. In particular, it is an administered object within the directory service context that is @@ -384,58 +384,58 @@ The <destination> element has three attributes: If this attribute is not specified, the default value is non_persistent. * message_class * This attribute value specifies how a stream tuple is serialized into a WebSphere MQ or an Apache ActiveMQ message. - It operates with the <native_schema> element. The value can be: bytes, empty, map, stream, wbe, wbe22, or xml. + It operates with the <native_schema> element. The value can be: bytes, empty, map, stream, wbe, wbe22, or xml. ++++ Message class formats in the Messaging Toolkit The message class specifies the type of XMS or JMS message that is output or expected by the operators in the Messaging Toolkit. It also affects how the message payload is converted to tuples or vice versa. -The Messaging Toolkit supports the following values for the message_class attribute in the <destination> element: +The Messaging Toolkit supports the following values for the message_class attribute in the <destination> element: bytes, empty, map, stream, wbe, wbe22, and xml. When the message_class attribute value is `bytes`: * The `XMSSink` operator constructs an XMS BytesMessage. The `JMSSink` operator constructs a JMS BytesMessage. The BytesMessage is a stream of binary data that contains values that are taken from the input stream. - The values are serialized into the message in the order that is specified in the <native_schema> element + The values are serialized into the message in the order that is specified in the <native_schema> element and they use the data types from that element. * **Note**: The `XMSSink` operator can handle String and blob attributes that do not have lengths that are specified - in the <native_schema> element. The operator serializes such attributes directly into the BytesMessage, + in the <native_schema> element. The operator serializes such attributes directly into the BytesMessage, and uses the actual length of the attribute at run time. For example, if the blob has three entries, then it uses up 3 bytes in the BytesMessage. - If the <native_schema> element contains a length for a String or blob attribute, + If the <native_schema> element contains a length for a String or blob attribute, then the `XMSSink` operator pads or truncates the run time attribute value to make it fit this length. Blob attributes are padded with nulls and String attributes are padded with spaces. -* The `JMSSink` operator raises a runtime error if the <native_schema> element does not specify the length +* The `JMSSink` operator raises a runtime error if the <native_schema> element does not specify the length for all String and blob attributes or if a negative length other than -2 or -4 is specified. If the length is positive, the operator pads or truncates the run time attributes to make it fit this length. Blob attributes are padded with null values and String attributes are padded with spaces. For String attributes, length is measured in number of bytes. * The `XMSSource` operator expects an XMS BytesMessage. The `JMSSource` operator expects a JMS BytesMessage. - The BytesMessage contains the values of the elements that are listed in the <native_schema>. - The values of the elements must occur in the order that is specified in the <native_schema> element. + The BytesMessage contains the values of the elements that are listed in the <native_schema>. + The values of the elements must occur in the order that is specified in the <native_schema> element. The data types must match the types that are specified in the schema. If the data in the message is insufficient, the operator discards the entire message and logs a run time error. - * **Note**: The <native_schema> element must specify the length of all the String and blob attributes, - unless the attribute is the final attribute in the <native_schema> element. - * If the attribute is the final attribute in the <native_schema> element and a length is not specified, + * **Note**: The <native_schema> element must specify the length of all the String and blob attributes, + unless the attribute is the final attribute in the <native_schema> element. + * If the attribute is the final attribute in the <native_schema> element and a length is not specified, the operators assume that the attribute takes all the bytes that remain in the XMS message. - * For an `XMSSource` operator, if the attribute is not the final attribute in the <native_schema> element + * For an `XMSSource` operator, if the attribute is not the final attribute in the <native_schema> element and a length is not specified, a compile-time error occurs. - For a `JMSSource` operator, if the attribute is not the final attribute in the <native_schema> element + For a `JMSSource` operator, if the attribute is not the final attribute in the <native_schema> element and a length is not specified, a run-time error occurs. - * For an `XMSSource` operator, if the length of the <attribute> element is -2, -4 or -8, + * For an `XMSSource` operator, if the length of the <attribute> element is -2, -4 or -8, the operator assumes that the data that appears in the XMS message starts with a signed 2, 4, or 8-byte length field. This length field is an integer that is encoded in the same way as any other integer attribute in the message. If the length value is incomplete, negative, or absent from the message, or there are insufficient bytes remaining in the XMS message, the operator discards the entire message and logs a run-time error. - * For a `JMSSource` operator, if the length of the <attribute> element is -2 or -4, + * For a `JMSSource` operator, if the length of the <attribute> element is -2 or -4, the operator assume that the data that appears in the JMS message starts with a signed 2 or 4-byte length field. This length field is an integer that is encoded in the same way as any other integer attribute in the message. If the length value is incomplete, negative, or absent from the message, or there are insufficient bytes remaining in the JMS message, the operator discards the entire message and logs a run-time error. - * If the length of the <attribute> element is non-negative, then the operators attempt to read exactly + * If the length of the <attribute> element is non-negative, then the operators attempt to read exactly that number of bytes from the BytesMessage. If there are insufficient bytes remaining in the XMS or JMS message, the operators discard the entire message and logs a run time error. * For an `XMSSource` operator, if the length attribute value is not a supported value, a compile-time error occurs. @@ -445,38 +445,38 @@ When the message_class attribute value is `empty`: * The `XMSSink` operator construct an empty XMS message. The `JMSSink` operator construct an empty JMS message. For example, you can use an empty XMS message to verify that your InfoSphere Streams application can connect to the WebSphere MQ server and that the message reaches its destination. - **Note**: This message_class value cannot be used with a <native_schema> element. + **Note**: This message_class value cannot be used with a <native_schema> element. * The `XMSSource` operator expects an empty XMS message. It emits a tuple with attribute values that are assigned by the SPL program. - This behavior is not supported if a <native_schema> element is defined or if there are any unassigned attributes. + This behavior is not supported if a <native_schema> element is defined or if there are any unassigned attributes. The `JMSSource` operator expects an empty JMS message. When the message_class attribute value is `map`: * The `XMSSink` operator produces an XMS MapMessage. The `JMSSink` operator produces a JMS MapMessage. The MapMessage is a collection of name, type and value triplets. - The triplets contain elements whose names and types are taken from the <native_schema> element. + The triplets contain elements whose names and types are taken from the <native_schema> element. Their values are taken from the operator input stream. -* The `XMSSource` operator expects an XMS MapMessage that contains elements whose names and types appear in the <native_schema> element. - The `JMSSource` operator expects a JMS MapMessage that contains elements whose names and types appear in the <native_schema> element. +* The `XMSSource` operator expects an XMS MapMessage that contains elements whose names and types appear in the <native_schema> element. + The `JMSSource` operator expects a JMS MapMessage that contains elements whose names and types appear in the <native_schema> element. * If the schema contains an attribute which is not present in the MapMessage, the operator discards the entire message and logs a run time error. - * If the MapMessage contains an attribute whose type does not match the data type in the <native_schema>, + * If the MapMessage contains an attribute whose type does not match the data type in the <native_schema>, the operators attempt to convert it using the standard XMS or JMS conversion rules. If this conversion fails, the entire message is discarded and an error message is logged. * If the MapMessage contains an attribute with a null value, the corresponding output stream attribute is set to the default value for its type. - * If the MapMessage contains attributes that are not listed in the <native_schema>, the attributes are ignored. + * If the MapMessage contains attributes that are not listed in the <native_schema>, the attributes are ignored. When the message_class attribute value is `stream`: * The `XMSSink` operator constructs an XMS StreamMessage. The `JMSSink` operator constructs a JMS StreamMessage. The StreamMessage is an ordered list of type and value pairs. The pairs contain values that are taken from the input stream. - The values are serialized into the message in the order in which they are specified in the <native_schema> element. - The values use the data types from the <native_schema> element. + The values are serialized into the message in the order in which they are specified in the <native_schema> element. + The values use the data types from the <native_schema> element. * The `XMSSource` operator expects an XMS StreamMessage. The `JMSSource` operator expects a JMS StreamMessage. - The StreamMessage contains the values of the elements that are listed in the <native_schema> element. - The values must occur in the order that is specified in the <native_schema> element. + The StreamMessage contains the values of the elements that are listed in the <native_schema> element. + The values must occur in the order that is specified in the <native_schema> element. The values must also have the data types that are specified in that schema. * If the operator encounters an attribute in the StreamMessage that does not have the expected data type, - it attempts to convert the attribute to the data type specified by the <native_schema> element. + it attempts to convert the attribute to the data type specified by the <native_schema> element. If they conversion fails, the operator discards the entire message and logs a run time error. - * If the StreamMessage contains more attributes than the <native_schema>, the attributes are ignored and a tuple is emitted. + * If the StreamMessage contains more attributes than the <native_schema>, the attributes are ignored and a tuple is emitted. * If the StreamMessage does not contain enough attributes, the operator discards the entire message and logs a run-time error. * If the StreamMessage contains an attribute with a null value, the corresponding output stream attribute is set to the default value for its type. @@ -485,10 +485,10 @@ When the message_class attribute value is `wbe`: * The `XMSSink` operator constructs an XMS TextMessage. The `JMSSink` operator constructs a JMS TextMessage. The TextMessage contains an XML document in the WebSphere Business Events (WBE) event packet format. - The XML document contains a single <connector-bundle> of type Event. + The XML document contains a single <connector-bundle> of type Event. The XML namespace is `http://wbe.ibm.com/6.2/Event/xxx` where *xxx* is the input stream name. The input port is used as the connector-bundle name attribute and also as a wrapper element that contains the stream attribute values. - The XML document contains a field for each attribute that is listed in the <native_schema> element. + The XML document contains a field for each attribute that is listed in the <native_schema> element. **Restriction**: The blob data type is not a supported when you use the wbe message class. * The `XMSSource` and the `JMSSource` operators do not support the use of the wbe message_class attribute value. @@ -498,9 +498,9 @@ When the message_class attribute value is `wbe22`: * The `XMSSink` operator constructs an XMS TextMessage. The `JMSSink` operator constructs a JMS TextMessage. The TextMessage contains an XML document in the WebSphere Business Events 2.2 event packet format. - The XML document contains a single "connector-object" (event). + The XML document contains a single "connector-object" (event). The input stream name is used for both the WBE event-name and event-object name. - The document contains a field for each attribute that is listed in the <native_schema> element. + The document contains a field for each attribute that is listed in the <native_schema> element. **Restriction**: The blob data type is not supported when you use the wbe22 message class. * The `XMSSource` and the `JMSSource` operators do not support the use of the wbe22 message_class attribute value. @@ -520,79 +520,79 @@ and a message_class value of wbe: <connector xmlns='http://wbe.ibm.com/6.2/Event/StockCashMergerArbitrageStatus_T_SBC' - name='System S' version='6.2'> + name='System S' version='6.2'> <connector-bundle name='StockCashMergerArbitrageStatus_T_SBC' - type='Event'> - <StockCashMergerArbitrageStatus_T_SBC'> - <date data-type='string'>01-FEB-2005</date> - <time data-type='string'>14:45:21.335</time> - <ratio data-type='real'>0.747079</ratio> - <alertType data-type='string'>IN</alertType> - <dayNum data-type='integer'>12815</dayNum> - <flag data-type='boolean'>true</flag> - </StockCashMergerArbitrageStatus_T_SBC'> - </connector-bundle> - </connector> + type='Event'> + <StockCashMergerArbitrageStatus_T_SBC'> + <date data-type='string'>01-FEB-2005</date> + <time data-type='string'>14:45:21.335</time> + <ratio data-type='real'>0.747079</ratio> + <alertType data-type='string'>IN</alertType> + <dayNum data-type='integer'>12815</dayNum> + <flag data-type='boolean'>true</flag> + </StockCashMergerArbitrageStatus_T_SBC'> + </connector-bundle> + </connector> The following is an example of the XML document that is generated for an input stream named iport$0 and a message_class value of wbe22: - <connector name='System S' version='2.2'> + <connector name='System S' version='2.2'> <connector-bundle name='iport$0' - type='Event'> - <connector-object name='iport$0'> - <field name='uintdata'>100</field> - <field name='somethingID'>Yes</field> - <field name='somethingid'>YesToo</field> - </connector-object> - </connector-bundle> - </connector> + type='Event'> + <connector-object name='iport$0'> + <field name='uintdata'>100</field> + <field name='somethingID'>Yes</field> + <field name='somethingid'>YesToo</field> + </connector-object> + </connector-bundle> + </connector> +++ Uses_connection element -A <uses_connection> element identifies a connection specification that can be used with the access specification. +A <uses_connection> element identifies a connection specification that can be used with the access specification. -Here is an example of an abridged access specification that contains <uses_connection> elements: +Here is an example of an abridged access specification that contains <uses_connection> elements: - <access_specification name="PersonSink"> + <access_specification name="PersonSink"> ... - <uses_connection connection="testSysten" /> - <uses_connection connection="productionSystem" /> + <uses_connection connection="testSysten" /> + <uses_connection connection="productionSystem" /> ... - </access_specification> + </access_specification> -The <uses_connection> element has a single attribute, connection, whose value is the name -of a <connection_specification> element in the same connection specifications document. -An <access_specification> element must have at least one <uses_connection> element. +The <uses_connection> element has a single attribute, connection, whose value is the name +of a <connection_specification> element in the same connection specifications document. +An <access_specification> element must have at least one <uses_connection> element. +++ Native_schema element -In the Messaging Toolkit, the <native_schema> element specifies the data attributes of the data +In the Messaging Toolkit, the <native_schema> element specifies the data attributes of the data that is received from or sent to WebSphere MQ or Apache ActiveMQ. -For example, the <native_schema> element identifies the attribute names, data types, and optionally their length. +For example, the <native_schema> element identifies the attribute names, data types, and optionally their length. -The following example is an abridged access specification that contains a <native_schema> element: - <access_specification name="OutTrading"> - <destination identifier="TestDestination" - delivery_mode="persistent" message_class="map" /> +The following example is an abridged access specification that contains a <native_schema> element: + <access_specification name="OutTrading"> + <destination identifier="TestDestination" + delivery_mode="persistent" message_class="map" /> ... - <native_schema> - <attribute name="id" type="Int" /> - <attribute name="fname" type="String" /> - <attribute name="lname" type="String" /> - </native_schema> - </access_specification> + <native_schema> + <attribute name="id" type="Int" /> + <attribute name="fname" type="String" /> + <attribute name="lname" type="String" /> + </native_schema> + </access_specification> -The <native_schema> element has no attributes; it has <attribute> elements. +The <native_schema> element has no attributes; it has <attribute> elements. ++++ Attribute element -The <attribute> element specifies information about the attributes that are present in an output XMS or JMS message, +The <attribute> element specifies information about the attributes that are present in an output XMS or JMS message, or are expected in an input XMS or JMS message. -The <attribute> element has three possible attributes: +The <attribute> element has three possible attributes: * name * This mandatory attribute specifies the name by which the attribute is known in the message. The Messaging Toolkit operators use these identifiers exactly as specified to access data in the message. @@ -680,7 +680,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 2.0.1 + 2.1.0 4.0.0.0