Skip to content

Commit

Permalink
issue #44: JMSSource with Empty message class should provide
Browse files Browse the repository at this point in the history
information about the message received from server
  • Loading branch information
chanskw committed Jul 15, 2014
1 parent 8bae8e4 commit 6c3a486
Showing 1 changed file with 69 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,20 @@

import java.util.List;

import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import com.ibm.rmi.corba.ObjectManager;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.Type.MetaType;

//Empty message class is used in JMSSink/JMSSource to send/receive
// control message to the JMSPorvider to test initial connectivity.
Expand All @@ -24,21 +32,75 @@ class EmptyMessageHandler extends JMSMessageHandlerImpl {
}

// Used by JMSSink to convert an incoming tuple to JMS MEssage
public Message convertTupleToMessage(Tuple tuple, Session session)
throws JMSException {
public Message convertTupleToMessage(Tuple tuple, Session session) throws JMSException {

synchronized (session) {
// simply create a new JMSMEssage and return
return session.createMessage();
}

}

// Used by JMSSource to convert an incoming JMS Message to a tuple
public MessageAction convertMessageToTuple(Message message,
OutputTuple tuple) {
// No validations are performed regarding the type of message
// No values are assigned to tuple elements.
public MessageAction convertMessageToTuple(Message message, OutputTuple tuple) {

MetaType attrType = tuple.getStreamSchema().getAttribute(0).getType().getMetaType();

// if the first attribute is a RString, encode message information into
// the rstring
if (attrType == MetaType.RSTRING) {
try {
String msgId = message.getJMSMessageID();
long expiration = message.getJMSExpiration();
String type = getMessageType(message);
String deliveryModeStr = getDeliveryMode(message);

// format is "messagetype, msgid, deliveryMode, expiration"
StringBuilder builder = new StringBuilder();
builder.append(type);
builder.append(",");
builder.append(msgId);
builder.append(",");
builder.append(deliveryModeStr);
builder.append(",");
builder.append(expiration);

tuple.setString(0, builder.toString());
} catch (JMSException e) {
}
}

return MessageAction.SUCCESSFUL_MESSAGE;
}

private String getMessageType(Message message) {
if (message instanceof BytesMessage)
return "bytes";
else if (message instanceof StreamMessage)
return "streams";
else if (message instanceof MapMessage)
return "map";
else if (message instanceof TextMessage)
return "text";
else if (message instanceof ObjectMessage)
return "object";
else if (message instanceof Message)
return "empty";
return "unknown";
}

private String getDeliveryMode(Message message) {
String deliveryModeStr = "unknown";
int deliveryMode = -1;
try {
deliveryMode = message.getJMSDeliveryMode();
} catch (JMSException e) {
}

if (deliveryMode == DeliveryMode.NON_PERSISTENT) {
deliveryModeStr = "non_persistent";
} else if (deliveryMode == DeliveryMode.PERSISTENT) {
deliveryModeStr = "persistent";
}
return deliveryModeStr;
}
}

0 comments on commit 6c3a486

Please sign in to comment.