From d8ede4adaa5d0888c9e895511977592c0858bb22 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Fri, 23 Oct 2015 16:13:52 -0400 Subject: [PATCH 01/12] Updating copyrights --- .../RabbitMQSink/RabbitMQSink.xml | 2 +- .../RabbitMQSource/RabbitMQSource.xml | 2 +- .../com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java | 4 ++++ .../com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java | 4 ++++ .../com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java | 4 ++++ .../src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java | 5 +++++ 6 files changed, 19 insertions(+), 2 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index 11ca587..716be0f 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index e51c30d..5826faa 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index dbaf3a8..2e7c8bc 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -1,3 +1,7 @@ +/******************************************************************************* +* Copyright (C) 2015, MOHAMED-ALI SAID +* All Rights Reserved +*******************************************************************************/ /* Generated by Streams Studio: March 26, 2014 11:37:11 AM EDT */ package com.ibm.streamsx.messaging.rabbitmq; diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 104342e..d16fe2c 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -1,4 +1,8 @@ /* Generated by Streams Studio: March 26, 2014 2:09:26 PM EDT */ +/******************************************************************************* +* Copyright (C) 2015, MOHAMED-ALI SAID +* All Rights Reserved +*******************************************************************************/ package com.ibm.streamsx.messaging.rabbitmq; diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java index 495d754..4f73ff0 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java @@ -1,3 +1,7 @@ +/******************************************************************************* +* Copyright (C) 2015, MOHAMED-ALI SAID +* All Rights Reserved +*******************************************************************************/ package com.ibm.streamsx.messaging.rabbitmq; import java.io.File; diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java index 91a7fd4..45431bd 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java @@ -1,3 +1,8 @@ +/******************************************************************************* +* Copyright (C) 2015, MOHAMED-ALI SAID +* All Rights Reserved +*******************************************************************************/ + package com.ibm.streamsx.messaging.rabbitmq; From 2924e895245cd4c81761abbcfd46cb9453e85ec0 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Tue, 27 Oct 2015 17:04:55 -0400 Subject: [PATCH 02/12] Refactored out some redundant code and implemented message/routingKey Now you can specify which attributes to assign message and routing key to (issue #140) Began refactoring (issue #141) Can now specify exchange type (issue #142) --- .../RabbitMQSink/RabbitMQSink.xml | 33 +++- .../RabbitMQSource/RabbitMQSource.xml | 33 +++- .../messaging/rabbitmq/AttributeHelper.java | 111 +++++++++++ .../messaging/rabbitmq/RabbitBaseOper.java | 127 ++++++++++++ .../messaging/rabbitmq/RabbitMQSink.java | 52 +---- .../messaging/rabbitmq/RabbitMQSource.java | 75 ++++--- .../messaging/rabbitmq/RabbitMQWrapper.java | 186 +++++++++--------- 7 files changed, 433 insertions(+), 184 deletions(-) create mode 100644 com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/AttributeHelper.java create mode 100644 com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index 716be0f..ebe1bfd 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something @@ -28,25 +28,39 @@ hostname - Exchange Name. + Name of the attribute for the message. This attribute is required. Default is "message". + false + rstring + 1 + + + messageAttribute + Name of the attribute for the message. This attribute is required. Default is "message". true rstring 1 password - Exchange Name. - true + Name of the attribute for the key. Default is "key". + false rstring 1 portId - Exchange Name. + Port id. Default 5672. true int32 1 + + queueName + Name of the attribute for the key. Default is "key". + true + rstring + 1 + routingKey Exchange Name. @@ -55,12 +69,19 @@ 1 - username + routingKeyAttribute Exchange Name. true rstring 1 + + username + Name of the attribute for the key. Default is "key". + false + rstring + 1 + diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 5826faa..69ac69d 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something @@ -28,25 +28,39 @@ hostname - Exchange Name. + Name of the attribute for the message. This attribute is required. Default is "message". + false + rstring + 1 + + + messageAttribute + Name of the attribute for the message. This attribute is required. Default is "message". true rstring 1 password - Exchange Name. - true + Name of the attribute for the key. Default is "key". + false rstring 1 portId - Exchange Name. + Port id. Default 5672. true int32 1 + + queueName + Name of the attribute for the key. Default is "key". + true + rstring + 1 + routingKey Exchange Name. @@ -55,12 +69,19 @@ 1 - username + routingKeyAttribute Exchange Name. true rstring 1 + + username + Name of the attribute for the key. Default is "key". + false + rstring + 1 + diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/AttributeHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/AttributeHelper.java new file mode 100644 index 0000000..74a6d42 --- /dev/null +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/AttributeHelper.java @@ -0,0 +1,111 @@ +/******************************************************************************* + * Copyright (C) 2015, International Business Machines Corporation + * All Rights Reserved + *******************************************************************************/ +package com.ibm.streamsx.messaging.rabbitmq; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Set; + +import com.ibm.streams.operator.Attribute; +import com.ibm.streams.operator.OutputTuple; +import com.ibm.streams.operator.StreamSchema; +import com.ibm.streams.operator.Tuple; +import com.ibm.streams.operator.Type.MetaType; +import com.ibm.streams.operator.types.Blob; +import com.ibm.streams.operator.types.ValueFactory; + +//Helper to check if attributes have been specified explicitly +class AttributeHelper { + static final Charset CS = Charset.forName("UTF-8"); + private boolean wasSet = false, isAvailable = false; + private MetaType mType = null; + private String name = null; + private boolean isString = false; + + AttributeHelper(String n) { + this.name = n; + } + + boolean isWasSet() { + return wasSet; + } + + boolean isAvailable() { + return isAvailable; + } + + String getName() { + return name; + } + + void setName(String name) { + this.name = name; + wasSet = true; + } + + void initialize(StreamSchema ss, boolean required, Set supportedTypes) throws Exception { + Attribute a = ss.getAttribute(name); + if(a == null) { + if(wasSet) + throw new IllegalArgumentException("Attribute \"" + name + "\" not available."); + if(required) + throw new IllegalArgumentException("Attribute not found for \"" + name + "\"."); + return; + } + this.mType = a.getType().getMetaType(); + isString = mType == MetaType.RSTRING || mType == MetaType.USTRING; + + if(!supportedTypes.contains(mType)){ + throw new Exception("Attribute \"" + name + "\" must be one of: " + supportedTypes); + } + isAvailable = true; + } + boolean isString() { + return isString; + } + + void setValue(OutputTuple otup, String value) { + if(!isAvailable) return; + if(isString) { + if (value == null) + value = ""; + otup.setString(name, value); + } else + otup.setBlob(name, ValueFactory.newBlob(value.getBytes(CS))); + } + void setValue(OutputTuple otup, byte[] value) { + if(!isAvailable) return; + if(isString) { + if (value == null) + otup.setString(name,""); + else + otup.setString(name, new String(value, CS)); + } + else + otup.setBlob(name, ValueFactory.newBlob(value)); + } + + String getString(Tuple tuple) throws IOException { + if(!isAvailable) return null; + if(isString) + return tuple.getString(name); + return new String(getBytesFromBlob(tuple, name)); + } + byte[] getBytes(Tuple tuple) throws IOException { + if(!isAvailable) return null; + if(isString) + return tuple.getString(name).getBytes(CS); + return getBytesFromBlob(tuple, name); + } + private static byte[] getBytesFromBlob(Tuple tuple, String name) throws IOException { + Blob blockMsg = tuple.getBlob(name); + InputStream inputStream = blockMsg.getInputStream(); + int length = (int) blockMsg.getLength(); + byte[] byteArray = new byte[length]; + inputStream.read(byteArray, 0, length); + return byteArray; + } +} diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java new file mode 100644 index 0000000..81a4394 --- /dev/null +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -0,0 +1,127 @@ +package com.ibm.streamsx.messaging.rabbitmq; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import java.util.logging.Logger; +import com.ibm.streams.operator.Type.MetaType; +import com.ibm.streams.operator.logging.TraceLevel; + +import com.ibm.streams.operator.AbstractOperator; +import com.ibm.streams.operator.OperatorContext; +import com.ibm.streams.operator.StreamSchema; +import com.ibm.streams.operator.model.Libraries; +import com.ibm.streams.operator.model.Parameter; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +@Libraries({ "opt/downloaded/*" }) +public class RabbitBaseOper extends AbstractOperator { + +// protected Channel channel; +// protected Connection connection; + protected String hostName = "localhost", + username = "guest", + password = "guest", + exchangeName = "logs", + exchangeType = "direct", + routingKey = "", + queueName = ""; + protected int portId = 5672; + + protected AttributeHelper topicAH = new AttributeHelper("topic"), + routingKeyAH = new AttributeHelper("routing_key"), + messageAH = new AttributeHelper("message"); + + private final Logger trace = Logger.getLogger(RabbitBaseOper.class + .getCanonicalName()); + + +// public synchronized void initialize(OperatorContext context) +// throws Exception { +// // Must call super.initialize(context) to correctly setup an operator. +// super.initialize(context); +// Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); +// ConnectionFactory factory = new ConnectionFactory(); +// factory.setHost(hostName); +// factory.setUsername(username); +// factory.setPassword(password); +// connection = factory.newConnection(); +// channel = connection.createChannel(); +// channel.exchangeDeclare(exchangeName, "fanout"); +// +// //channel.queueDeclare(queueName, false, false, false, null); +// } + +// public void shutdown() throws IOException, TimeoutException{ +// channel.close(); +// connection.close(); +// } + + public void initSchema(StreamSchema ss) throws Exception { + Set supportedTypes = new HashSet(); + supportedTypes.add(MetaType.RSTRING); + supportedTypes.add(MetaType.USTRING); + supportedTypes.add(MetaType.BLOB); + + routingKeyAH.initialize(ss, false, supportedTypes); + messageAH.initialize(ss, true, supportedTypes); + + } + + @Parameter(optional = false, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") + public void setHostname(String value) { + hostName = value; + } + + @Parameter(optional = false, description = "Name of the attribute for the key. Default is \\\"key\\\".") + public void setUsername(String value) { + username = value; + } + + @Parameter(optional = false, description = "Name of the attribute for the key. Default is \\\"key\\\".") + public void setPassword(String value) { + password = value; + } + + @Parameter(optional=true, description="Exchange Name.") + public void setExchangeName(String value) { + exchangeName = value; + } + + @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".") + public void setQueueName(String value) { + queueName = value; + } + + + @Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") + public void setMessageAttribute(String value) { + messageAH.setName(value); + } + +// @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".") +// public void setKeyAttribute(String value) { +// keyAH.setName(value); +// } + + @Parameter(optional=true, description="Port id. Default 5672.") + public void setPortId(int value) { + portId = value; + } + + @Parameter(optional=true, description="Exchange Name.") + public void setRoutingKeyAttribute(String value) { + routingKeyAH.setName(value); + } + + @Parameter(optional=true, description="Exchange Name.") + public void setRoutingKey(String value) { + routingKey = value; + } + +} + diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index 2e7c8bc..8580198 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -43,18 +43,12 @@ *

With the exception of operator initialization, all the other events may occur concurrently with each other, * which lead to these methods being called concurrently by different threads.

*/ -@Libraries({ "opt/downloaded/*" }) @InputPorts(@InputPortSet(cardinality=1, optional=false, description="")) @PrimitiveOperator(name="RabbitMQSink", description="something") -public class RabbitMQSink extends AbstractOperator { +public class RabbitMQSink extends RabbitBaseOper { private RabbitMQWrapper rabbitMQWrapper; - private String exchangeNameParam, routingKeyParam,userNameParam,passwordParam,hostNameParam; - private int portIdParam; - - // joins to the event thread of the RFAConsumer to hold the operator open - private static final org.slf4j.Logger log = LoggerFactory.getLogger(RabbitMQSink.class); @@ -70,6 +64,7 @@ public synchronized void initialize(OperatorContext context) // Must call super.initialize(context) to correctly setup an operator. super.initialize(context); + super.initSchema(getInput(0).getStreamSchema()); log.trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); // TODO: @@ -78,8 +73,8 @@ public synchronized void initialize(OperatorContext context) // or external configuration files or a combination of the two. - rabbitMQWrapper = new RabbitMQWrapper(exchangeNameParam); - rabbitMQWrapper.login(userNameParam, passwordParam, hostNameParam, portIdParam); + rabbitMQWrapper = new RabbitMQWrapper(exchangeName, exchangeType); + rabbitMQWrapper.login(username, password, hostName, portId); // TODO: // If needed, insert code to establish connections or resources to communicate an external system or data store. // The configuration information for this may come from parameters supplied to the operator invocation, @@ -114,8 +109,8 @@ public void process(StreamingInput stream, Tuple tuple) // TODO Insert code here to process the incoming tuple, // typically sending tuple data to an external system or data store. - String message = tuple.getString("message"); - String guid = tuple.getString("guid_request"); + String message = tuple.getString(messageAH.getName()); + String guid = tuple.getString(routingKeyAH.getName()); rabbitMQWrapper.publish(guid,message); @@ -134,36 +129,6 @@ public void processPunctuation(StreamingInput stream, // TODO: If window punctuations are meaningful to the external system or data store, // insert code here to process the incoming punctuation. } - - @Parameter(optional=true, description="Exchange Name.") - public void setExchangeName(String value) { - exchangeNameParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setRoutingKey(String value) { - routingKeyParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setUsername(String value) { - userNameParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setPassword(String value) { - passwordParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setHostname(String value) { - hostNameParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setPortId(int value) { - portIdParam = value; - } /** * Shutdown this operator. @@ -171,11 +136,8 @@ public void setPortId(int value) { */ @Override public synchronized void shutdown() throws Exception { - OperatorContext context = getOperatorContext(); - + OperatorContext context = getOperatorContext(); rabbitMQWrapper.logout(); // should force the join() to exit - - // TODO: If needed, close connections or release resources related to any external system or data store. super.shutdown(); } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index d16fe2c..5e7c51f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -41,11 +41,10 @@ *

With the exception of operator initialization, all the other events may occur concurrently with each other, * which lead to these methods being called concurrently by different threads.

*/ -@Libraries({ "opt/downloaded/*" }) @OutputPorts(@OutputPortSet(cardinality=1, optional=false, description="Messages received from Kafka are sent on this output port.")) @PrimitiveOperator(name="RabbitMQSource", description="something") -public class RabbitMQSource extends AbstractOperator implements UpdateEvent { +public class RabbitMQSource extends RabbitBaseOper implements UpdateEvent { public static final String EXCHANGENAME_PARAM = "exchangeName"; public static final String ROUTINGKEY_PARAM = "routingKey"; @@ -55,8 +54,8 @@ public class RabbitMQSource extends AbstractOperator implements UpdateEvent { public static final String PORTID_PARAM="portId"; private RabbitMQWrapper rabbitMQWrapper; - private String exchangeNameParam, routingKeyParam,userNameParam,passwordParam,hostNameParam; - private int portIdParam; + //private String exchangeNameParam, routingKeyParam,userNameParam,passwordParam,hostNameParam; + //private int portIdParam; private static final org.slf4j.Logger log = LoggerFactory.getLogger(RabbitMQSource.class); /** @@ -79,8 +78,8 @@ public synchronized void initialize(OperatorContext context) // If needed, insert code to establish connections or resources to communicate an external system or data store. // The configuration information for this may come from parameters supplied to the operator invocation, // or external configuration files or a combination of the two. - rabbitMQWrapper = new RabbitMQWrapper(this,exchangeNameParam,routingKeyParam); - rabbitMQWrapper.login(userNameParam, passwordParam, hostNameParam, portIdParam); + rabbitMQWrapper = new RabbitMQWrapper(this,exchangeName,routingKey); + rabbitMQWrapper.login(username, password, hostName, portId); /* * Create the thread for producing tuples. @@ -144,9 +143,9 @@ public void NotifyUpdateEvent (String guid, String message){ // Set attributes in tuple: // tuple.setString("AttributeName", "AttributeValue"); - - tuple.setString("guid_request",guid); - tuple.setString("message",message); + if (!guid.isEmpty()) + tuple.setString(routingKeyAH.getName(),guid); + tuple.setString(messageAH.getName(),message); // Submit tuple to output stream out.submit(tuple); @@ -156,35 +155,35 @@ public void NotifyUpdateEvent (String guid, String message){ } } - @Parameter(optional=true, description="Exchange Name.") - public void setExchangeName(String value) { - exchangeNameParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setRoutingKey(String value) { - routingKeyParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setUsername(String value) { - userNameParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setPassword(String value) { - passwordParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setHostname(String value) { - hostNameParam = value; - } - - @Parameter(optional=true, description="Exchange Name.") - public void setPortId(int value) { - portIdParam = value; - } +// @Parameter(optional=true, description="Exchange Name.") +// public void setExchangeName(String value) { +// exchangeNameParam = value; +// } +// +// @Parameter(optional=true, description="Exchange Name.") +// public void setRoutingKey(String value) { +// routingKeyParam = value; +// } +// +// @Parameter(optional=true, description="Exchange Name.") +// public void setUsername(String value) { +// userNameParam = value; +// } +// +// @Parameter(optional=true, description="Exchange Name.") +// public void setPassword(String value) { +// passwordParam = value; +// } +// +// @Parameter(optional=true, description="Exchange Name.") +// public void setHostname(String value) { +// hostNameParam = value; +// } +// +// @Parameter(optional=true, description="Exchange Name.") +// public void setPortId(int value) { +// portIdParam = value; +// } /** * Shutdown this operator, which will interrupt the thread diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java index 4f73ff0..7f7478d 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java @@ -1,7 +1,7 @@ /******************************************************************************* -* Copyright (C) 2015, MOHAMED-ALI SAID -* All Rights Reserved -*******************************************************************************/ + * Copyright (C) 2015, MOHAMED-ALI SAID + * All Rights Reserved + *******************************************************************************/ package com.ibm.streamsx.messaging.rabbitmq; import java.io.File; @@ -13,104 +13,112 @@ import org.apache.log4j.Logger; import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class RabbitMQWrapper { + + private static final org.slf4j.Logger log = LoggerFactory + .getLogger(RabbitMQWrapper.class); + private final String ExchangeName; + private final String RoutingKey; + private final String exchangeType; + + private UpdateEvent updateEvent; + private ConnectionFactory connectionFactory; + private com.rabbitmq.client.Connection connection; + private Channel channel; + + public void login(String userName, String password, String hostName, + int port) throws IOException, TimeoutException { + connectionFactory = new ConnectionFactory(); + connectionFactory.setUsername(userName); + connectionFactory.setPassword(password); + connectionFactory.setHost(hostName); + connectionFactory.setPort(port); + connection = connectionFactory.newConnection(); + channel = connection.createChannel(); + channel.exchangeDeclare(ExchangeName, exchangeType); + } + + public RabbitMQWrapper(final String exchangeName, final String exchangeT) { + ExchangeName = exchangeName; + RoutingKey = ""; + exchangeType = exchangeT; + } - private static final org.slf4j.Logger log = LoggerFactory.getLogger(RabbitMQWrapper.class); - private final String ExchangeName; - private final String RoutingKey; - - private UpdateEvent updateEvent; - private ConnectionFactory connectionFactory; - private com.rabbitmq.client.Connection connection; - public void login(String userName, String password, String hostName, int port) - { - connectionFactory = new ConnectionFactory(); - connectionFactory.setUsername(userName); - connectionFactory.setPassword(password); - connectionFactory.setHost(hostName); - connectionFactory.setPort(port); - try { - connection = connectionFactory.newConnection(); + + + public RabbitMQWrapper(UpdateEvent event, final String exchangeName, + final String routingKey) { + updateEvent = event; + ExchangeName = exchangeName; + RoutingKey = routingKey; + exchangeType = "direct"; + } + + public void logout() { + try { + connection.close(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void publish(String routingKey, String message) { + + try { + Channel channel = connection.createChannel(); + + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, ExchangeName, routingKey); + log.trace("Producing message: " + message + " in thread: " + + Thread.currentThread().getName()); - } catch (IOException e) { - e.printStackTrace(); - } catch (TimeoutException e) { + channel.basicPublish(ExchangeName, routingKey, null, + message.getBytes()); + + channel.close(); + + } catch (IOException e) { + log.trace("Exception message:" + e.getMessage() + "\r\n"); + } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } - public RabbitMQWrapper(final String exchangeName) { - ExchangeName=exchangeName; - RoutingKey=""; - } - - public RabbitMQWrapper(UpdateEvent event,final String exchangeName,final String routingKey) - { - updateEvent = event; - ExchangeName=exchangeName; - RoutingKey=routingKey; - } - public void logout() { - try { - connection.close(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void publish(String guid,String message) { - - try { - Channel channel = connection.createChannel(); - - channel.exchangeDeclare(ExchangeName, "direct"); - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, ExchangeName, guid); - log.trace("Producing message: " + message + " in thread: " + Thread.currentThread().getName()); - channel.basicPublish(ExchangeName, guid, null, message.getBytes()); - - channel.close(); - - } catch (IOException e) { - log.trace("Exception message:"+e.getMessage() + "\r\n"); - } catch (TimeoutException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + + public void Consume() { + + try { + boolean NO_ACK = false; + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, ExchangeName, RoutingKey); + QueueingConsumer consumer = new QueueingConsumer(channel); + channel.basicConsume(queueName, NO_ACK, consumer); + while (true) { // you might want to implement some loop-finishing + // logic here ;) + QueueingConsumer.Delivery delivery; + try { + delivery = consumer.nextDelivery(); + String Message = new String(delivery.getBody()); + log.trace("received message: " + Message + " in thread: " + + Thread.currentThread().getName()); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), + false); + updateEvent.NotifyUpdateEvent(RoutingKey, Message); + } catch (InterruptedException ie) { + continue; + } + } + } catch (IOException e) { + e.printStackTrace(); + } } - } - - public void Consume() { - - try { - boolean NO_ACK=false; - Channel channel = connection.createChannel(); - channel.exchangeDeclare(ExchangeName, "direct"); - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, ExchangeName, RoutingKey); - QueueingConsumer consumer = new QueueingConsumer(channel); - channel.basicConsume(queueName, NO_ACK, consumer); - while (true) { // you might want to implement some loop-finishing - // logic here ;) - QueueingConsumer.Delivery delivery; - try { - delivery = consumer.nextDelivery(); - String Message = new String(delivery.getBody()); - log.trace("received message: " + Message + " in thread: " + Thread.currentThread().getName()); - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); - updateEvent.NotifyUpdateEvent(RoutingKey,Message); - } catch (InterruptedException ie) { - continue; - } - } - } catch (IOException e) { - e.printStackTrace(); - } - } } From 86e5c436d6e35324bbef5fe59e35dcf8eddd9e3d Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Tue, 27 Oct 2015 19:09:47 -0400 Subject: [PATCH 03/12] Implemented non-deprecated API for Kafka Source I'm moving away from the RabbitMQWrapper, in favor of a produceTuples() method that has the Source logic for the DefaultConsumer API. --- .../RabbitMQSink/RabbitMQSink.xml | 2 +- .../RabbitMQSource/RabbitMQSource.xml | 2 +- .../messaging/rabbitmq/RabbitBaseOper.java | 42 ++++++------ .../messaging/rabbitmq/RabbitMQSink.java | 7 +- .../messaging/rabbitmq/RabbitMQSource.java | 67 +++++++++++++++---- .../messaging/rabbitmq/RabbitMQWrapper.java | 16 ++--- 6 files changed, 92 insertions(+), 44 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index ebe1bfd..77e2b82 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 69ac69d..88275a8 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 81a4394..332a666 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -39,27 +39,29 @@ public class RabbitBaseOper extends AbstractOperator { private final Logger trace = Logger.getLogger(RabbitBaseOper.class .getCanonicalName()); + private ConnectionFactory connectionFactory; + private com.rabbitmq.client.Connection connection; + protected Channel channel; -// public synchronized void initialize(OperatorContext context) -// throws Exception { -// // Must call super.initialize(context) to correctly setup an operator. -// super.initialize(context); -// Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); -// ConnectionFactory factory = new ConnectionFactory(); -// factory.setHost(hostName); -// factory.setUsername(username); -// factory.setPassword(password); -// connection = factory.newConnection(); -// channel = connection.createChannel(); -// channel.exchangeDeclare(exchangeName, "fanout"); -// -// //channel.queueDeclare(queueName, false, false, false, null); -// } + public synchronized void initialize(OperatorContext context) + throws Exception { + // Must call super.initialize(context) to correctly setup an operator. + super.initialize(context); + connectionFactory = new ConnectionFactory(); + connectionFactory.setUsername(username); + connectionFactory.setPassword(password); + connectionFactory.setHost(hostName); + connectionFactory.setPort(portId); + connection = connectionFactory.newConnection(); + channel = connection.createChannel(); + channel.exchangeDeclare(exchangeName, exchangeType); + //channel.queueDeclare(queueName, false, false, false, null); + } -// public void shutdown() throws IOException, TimeoutException{ -// channel.close(); -// connection.close(); -// } + public void shutdown() throws IOException, TimeoutException{ + channel.close(); + connection.close(); + } public void initSchema(StreamSchema ss) throws Exception { Set supportedTypes = new HashSet(); @@ -67,7 +69,7 @@ public void initSchema(StreamSchema ss) throws Exception { supportedTypes.add(MetaType.USTRING); supportedTypes.add(MetaType.BLOB); - routingKeyAH.initialize(ss, false, supportedTypes); + routingKeyAH.initialize(ss, true, supportedTypes); messageAH.initialize(ss, true, supportedTypes); } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index 8580198..4556f44 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -5,8 +5,10 @@ /* Generated by Streams Studio: March 26, 2014 11:37:11 AM EDT */ package com.ibm.streamsx.messaging.rabbitmq; +import java.io.IOException; import java.lang.Thread.State; import java.util.List; +import java.util.concurrent.TimeoutException; import com.ibm.streams.operator.AbstractOperator; import com.ibm.streams.operator.OperatorContext; @@ -132,10 +134,11 @@ public void processPunctuation(StreamingInput stream, /** * Shutdown this operator. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. + * @throws TimeoutException + * @throws IOException */ @Override - public synchronized void shutdown() throws Exception { + public synchronized void shutdown() throws IOException, TimeoutException { OperatorContext context = getOperatorContext(); rabbitMQWrapper.logout(); // should force the join() to exit super.shutdown(); diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 5e7c51f..69ec18c 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -8,7 +8,9 @@ //import org.apache.log4j.Logger; +import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeoutException; import com.ibm.streams.operator.AbstractOperator; import com.ibm.streams.operator.OperatorContext; @@ -21,7 +23,11 @@ import com.ibm.streams.operator.model.PrimitiveOperator; import com.ibm.streamsx.messaging.rabbitmq.UpdateEvent; -import com.ibm.streamsx.messaging.rabbitmq.RabbitMQWrapper; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + import org.slf4j.LoggerFactory; /** * This operator was originally contributed by Mohamed-Ali Said @saidmohamedali @@ -53,7 +59,7 @@ public class RabbitMQSource extends RabbitBaseOper implements UpdateEvent { public static final String HOSTNAME_PARAM="hostName"; public static final String PORTID_PARAM="portId"; - private RabbitMQWrapper rabbitMQWrapper; + //private RabbitMQWrapper rabbitMQWrapper; //private String exchangeNameParam, routingKeyParam,userNameParam,passwordParam,hostNameParam; //private int portIdParam; @@ -73,14 +79,23 @@ public synchronized void initialize(OperatorContext context) throws Exception { // Must call super.initialize(context) to correctly setup an operator. super.initialize(context); + super.initSchema(getOutput(0).getStreamSchema()); log.trace(this.getClass().getName()+"Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); // TODO: // If needed, insert code to establish connections or resources to communicate an external system or data store. // The configuration information for this may come from parameters supplied to the operator invocation, - // or external configuration files or a combination of the two. - rabbitMQWrapper = new RabbitMQWrapper(this,exchangeName,routingKey); - rabbitMQWrapper.login(username, password, hostName, portId); - +// // or external configuration files or a combination of the two. +// rabbitMQWrapper = new RabbitMQWrapper(this,exchangeName,routingKey); +// rabbitMQWrapper.login(username, password, hostName, portId); + if (queueName == ""){ + queueName = channel.queueDeclare().getQueue(); + } + + channel.queueBind(queueName, exchangeName, routingKey); + System.out.println("Queue: " + queueName + " Exchange: " + exchangeName); + //produce tuples returns immediately, but we don't want ports to close + createAvoidCompletionThread(); + /* * Create the thread for producing tuples. * The thread is created at initialize time but started. @@ -92,9 +107,10 @@ public synchronized void initialize(OperatorContext context) @Override public void run() { try { - rabbitMQWrapper.Consume(); + produceTuples(); + //rabbitMQWrapper.Consume(); } catch (Exception e) { - // Logger.getLogger(this.getClass()).error("Operator error", e); + e.printStackTrace(); // Logger.getLogger(this.getClass()).error("Operator error", e); } } @@ -127,7 +143,33 @@ public synchronized void allPortsReady() throws Exception { * @throws Exception if an error occurs while submitting a tuple */ private void produceTuples() throws Exception { - + System.out.println("Producing tuples!"); + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) + throws IOException { + String message = new String(body, "UTF-8"); + StreamingOutput out = getOutput(0); + OutputTuple tuple = out.newTuple(); + if (routingKeyAH.isAvailable()){ + tuple.setString(routingKeyAH.getName(), envelope.getRoutingKey()); + System.out.println(routingKeyAH.getName() + ":" + envelope.getRoutingKey()); + } else { + System.out.println("What the hell?? " + routingKeyAH.toString()); + } + tuple.setString(messageAH.getName(),message); + System.out.println("message: " + message); + // Submit tuple to output stream + try { + out.submit(tuple); + } catch (Exception e) { + System.out.println("Catching submit exception"); + e.printStackTrace(); + } + } + }; + channel.basicConsume(queueName, true, consumer); // TODO If there is a finite set of tuples, submit a final punctuation when finished // by uncommenting the following line: // out.punctuate(Punctuation.FINAL_MARKER); @@ -188,10 +230,11 @@ public void NotifyUpdateEvent (String guid, String message){ /** * Shutdown this operator, which will interrupt the thread * executing the produceTuples() method. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. + * @throws TimeoutException + * @throws IOException */ - public synchronized void shutdown() throws Exception { - rabbitMQWrapper.logout(); + public synchronized void shutdown() throws IOException, TimeoutException { + //rabbitMQWrapper.logout(); if (processThread != null) { processThread.interrupt(); processThread = null; diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java index 7f7478d..3e375bc 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java @@ -18,6 +18,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.QueueingConsumer; public class RabbitMQWrapper { @@ -63,18 +65,20 @@ public RabbitMQWrapper(UpdateEvent event, final String exchangeName, public void logout() { try { + channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); + } catch (TimeoutException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } } public void publish(String routingKey, String message) { try { - Channel channel = connection.createChannel(); - String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, ExchangeName, routingKey); log.trace("Producing message: " + message + " in thread: " @@ -82,18 +86,14 @@ public void publish(String routingKey, String message) { channel.basicPublish(ExchangeName, routingKey, null, message.getBytes()); - - channel.close(); - } catch (IOException e) { log.trace("Exception message:" + e.getMessage() + "\r\n"); - } catch (TimeoutException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } } public void Consume() { + + try { boolean NO_ACK = false; From bd36a32c3d491ebba1488118c7a50b7f72bc1f66 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Tue, 27 Oct 2015 19:29:58 -0400 Subject: [PATCH 04/12] Removed dependency on RabbitMQWrapper All logic is done in RabbitBaseOper or in the respective Source/Sink operators. --- .../RabbitMQSink/RabbitMQSink.xml | 9 +----- .../RabbitMQSource/RabbitMQSource.xml | 2 +- .../messaging/rabbitmq/RabbitBaseOper.java | 6 +--- .../messaging/rabbitmq/RabbitMQSink.java | 21 ++++++++++---- .../messaging/rabbitmq/RabbitMQSource.java | 29 ++++--------------- 5 files changed, 24 insertions(+), 43 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index 77e2b82..41a71c9 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something @@ -61,13 +61,6 @@ rstring 1 - - routingKey - Exchange Name. - true - rstring - 1 - routingKeyAttribute Exchange Name. diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 88275a8..0ffecd5 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 332a666..59919b2 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -28,7 +28,6 @@ public class RabbitBaseOper extends AbstractOperator { password = "guest", exchangeName = "logs", exchangeType = "direct", - routingKey = "", queueName = ""; protected int portId = 5672; @@ -120,10 +119,7 @@ public void setRoutingKeyAttribute(String value) { routingKeyAH.setName(value); } - @Parameter(optional=true, description="Exchange Name.") - public void setRoutingKey(String value) { - routingKey = value; - } + } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index 4556f44..ec3ce36 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -50,7 +50,6 @@ @PrimitiveOperator(name="RabbitMQSink", description="something") public class RabbitMQSink extends RabbitBaseOper { - private RabbitMQWrapper rabbitMQWrapper; private static final org.slf4j.Logger log = LoggerFactory.getLogger(RabbitMQSink.class); @@ -75,8 +74,8 @@ public synchronized void initialize(OperatorContext context) // or external configuration files or a combination of the two. - rabbitMQWrapper = new RabbitMQWrapper(exchangeName, exchangeType); - rabbitMQWrapper.login(username, password, hostName, portId); +// rabbitMQWrapper = new RabbitMQWrapper(exchangeName, exchangeType); +// rabbitMQWrapper.login(username, password, hostName, portId); // TODO: // If needed, insert code to establish connections or resources to communicate an external system or data store. // The configuration information for this may come from parameters supplied to the operator invocation, @@ -112,9 +111,19 @@ public void process(StreamingInput stream, Tuple tuple) // typically sending tuple data to an external system or data store. String message = tuple.getString(messageAH.getName()); - String guid = tuple.getString(routingKeyAH.getName()); + String routingKey = tuple.getString(routingKeyAH.getName()); - rabbitMQWrapper.publish(guid,message); + try { + log.trace("Producing message: " + message + " in thread: " + + Thread.currentThread().getName()); + channel.basicPublish(exchangeName, routingKey, null, + message.getBytes()); + } catch (IOException e) { + log.trace("Exception message:" + e.getMessage() + "\r\n"); + } + + + // rabbitMQWrapper.publish(guid,message); } @@ -140,7 +149,7 @@ public void processPunctuation(StreamingInput stream, @Override public synchronized void shutdown() throws IOException, TimeoutException { OperatorContext context = getOperatorContext(); - rabbitMQWrapper.logout(); // should force the join() to exit + //rabbitMQWrapper.logout(); // should force the join() to exit super.shutdown(); } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 69ec18c..a829be6 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -50,7 +50,7 @@ @OutputPorts(@OutputPortSet(cardinality=1, optional=false, description="Messages received from Kafka are sent on this output port.")) @PrimitiveOperator(name="RabbitMQSource", description="something") -public class RabbitMQSource extends RabbitBaseOper implements UpdateEvent { +public class RabbitMQSource extends RabbitBaseOper { public static final String EXCHANGENAME_PARAM = "exchangeName"; public static final String ROUTINGKEY_PARAM = "routingKey"; @@ -58,7 +58,7 @@ public class RabbitMQSource extends RabbitBaseOper implements UpdateEvent { public static final String PASSWORD_PARAM="password"; public static final String HOSTNAME_PARAM="hostName"; public static final String PORTID_PARAM="portId"; - + private String routingKey; //private RabbitMQWrapper rabbitMQWrapper; //private String exchangeNameParam, routingKeyParam,userNameParam,passwordParam,hostNameParam; //private int portIdParam; @@ -175,27 +175,10 @@ public void handleDelivery(String consumerTag, Envelope envelope, // out.punctuate(Punctuation.FINAL_MARKER); } - public void NotifyUpdateEvent (String guid, String message){ - try { - - final StreamingOutput out = getOutput(0); - - // TODO Modify the following code to create and submit tuples to the output port - OutputTuple tuple = out.newTuple(); - - // Set attributes in tuple: - // tuple.setString("AttributeName", "AttributeValue"); - if (!guid.isEmpty()) - tuple.setString(routingKeyAH.getName(),guid); - tuple.setString(messageAH.getName(),message); - // Submit tuple to output stream - out.submit(tuple); - - } catch (Exception e) { - System.out.println("Exception: " + e); - //log. Logger.getLogger(this.getClass()).error("Operator error", e); - } - } + @Parameter(optional=true, description="Exchange Name.") + public void setRoutingKey(String value) { + routingKey = value; + } // @Parameter(optional=true, description="Exchange Name.") // public void setExchangeName(String value) { From 858e3189cf7a5d32683908242b179080b03c72b4 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Tue, 27 Oct 2015 20:12:14 -0400 Subject: [PATCH 05/12] Remove RabbitMQWrapper and UpdateEvent Cleaned up commented out code and unused imports. --- .../RabbitMQSink/RabbitMQSink.xml | 2 +- .../RabbitMQSource/RabbitMQSource.xml | 2 +- .../messaging/rabbitmq/RabbitBaseOper.java | 80 ++--- .../messaging/rabbitmq/RabbitMQSink.java | 225 +++++++------ .../messaging/rabbitmq/RabbitMQSource.java | 303 ++++++++---------- .../messaging/rabbitmq/RabbitMQWrapper.java | 124 ------- .../messaging/rabbitmq/UpdateEvent.java | 17 - 7 files changed, 277 insertions(+), 476 deletions(-) delete mode 100644 com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java delete mode 100644 com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index 41a71c9..9e2d993 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 0ffecd5..deea885 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 59919b2..8a90e0d 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -1,17 +1,21 @@ +/******************************************************************************* + * Copyright (C) 2015, MOHAMED-ALI SAID and International Business Machines + * All Rights Reserved + *******************************************************************************/ + package com.ibm.streamsx.messaging.rabbitmq; import java.io.IOException; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeoutException; - import java.util.logging.Logger; -import com.ibm.streams.operator.Type.MetaType; -import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.AbstractOperator; import com.ibm.streams.operator.OperatorContext; import com.ibm.streams.operator.StreamSchema; +import com.ibm.streams.operator.Type.MetaType; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.model.Libraries; import com.ibm.streams.operator.model.Parameter; import com.rabbitmq.client.Channel; @@ -21,32 +25,25 @@ @Libraries({ "opt/downloaded/*" }) public class RabbitBaseOper extends AbstractOperator { -// protected Channel channel; -// protected Connection connection; - protected String hostName = "localhost", - username = "guest", - password = "guest", - exchangeName = "logs", - exchangeType = "direct", + protected Channel channel; + protected Connection connection; + protected String hostName = "localhost", username = "guest", + password = "guest", exchangeName = "logs", exchangeType = "direct", queueName = ""; protected int portId = 5672; - + protected AttributeHelper topicAH = new AttributeHelper("topic"), routingKeyAH = new AttributeHelper("routing_key"), messageAH = new AttributeHelper("message"); - - private final Logger trace = Logger.getLogger(RabbitBaseOper.class + + protected final Logger trace = Logger.getLogger(RabbitBaseOper.class .getCanonicalName()); - - private ConnectionFactory connectionFactory; - private com.rabbitmq.client.Connection connection; - protected Channel channel; - + public synchronized void initialize(OperatorContext context) throws Exception { - // Must call super.initialize(context) to correctly setup an operator. + // Must call super.initialize(context) to correctly setup an operator. super.initialize(context); - connectionFactory = new ConnectionFactory(); + ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setHost(hostName); @@ -54,14 +51,16 @@ public synchronized void initialize(OperatorContext context) connection = connectionFactory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, exchangeType); - //channel.queueDeclare(queueName, false, false, false, null); + trace.log(TraceLevel.INFO, + "Initializing channel connection to exchange: " + exchangeName + + " of type: " + exchangeType + " as user: " + username); } - - public void shutdown() throws IOException, TimeoutException{ - channel.close(); - connection.close(); + + public void shutdown() throws IOException, TimeoutException { + channel.close(); + connection.close(); } - + public void initSchema(StreamSchema ss) throws Exception { Set supportedTypes = new HashSet(); supportedTypes.add(MetaType.RSTRING); @@ -70,9 +69,9 @@ public void initSchema(StreamSchema ss) throws Exception { routingKeyAH.initialize(ss, true, supportedTypes); messageAH.initialize(ss, true, supportedTypes); - + } - + @Parameter(optional = false, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") public void setHostname(String value) { hostName = value; @@ -82,44 +81,35 @@ public void setHostname(String value) { public void setUsername(String value) { username = value; } - + @Parameter(optional = false, description = "Name of the attribute for the key. Default is \\\"key\\\".") public void setPassword(String value) { password = value; } - - @Parameter(optional=true, description="Exchange Name.") + + @Parameter(optional = true, description = "Exchange Name.") public void setExchangeName(String value) { exchangeName = value; } - + @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".") public void setQueueName(String value) { queueName = value; } - - + @Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") public void setMessageAttribute(String value) { messageAH.setName(value); } -// @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".") -// public void setKeyAttribute(String value) { -// keyAH.setName(value); -// } - - @Parameter(optional=true, description="Port id. Default 5672.") + @Parameter(optional = true, description = "Port id. Default 5672.") public void setPortId(int value) { portId = value; } - - @Parameter(optional=true, description="Exchange Name.") + + @Parameter(optional = true, description = "Exchange Name.") public void setRoutingKeyAttribute(String value) { routingKeyAH.setName(value); } - - } - diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index ec3ce36..b38b623 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -1,119 +1,113 @@ /******************************************************************************* -* Copyright (C) 2015, MOHAMED-ALI SAID -* All Rights Reserved -*******************************************************************************/ + * Copyright (C) 2015, MOHAMED-ALI SAID + * All Rights Reserved + *******************************************************************************/ /* Generated by Streams Studio: March 26, 2014 11:37:11 AM EDT */ package com.ibm.streamsx.messaging.rabbitmq; import java.io.IOException; -import java.lang.Thread.State; -import java.util.List; import java.util.concurrent.TimeoutException; -import com.ibm.streams.operator.AbstractOperator; +import org.slf4j.LoggerFactory; + import com.ibm.streams.operator.OperatorContext; import com.ibm.streams.operator.StreamingData.Punctuation; import com.ibm.streams.operator.StreamingInput; import com.ibm.streams.operator.Tuple; import com.ibm.streams.operator.model.InputPortSet; import com.ibm.streams.operator.model.InputPorts; -import com.ibm.streams.operator.model.Libraries; -import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.model.PrimitiveOperator; - -import com.ibm.streamsx.messaging.rabbitmq.RabbitMQWrapper; -import org.slf4j.LoggerFactory; - /** * This operator was originally contributed by Mohamed-Ali Said @saidmohamedali - * Class for an operator that consumes tuples and does not produce an output stream. - * This pattern supports a number of input streams and no output streams. + * Class for an operator that consumes tuples and does not produce an output + * stream. This pattern supports a number of input streams and no output + * streams. *

* The following event methods from the Operator interface can be called: *

*
    *
  • initialize() to perform operator initialization
  • - *
  • allPortsReady() notification indicates the operator's ports are ready to process and submit tuples
  • - *
  • process() handles a tuple arriving on an input port - *
  • processPuncuation() handles a punctuation mark arriving on an input port - *
  • shutdown() to shutdown the operator. A shutdown request may occur at any time, - * such as a request to stop a PE or cancel a job. - * Thus the shutdown() may occur while the operator is processing tuples, punctuation marks, - * or even during port ready notification.
  • + *
  • allPortsReady() notification indicates the operator's ports are ready to + * process and submit tuples
  • + *
  • process() handles a tuple arriving on an input port + *
  • processPuncuation() handles a punctuation mark arriving on an input port + *
  • shutdown() to shutdown the operator. A shutdown request may occur at any + * time, such as a request to stop a PE or cancel a job. Thus the shutdown() may + * occur while the operator is processing tuples, punctuation marks, or even + * during port ready notification.
  • *
- *

With the exception of operator initialization, all the other events may occur concurrently with each other, - * which lead to these methods being called concurrently by different threads.

+ *

+ * With the exception of operator initialization, all the other events may occur + * concurrently with each other, which lead to these methods being called + * concurrently by different threads. + *

*/ -@InputPorts(@InputPortSet(cardinality=1, optional=false, -description="")) -@PrimitiveOperator(name="RabbitMQSink", description="something") +@InputPorts(@InputPortSet(cardinality = 1, optional = false, description = "")) +@PrimitiveOperator(name = "RabbitMQSink", description = "something") public class RabbitMQSink extends RabbitBaseOper { - private static final org.slf4j.Logger log = LoggerFactory.getLogger(RabbitMQSink.class); - - - - /** - * Initialize this operator. Called once before any tuples are processed. - * @param context OperatorContext for this operator. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ + private static final org.slf4j.Logger log = LoggerFactory + .getLogger(RabbitMQSink.class); + + /** + * Initialize this operator. Called once before any tuples are processed. + * + * @param context + * OperatorContext for this operator. + * @throws Exception + * Operator failure, will cause the enclosing PE to terminate. + */ @Override public synchronized void initialize(OperatorContext context) throws Exception { - - // Must call super.initialize(context) to correctly setup an operator. + + // Must call super.initialize(context) to correctly setup an operator. super.initialize(context); super.initSchema(getInput(0).getStreamSchema()); - log.trace("Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - - // TODO: - // If needed, insert code to establish connections or resources to communicate an external system or data store. - // The configuration information for this may come from parameters supplied to the operator invocation, - // or external configuration files or a combination of the two. - - -// rabbitMQWrapper = new RabbitMQWrapper(exchangeName, exchangeType); -// rabbitMQWrapper.login(username, password, hostName, portId); - // TODO: - // If needed, insert code to establish connections or resources to communicate an external system or data store. - // The configuration information for this may come from parameters supplied to the operator invocation, - // or external configuration files or a combination of the two. - // all initialization completed successfully? - } - - /** - * Notification that initialization is complete and all input and output ports - * are connected and ready to receive and submit tuples. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public synchronized void allPortsReady() throws Exception { - // This method is commonly used by source operators. - // Operators that process incoming tuples generally do not need this notification. - OperatorContext context = getOperatorContext(); - log.trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - - - } - - /** - * Process an incoming tuple that arrived on the specified port. - * @param stream Port the tuple is arriving on. - * @param tuple Object representing the incoming tuple. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public void process(StreamingInput stream, Tuple tuple) - throws Exception { - // TODO Insert code here to process the incoming tuple, - // typically sending tuple data to an external system or data store. - - String message = tuple.getString(messageAH.getName()); - String routingKey = tuple.getString(routingKeyAH.getName()); - - try { + log.trace("Operator " + context.getName() + " initializing in PE: " + + context.getPE().getPEId() + " in Job: " + + context.getPE().getJobId()); + + } + + /** + * Notification that initialization is complete and all input and output + * ports are connected and ready to receive and submit tuples. + * + * @throws Exception + * Operator failure, will cause the enclosing PE to terminate. + */ + @Override + public synchronized void allPortsReady() throws Exception { + // This method is commonly used by source operators. + // Operators that process incoming tuples generally do not need this + // notification. + OperatorContext context = getOperatorContext(); + log.trace("Operator " + context.getName() + + " all ports are ready in PE: " + context.getPE().getPEId() + + " in Job: " + context.getPE().getJobId()); + + } + + /** + * Process an incoming tuple that arrived on the specified port. + * + * @param stream + * Port the tuple is arriving on. + * @param tuple + * Object representing the incoming tuple. + * @throws Exception + * Operator failure, will cause the enclosing PE to terminate. + */ + @Override + public void process(StreamingInput stream, Tuple tuple) + throws Exception { + + String message = tuple.getString(messageAH.getName()); + String routingKey = tuple.getString(routingKeyAH.getName()); + + try { log.trace("Producing message: " + message + " in thread: " + Thread.currentThread().getName()); channel.basicPublish(exchangeName, routingKey, null, @@ -121,36 +115,33 @@ public void process(StreamingInput stream, Tuple tuple) } catch (IOException e) { log.trace("Exception message:" + e.getMessage() + "\r\n"); } - - - // rabbitMQWrapper.publish(guid,message); - - - } - - /** - * Process an incoming punctuation that arrived on the specified port. - * @param stream Port the punctuation is arriving on. - * @param mark The punctuation mark - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public void processPunctuation(StreamingInput stream, - Punctuation mark) throws Exception { - // TODO: If window punctuations are meaningful to the external system or data store, - // insert code here to process the incoming punctuation. - } - - /** - * Shutdown this operator. - * @throws TimeoutException - * @throws IOException - */ - @Override - public synchronized void shutdown() throws IOException, TimeoutException { - OperatorContext context = getOperatorContext(); - //rabbitMQWrapper.logout(); // should force the join() to exit - super.shutdown(); - } - + + } + + /** + * Process an incoming punctuation that arrived on the specified port. + * + * @param stream + * Port the punctuation is arriving on. + * @param mark + * The punctuation mark + * @throws Exception + * Operator failure, will cause the enclosing PE to terminate. + */ + @Override + public void processPunctuation(StreamingInput stream, + Punctuation mark) throws Exception { + } + + /** + * Shutdown this operator. + * + * @throws TimeoutException + * @throws IOException + */ + @Override + public synchronized void shutdown() throws IOException, TimeoutException { + super.shutdown(); + } + } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index a829be6..3df236f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -1,149 +1,140 @@ /* Generated by Streams Studio: March 26, 2014 2:09:26 PM EDT */ /******************************************************************************* -* Copyright (C) 2015, MOHAMED-ALI SAID -* All Rights Reserved -*******************************************************************************/ + * Copyright (C) 2015, MOHAMED-ALI SAID + * All Rights Reserved + *******************************************************************************/ package com.ibm.streamsx.messaging.rabbitmq; - //import org.apache.log4j.Logger; import java.io.IOException; -import java.util.List; import java.util.concurrent.TimeoutException; -import com.ibm.streams.operator.AbstractOperator; import com.ibm.streams.operator.OperatorContext; import com.ibm.streams.operator.OutputTuple; import com.ibm.streams.operator.StreamingOutput; -import com.ibm.streams.operator.model.Libraries; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.model.OutputPortSet; import com.ibm.streams.operator.model.OutputPorts; import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.model.PrimitiveOperator; -import com.ibm.streamsx.messaging.rabbitmq.UpdateEvent; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; import org.slf4j.LoggerFactory; + /** * This operator was originally contributed by Mohamed-Ali Said @saidmohamedali - * A source operator that does not receive any input streams and produces new tuples. - * The method produceTuples is called to begin submitting tuples. + * A source operator that does not receive any input streams and produces new + * tuples. The method produceTuples is called to begin submitting + * tuples. *

- * For a source operator, the following event methods from the Operator interface can be called: + * For a source operator, the following event methods from the Operator + * interface can be called: *

*
    *
  • initialize() to perform operator initialization
  • - *
  • allPortsReady() notification indicates the operator's ports are ready to process and submit tuples
  • - *
  • shutdown() to shutdown the operator. A shutdown request may occur at any time, - * such as a request to stop a PE or cancel a job. - * Thus the shutdown() may occur while the operator is processing tuples, punctuation marks, - * or even during port ready notification.
  • + *
  • allPortsReady() notification indicates the operator's ports are ready to + * process and submit tuples
  • + *
  • shutdown() to shutdown the operator. A shutdown request may occur at any + * time, such as a request to stop a PE or cancel a job. Thus the shutdown() may + * occur while the operator is processing tuples, punctuation marks, or even + * during port ready notification.
  • *
- *

With the exception of operator initialization, all the other events may occur concurrently with each other, - * which lead to these methods being called concurrently by different threads.

+ *

+ * With the exception of operator initialization, all the other events may occur + * concurrently with each other, which lead to these methods being called + * concurrently by different threads. + *

*/ -@OutputPorts(@OutputPortSet(cardinality=1, optional=false, -description="Messages received from Kafka are sent on this output port.")) -@PrimitiveOperator(name="RabbitMQSource", description="something") +@OutputPorts(@OutputPortSet(cardinality = 1, optional = false, description = "Messages received from Kafka are sent on this output port.")) +@PrimitiveOperator(name = "RabbitMQSource", description = "something") public class RabbitMQSource extends RabbitBaseOper { public static final String EXCHANGENAME_PARAM = "exchangeName"; public static final String ROUTINGKEY_PARAM = "routingKey"; - public static final String USERNAME_PARAM="userName"; - public static final String PASSWORD_PARAM="password"; - public static final String HOSTNAME_PARAM="hostName"; - public static final String PORTID_PARAM="portId"; + public static final String USERNAME_PARAM = "userName"; + public static final String PASSWORD_PARAM = "password"; + public static final String HOSTNAME_PARAM = "hostName"; + public static final String PORTID_PARAM = "portId"; private String routingKey; - //private RabbitMQWrapper rabbitMQWrapper; - //private String exchangeNameParam, routingKeyParam,userNameParam,passwordParam,hostNameParam; - //private int portIdParam; - - private static final org.slf4j.Logger log = LoggerFactory.getLogger(RabbitMQSource.class); + + private static final org.slf4j.Logger log = LoggerFactory + .getLogger(RabbitMQSource.class); + /** + * Thread for calling produceTuples() to produce tuples + */ + private Thread processThread; + /** - * Thread for calling produceTuples() to produce tuples + * Initialize this operator. Called once before any tuples are processed. + * + * @param context + * OperatorContext for this operator. + * @throws Exception + * Operator failure, will cause the enclosing PE to terminate. */ - private Thread processThread; - - /** - * Initialize this operator. Called once before any tuples are processed. - * @param context OperatorContext for this operator. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public synchronized void initialize(OperatorContext context) - throws Exception { - // Must call super.initialize(context) to correctly setup an operator. - super.initialize(context); - super.initSchema(getOutput(0).getStreamSchema()); - log.trace(this.getClass().getName()+"Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - // TODO: - // If needed, insert code to establish connections or resources to communicate an external system or data store. - // The configuration information for this may come from parameters supplied to the operator invocation, -// // or external configuration files or a combination of the two. -// rabbitMQWrapper = new RabbitMQWrapper(this,exchangeName,routingKey); -// rabbitMQWrapper.login(username, password, hostName, portId); - if (queueName == ""){ - queueName = channel.queueDeclare().getQueue(); - } - - channel.queueBind(queueName, exchangeName, routingKey); - System.out.println("Queue: " + queueName + " Exchange: " + exchangeName); - //produce tuples returns immediately, but we don't want ports to close - createAvoidCompletionThread(); - - /* - * Create the thread for producing tuples. - * The thread is created at initialize time but started. - * The thread will be started by allPortsReady(). - */ - processThread = getOperatorContext().getThreadFactory().newThread( - new Runnable() { - - @Override - public void run() { - try { - produceTuples(); - //rabbitMQWrapper.Consume(); - } catch (Exception e) { - e.printStackTrace(); // Logger.getLogger(this.getClass()).error("Operator error", e); - } - } - - }); - - /* - * Set the thread not to be a daemon to ensure that the SPL runtime - * will wait for the thread to complete before determining the - * operator is complete. - */ - processThread.setDaemon(false); - } - - /** - * Notification that initialization is complete and all input and output ports - * are connected and ready to receive and submit tuples. - * @throws Exception Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public synchronized void allPortsReady() throws Exception { - OperatorContext context = getOperatorContext(); - //Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - // Start a thread for producing tuples because operator - // implementations must not block and must return control to the caller. - processThread.start(); - } - - /** - * Submit new tuples to the output stream - * @throws Exception if an error occurs while submitting a tuple - */ - private void produceTuples() throws Exception { - System.out.println("Producing tuples!"); + @Override + public synchronized void initialize(OperatorContext context) + throws Exception { + super.initialize(context); + super.initSchema(getOutput(0).getStreamSchema()); + log.trace(this.getClass().getName() + "Operator " + context.getName() + + " initializing in PE: " + context.getPE().getPEId() + + " in Job: " + context.getPE().getJobId()); + + if (queueName == "") { + queueName = channel.queueDeclare().getQueue(); + } + + channel.queueBind(queueName, exchangeName, routingKey); + System.out + .println("Queue: " + queueName + " Exchange: " + exchangeName); + // produce tuples returns immediately, but we don't want ports to close + createAvoidCompletionThread(); + + processThread = getOperatorContext().getThreadFactory().newThread( + new Runnable() { + + @Override + public void run() { + try { + produceTuples(); + // rabbitMQWrapper.Consume(); + } catch (Exception e) { + e.printStackTrace(); // Logger.getLogger(this.getClass()).error("Operator error", + // e); + } + } + + }); + + processThread.setDaemon(false); + } + + /** + * Notification that initialization is complete and all input and output + * ports are connected and ready to receive and submit tuples. + * + * @throws Exception + * Operator failure, will cause the enclosing PE to terminate. + */ + @Override + public synchronized void allPortsReady() throws Exception { + processThread.start(); + } + + /** + * Submit new tuples to the output stream + * + * @throws Exception + * if an error occurs while submitting a tuple + */ + private void produceTuples() throws Exception { + System.out.println("Producing tuples!"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, @@ -152,13 +143,16 @@ public void handleDelivery(String consumerTag, Envelope envelope, String message = new String(body, "UTF-8"); StreamingOutput out = getOutput(0); OutputTuple tuple = out.newTuple(); - if (routingKeyAH.isAvailable()){ - tuple.setString(routingKeyAH.getName(), envelope.getRoutingKey()); - System.out.println(routingKeyAH.getName() + ":" + envelope.getRoutingKey()); + if (routingKeyAH.isAvailable()) { + tuple.setString(routingKeyAH.getName(), + envelope.getRoutingKey()); + System.out.println(routingKeyAH.getName() + ":" + + envelope.getRoutingKey()); } else { - System.out.println("What the hell?? " + routingKeyAH.toString()); + System.out.println("What the hell?? " + + routingKeyAH.toString()); } - tuple.setString(messageAH.getName(),message); + tuple.setString(messageAH.getName(), message); System.out.println("message: " + message); // Submit tuple to output stream try { @@ -169,65 +163,32 @@ public void handleDelivery(String consumerTag, Envelope envelope, } } }; - channel.basicConsume(queueName, true, consumer); - // TODO If there is a finite set of tuples, submit a final punctuation when finished - // by uncommenting the following line: - // out.punctuate(Punctuation.FINAL_MARKER); - } - - @Parameter(optional=true, description="Exchange Name.") + channel.basicConsume(queueName, true, consumer); + } + + @Parameter(optional = true, description = "Exchange Name.") public void setRoutingKey(String value) { routingKey = value; } - -// @Parameter(optional=true, description="Exchange Name.") -// public void setExchangeName(String value) { -// exchangeNameParam = value; -// } -// -// @Parameter(optional=true, description="Exchange Name.") -// public void setRoutingKey(String value) { -// routingKeyParam = value; -// } -// -// @Parameter(optional=true, description="Exchange Name.") -// public void setUsername(String value) { -// userNameParam = value; -// } -// -// @Parameter(optional=true, description="Exchange Name.") -// public void setPassword(String value) { -// passwordParam = value; -// } -// -// @Parameter(optional=true, description="Exchange Name.") -// public void setHostname(String value) { -// hostNameParam = value; -// } -// -// @Parameter(optional=true, description="Exchange Name.") -// public void setPortId(int value) { -// portIdParam = value; -// } - - /** - * Shutdown this operator, which will interrupt the thread - * executing the produceTuples() method. - * @throws TimeoutException - * @throws IOException - */ - public synchronized void shutdown() throws IOException, TimeoutException { - //rabbitMQWrapper.logout(); - if (processThread != null) { - processThread.interrupt(); - processThread = null; - } - OperatorContext context = getOperatorContext(); - // Logger.getLogger(this.getClass()).trace("Operator " + context.getName() + " shutting down in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId() ); - - // TODO: If needed, close connections or release resources related to any external system or data store. - - // Must call super.shutdown() - super.shutdown(); - } + + /** + * Shutdown this operator, which will interrupt the thread executing the + * produceTuples() method. + * + * @throws TimeoutException + * @throws IOException + */ + public synchronized void shutdown() throws IOException, TimeoutException { + // rabbitMQWrapper.logout(); + if (processThread != null) { + processThread.interrupt(); + processThread = null; + } + OperatorContext context = getOperatorContext(); + trace.log(TraceLevel.ALL, "Operator " + context.getName() + + " shutting down in PE: " + context.getPE().getPEId() + + " in Job: " + context.getPE().getJobId()); + // Must call super.shutdown() + super.shutdown(); + } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java deleted file mode 100644 index 3e375bc..0000000 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQWrapper.java +++ /dev/null @@ -1,124 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2015, MOHAMED-ALI SAID - * All Rights Reserved - *******************************************************************************/ -package com.ibm.streamsx.messaging.rabbitmq; - -import java.io.File; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.TimeoutException; -import java.util.logging.Level; - -import org.apache.log4j.Logger; -import org.slf4j.LoggerFactory; - -import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Consumer; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.client.QueueingConsumer; - -public class RabbitMQWrapper { - - private static final org.slf4j.Logger log = LoggerFactory - .getLogger(RabbitMQWrapper.class); - private final String ExchangeName; - private final String RoutingKey; - private final String exchangeType; - - private UpdateEvent updateEvent; - private ConnectionFactory connectionFactory; - private com.rabbitmq.client.Connection connection; - private Channel channel; - - public void login(String userName, String password, String hostName, - int port) throws IOException, TimeoutException { - connectionFactory = new ConnectionFactory(); - connectionFactory.setUsername(userName); - connectionFactory.setPassword(password); - connectionFactory.setHost(hostName); - connectionFactory.setPort(port); - connection = connectionFactory.newConnection(); - channel = connection.createChannel(); - channel.exchangeDeclare(ExchangeName, exchangeType); - } - - public RabbitMQWrapper(final String exchangeName, final String exchangeT) { - ExchangeName = exchangeName; - RoutingKey = ""; - exchangeType = exchangeT; - } - - - - public RabbitMQWrapper(UpdateEvent event, final String exchangeName, - final String routingKey) { - updateEvent = event; - ExchangeName = exchangeName; - RoutingKey = routingKey; - exchangeType = "direct"; - } - - public void logout() { - try { - channel.close(); - connection.close(); - - } catch (IOException e) { - e.printStackTrace(); - } catch (TimeoutException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public void publish(String routingKey, String message) { - - try { - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, ExchangeName, routingKey); - log.trace("Producing message: " + message + " in thread: " - + Thread.currentThread().getName()); - - channel.basicPublish(ExchangeName, routingKey, null, - message.getBytes()); - } catch (IOException e) { - log.trace("Exception message:" + e.getMessage() + "\r\n"); - } - } - - public void Consume() { - - - - try { - boolean NO_ACK = false; - String queueName = channel.queueDeclare().getQueue(); - channel.queueBind(queueName, ExchangeName, RoutingKey); - QueueingConsumer consumer = new QueueingConsumer(channel); - channel.basicConsume(queueName, NO_ACK, consumer); - while (true) { // you might want to implement some loop-finishing - // logic here ;) - QueueingConsumer.Delivery delivery; - try { - delivery = consumer.nextDelivery(); - String Message = new String(delivery.getBody()); - log.trace("received message: " + Message + " in thread: " - + Thread.currentThread().getName()); - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), - false); - updateEvent.NotifyUpdateEvent(RoutingKey, Message); - } catch (InterruptedException ie) { - continue; - } - } - } catch (IOException e) { - e.printStackTrace(); - } - } - -} diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java deleted file mode 100644 index 45431bd..0000000 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/UpdateEvent.java +++ /dev/null @@ -1,17 +0,0 @@ -/******************************************************************************* -* Copyright (C) 2015, MOHAMED-ALI SAID -* All Rights Reserved -*******************************************************************************/ - -package com.ibm.streamsx.messaging.rabbitmq; - - -import java.io.IOException; - - -public interface UpdateEvent -{ - // This is just a regular method so it can return something or - // take arguments if you like. - public void NotifyUpdateEvent (String guid, String message); -} From ec769dbc67c1f54620fbcf4fa4ebb8d876788143 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Wed, 28 Oct 2015 12:48:53 -0400 Subject: [PATCH 06/12] Added support for multiple routing keys in RabbitMQSource #147 --- .../RabbitMQSink/RabbitMQSink.xml | 2 +- .../RabbitMQSource/RabbitMQSource.xml | 4 +-- .../messaging/rabbitmq/RabbitMQSource.java | 36 +++++++++++++------ 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index 9e2d993..a6a2c98 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index deea885..abaf118 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something @@ -66,7 +66,7 @@ Exchange Name. true rstring - 1 + -1
routingKeyAttribute diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 3df236f..100e7ea 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -8,6 +8,8 @@ //import org.apache.log4j.Logger; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeoutException; import com.ibm.streams.operator.OperatorContext; @@ -60,7 +62,7 @@ public class RabbitMQSource extends RabbitBaseOper { public static final String PASSWORD_PARAM = "password"; public static final String HOSTNAME_PARAM = "hostName"; public static final String PORTID_PARAM = "portId"; - private String routingKey; + private List routingKeys = new ArrayList();; private static final org.slf4j.Logger log = LoggerFactory .getLogger(RabbitMQSource.class); @@ -86,13 +88,7 @@ public synchronized void initialize(OperatorContext context) + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId()); - if (queueName == "") { - queueName = channel.queueDeclare().getQueue(); - } - - channel.queueBind(queueName, exchangeName, routingKey); - System.out - .println("Queue: " + queueName + " Exchange: " + exchangeName); + initRabbitChannel(); // produce tuples returns immediately, but we don't want ports to close createAvoidCompletionThread(); @@ -115,6 +111,18 @@ public void run() { processThread.setDaemon(false); } + private void initRabbitChannel() throws IOException { + if (queueName == "") { + queueName = channel.queueDeclare().getQueue(); + } + + for (String routingKey : routingKeys){ + channel.queueBind(queueName, exchangeName, routingKey); + System.out + .println("Queue: " + queueName + " Exchange: " + exchangeName); + } + } + /** * Notification that initialization is complete and all input and output * ports are connected and ready to receive and submit tuples. @@ -166,10 +174,16 @@ public void handleDelivery(String consumerTag, Envelope envelope, channel.basicConsume(queueName, true, consumer); } +// @Parameter(optional = true, description = "Exchange Name.") +// public void setRoutingKey(String value) { +// routingKey = value; +// } + @Parameter(optional = true, description = "Exchange Name.") - public void setRoutingKey(String value) { - routingKey = value; - } + public void setRoutingKey(List values) { + if(values!=null) + routingKeys.addAll(values); + } /** * Shutdown this operator, which will interrupt the thread executing the From ccc7e62a1878ec1b66d26038b82f29bc34075037 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Wed, 28 Oct 2015 18:54:37 -0400 Subject: [PATCH 07/12] Added support for multiple hosts. --- .../RabbitMQSink/RabbitMQSink.xml | 13 ++---- .../RabbitMQSource/RabbitMQSource.xml | 13 ++---- .../messaging/rabbitmq/RabbitBaseOper.java | 41 +++++++++++++++---- .../messaging/rabbitmq/RabbitMQSource.java | 2 +- 4 files changed, 39 insertions(+), 30 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index a6a2c98..fc960c7 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something @@ -27,11 +27,11 @@ 1 - hostname + hostAndPort Name of the attribute for the message. This attribute is required. Default is "message". false rstring - 1 + -1 messageAttribute @@ -47,13 +47,6 @@ rstring 1 - - portId - Port id. Default 5672. - true - int32 - 1 - queueName Name of the attribute for the key. Default is "key". diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index abaf118..631eaa1 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something @@ -27,11 +27,11 @@ 1 - hostname + hostAndPort Name of the attribute for the message. This attribute is required. Default is "message". false rstring - 1 + -1 messageAttribute @@ -47,13 +47,6 @@ rstring 1 - - portId - Port id. Default 5672. - true - int32 - 1 - queueName Name of the attribute for the key. Default is "key". diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 8a90e0d..ed1c029 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -6,7 +6,11 @@ package com.ibm.streamsx.messaging.rabbitmq; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; @@ -18,6 +22,7 @@ import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.model.Libraries; import com.ibm.streams.operator.model.Parameter; +import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; @@ -30,7 +35,9 @@ public class RabbitBaseOper extends AbstractOperator { protected String hostName = "localhost", username = "guest", password = "guest", exchangeName = "logs", exchangeType = "direct", queueName = ""; + protected List hostAndPortList = new ArrayList(); protected int portId = 5672; + protected Address[] addressArr; protected AttributeHelper topicAH = new AttributeHelper("topic"), routingKeyAH = new AttributeHelper("routing_key"), @@ -46,9 +53,12 @@ public synchronized void initialize(OperatorContext context) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername(username); connectionFactory.setPassword(password); - connectionFactory.setHost(hostName); - connectionFactory.setPort(portId); - connection = connectionFactory.newConnection(); + //connectionFactory.setHost(hostName); + //connectionFactory.setPort(portId); + //hostAndPortList.add(hostName); ports.add(portId); + addressArr = buildAddressArray(hostAndPortList); + System.out.println("Addr Array: " + addressArr[0].getHost() + ":" + addressArr[0].getPort()); + connection = connectionFactory.newConnection(addressArr); channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, exchangeType); trace.log(TraceLevel.INFO, @@ -56,6 +66,19 @@ public synchronized void initialize(OperatorContext context) + " of type: " + exchangeType + " as user: " + username); } + private Address[] buildAddressArray(List hostsAndPorts) throws MalformedURLException { + Address[] addrArr = new Address[hostsAndPorts.size()]; + int i = 0; + for (String hostAndPort : hostsAndPorts){ + URL tmpURL = new URL("http://" + hostAndPort); + addrArr[i++] = new Address(tmpURL.getHost(), tmpURL.getPort()); + System.out.println("Adding: " + tmpURL.getHost() + ":"+ tmpURL.getPort()); + } + trace.log(TraceLevel.INFO, "Built address array: \n" + addrArr.toString()); + + return addrArr; + } + public void shutdown() throws IOException, TimeoutException { channel.close(); connection.close(); @@ -73,8 +96,8 @@ public void initSchema(StreamSchema ss) throws Exception { } @Parameter(optional = false, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") - public void setHostname(String value) { - hostName = value; + public void setHostAndPort(List value) { + hostAndPortList.addAll(value); } @Parameter(optional = false, description = "Name of the attribute for the key. Default is \\\"key\\\".") @@ -102,10 +125,10 @@ public void setMessageAttribute(String value) { messageAH.setName(value); } - @Parameter(optional = true, description = "Port id. Default 5672.") - public void setPortId(int value) { - portId = value; - } +// @Parameter(optional = true, description = "Port id. Default 5672.") +// public void setPortId(int value) { +// portId = value; +// } @Parameter(optional = true, description = "Exchange Name.") public void setRoutingKeyAttribute(String value) { diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 100e7ea..43c63c7 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -62,7 +62,7 @@ public class RabbitMQSource extends RabbitBaseOper { public static final String PASSWORD_PARAM = "password"; public static final String HOSTNAME_PARAM = "hostName"; public static final String PORTID_PARAM = "portId"; - private List routingKeys = new ArrayList();; + private List routingKeys = new ArrayList(); private static final org.slf4j.Logger log = LoggerFactory .getLogger(RabbitMQSource.class); From 23ee0a6fe8228538b23aefae88bd3b541ded7534 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Thu, 29 Oct 2015 10:02:21 -0400 Subject: [PATCH 08/12] Basic auto-retry implemented --- .../RabbitMQSink/RabbitMQSink.xml | 9 +++++++- .../RabbitMQSource/RabbitMQSource.xml | 9 +++++++- .../messaging/rabbitmq/RabbitBaseOper.java | 21 +++++++++++++++---- .../messaging/rabbitmq/RabbitMQSink.java | 8 +++---- 4 files changed, 37 insertions(+), 10 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index fc960c7..c427006 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something @@ -68,6 +68,13 @@ rstring 1 + + virtualHost + Exchange Name. + true + rstring + 1 + diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 631eaa1..7627569 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something @@ -75,6 +75,13 @@ rstring 1 + + virtualHost + Exchange Name. + true + rstring + 1 + diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index ed1c029..838a1b1 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -38,6 +38,7 @@ public class RabbitBaseOper extends AbstractOperator { protected List hostAndPortList = new ArrayList(); protected int portId = 5672; protected Address[] addressArr; + private String vHost; protected AttributeHelper topicAH = new AttributeHelper("topic"), routingKeyAH = new AttributeHelper("routing_key"), @@ -53,9 +54,9 @@ public synchronized void initialize(OperatorContext context) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername(username); connectionFactory.setPassword(password); - //connectionFactory.setHost(hostName); - //connectionFactory.setPort(portId); - //hostAndPortList.add(hostName); ports.add(portId); + connectionFactory.setAutomaticRecoveryEnabled(true); + if (vHost != null) + connectionFactory.setVirtualHost(vHost); addressArr = buildAddressArray(hostAndPortList); System.out.println("Addr Array: " + addressArr[0].getHost() + ":" + addressArr[0].getPort()); connection = connectionFactory.newConnection(addressArr); @@ -64,6 +65,8 @@ public synchronized void initialize(OperatorContext context) trace.log(TraceLevel.INFO, "Initializing channel connection to exchange: " + exchangeName + " of type: " + exchangeType + " as user: " + username); + trace.log(TraceLevel.INFO, + "Connection to host: " + connection.getAddress()); } private Address[] buildAddressArray(List hostsAndPorts) throws MalformedURLException { @@ -81,7 +84,12 @@ private Address[] buildAddressArray(List hostsAndPorts) throws Malformed public void shutdown() throws IOException, TimeoutException { channel.close(); - connection.close(); + try { + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + trace.log(TraceLevel.ALL, "Exception at close: " + e.toString()); + } } public void initSchema(StreamSchema ss) throws Exception { @@ -134,5 +142,10 @@ public void setMessageAttribute(String value) { public void setRoutingKeyAttribute(String value) { routingKeyAH.setName(value); } + + @Parameter(optional = true, description = "Exchange Name.") + public void setVirtualHost(String value) { + vHost = value; + } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index b38b623..e2eff18 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -17,6 +17,7 @@ import com.ibm.streams.operator.model.InputPortSet; import com.ibm.streams.operator.model.InputPorts; import com.ibm.streams.operator.model.PrimitiveOperator; +import com.rabbitmq.client.AlreadyClosedException; /** * This operator was originally contributed by Mohamed-Ali Said @saidmohamedali @@ -102,7 +103,7 @@ public synchronized void allPortsReady() throws Exception { */ @Override public void process(StreamingInput stream, Tuple tuple) - throws Exception { + { String message = tuple.getString(messageAH.getName()); String routingKey = tuple.getString(routingKeyAH.getName()); @@ -112,10 +113,9 @@ public void process(StreamingInput stream, Tuple tuple) + Thread.currentThread().getName()); channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); - } catch (IOException e) { + } catch (Exception e) { log.trace("Exception message:" + e.getMessage() + "\r\n"); - } - + } } /** From 24ed6c6ad03e5d3747646d2de7cfa7576aba52fb Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Thu, 29 Oct 2015 18:04:51 -0400 Subject: [PATCH 09/12] This won't work, but pushing so that people can view my code Getting weird errors when trying to set Map attributes: Caused by: java.lang.UnsupportedOperationException: Conversion between SPL type rstring (MetaType.RSTRING) and Java type java.lang.String is not supported --- .../RabbitMQSink/RabbitMQSink.xml | 9 +++++- .../RabbitMQSource/RabbitMQSource.xml | 9 +++++- .../messaging/rabbitmq/RabbitBaseOper.java | 20 ++++++++----- .../messaging/rabbitmq/RabbitMQSink.java | 9 +++++- .../messaging/rabbitmq/RabbitMQSource.java | 28 +++++++++++++++---- 5 files changed, 59 insertions(+), 16 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index c427006..6e257a5 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something @@ -19,6 +19,13 @@ + + automaticRecovery + Exchange Name. + true + boolean + 1 + exchangeName Exchange Name. diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 7627569..46db1b6 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something @@ -19,6 +19,13 @@ + + automaticRecovery + Exchange Name. + true + boolean + 1 + exchangeName Exchange Name. diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 838a1b1..5c3da1a 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -27,7 +27,7 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -@Libraries({ "opt/downloaded/*" }) +@Libraries({ "opt/downloaded/*"/*, "@RABBITMQ_HOME@" */}) public class RabbitBaseOper extends AbstractOperator { protected Channel channel; @@ -39,6 +39,7 @@ public class RabbitBaseOper extends AbstractOperator { protected int portId = 5672; protected Address[] addressArr; private String vHost; + private Boolean autoRecovery = true; protected AttributeHelper topicAH = new AttributeHelper("topic"), routingKeyAH = new AttributeHelper("routing_key"), @@ -54,7 +55,7 @@ public synchronized void initialize(OperatorContext context) ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername(username); connectionFactory.setPassword(password); - connectionFactory.setAutomaticRecoveryEnabled(true); + connectionFactory.setAutomaticRecoveryEnabled(autoRecovery); if (vHost != null) connectionFactory.setVirtualHost(vHost); addressArr = buildAddressArray(hostAndPortList); @@ -133,11 +134,6 @@ public void setMessageAttribute(String value) { messageAH.setName(value); } -// @Parameter(optional = true, description = "Port id. Default 5672.") -// public void setPortId(int value) { -// portId = value; -// } - @Parameter(optional = true, description = "Exchange Name.") public void setRoutingKeyAttribute(String value) { routingKeyAH.setName(value); @@ -147,5 +143,15 @@ public void setRoutingKeyAttribute(String value) { public void setVirtualHost(String value) { vHost = value; } + + @Parameter(optional = true, description = "Exchange Name.") + public void setAutomaticRecovery(Boolean value) { + autoRecovery = value; + } + +// @Parameter(optional = true, description = "Exchange Name.") +// public void setAutomaticRecovery(Boolean value) { +// autoRecovery = value; +// } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index e2eff18..c6e4bb1 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -6,6 +6,8 @@ package com.ibm.streamsx.messaging.rabbitmq; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeoutException; import org.slf4j.LoggerFactory; @@ -17,6 +19,7 @@ import com.ibm.streams.operator.model.InputPortSet; import com.ibm.streams.operator.model.InputPorts; import com.ibm.streams.operator.model.PrimitiveOperator; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AlreadyClosedException; /** @@ -111,7 +114,11 @@ public void process(StreamingInput stream, Tuple tuple) try { log.trace("Producing message: " + message + " in thread: " + Thread.currentThread().getName()); - channel.basicPublish(exchangeName, routingKey, null, + BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); + Map headers = new HashMap(); + headers.put("test", "myHeader"); + propsBuilder.headers(headers); + channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), message.getBytes()); } catch (Exception e) { log.trace("Exception message:" + e.getMessage() + "\r\n"); diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 43c63c7..32804e6 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -9,7 +9,11 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.TimeoutException; import com.ibm.streams.operator.OperatorContext; @@ -56,12 +60,6 @@ @PrimitiveOperator(name = "RabbitMQSource", description = "something") public class RabbitMQSource extends RabbitBaseOper { - public static final String EXCHANGENAME_PARAM = "exchangeName"; - public static final String ROUTINGKEY_PARAM = "routingKey"; - public static final String USERNAME_PARAM = "userName"; - public static final String PASSWORD_PARAM = "password"; - public static final String HOSTNAME_PARAM = "hostName"; - public static final String PORTID_PARAM = "portId"; private List routingKeys = new ArrayList(); private static final org.slf4j.Logger log = LoggerFactory @@ -150,7 +148,20 @@ public void handleDelivery(String consumerTag, Envelope envelope, throws IOException { String message = new String(body, "UTF-8"); StreamingOutput out = getOutput(0); + Map msgHeader = properties.getHeaders(); + Map headers = new HashMap(); + + Iterator> it = msgHeader.entrySet().iterator(); + while (it.hasNext()){ + Map.Entry pair = it.next(); + System.out.println("Header: " + pair.getKey() + ":" + pair.getValue().toString()); + headers.put(pair.getKey(), pair.getValue().toString()); + } + + OutputTuple tuple = out.newTuple(); + System.out.println("Schema: " + tuple.getStreamSchema().getAttributeNames().toString()); + if (routingKeyAH.isAvailable()) { tuple.setString(routingKeyAH.getName(), envelope.getRoutingKey()); @@ -161,6 +172,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, + routingKeyAH.toString()); } tuple.setString(messageAH.getName(), message); + + + + tuple.setMap("msgHeader", headers); + System.out.println("message: " + message); // Submit tuple to output stream try { From 6feed4e6e282a8140e10f5bd36fe077cd704c4d5 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Fri, 30 Oct 2015 15:31:10 -0400 Subject: [PATCH 10/12] Implemented automatic recovery and message send retry. Improved docs and logging/error handling. --- .../RabbitMQSink/RabbitMQSink.xml | 51 +++++++--- .../RabbitMQSource/RabbitMQSource.xml | 29 +++--- .../messaging/rabbitmq/RabbitBaseOper.java | 46 +++++---- .../messaging/rabbitmq/RabbitMQSink.java | 98 +++++++++++++------ .../messaging/rabbitmq/RabbitMQSource.java | 56 ++++++----- 5 files changed, 174 insertions(+), 106 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml index 6e257a5..0449a03 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSink/RabbitMQSink.xml @@ -1,7 +1,7 @@ - + something @@ -21,63 +21,84 @@ automaticRecovery - Exchange Name. + Have connections to RabbitMQ automatically recovered. Default is true. true boolean 1 - exchangeName - Exchange Name. + deliveryMode + Marks message as persistent(2) or non-persistent(1). Default as 1. true + int32 + 1 + + + exchangeName + Required attribute. Name of the RabbitMQ exchange. + false rstring 1 hostAndPort - Name of the attribute for the message. This attribute is required. Default is "message". + List of host and port in form: "myhost1:3456","myhost2:3456". false rstring -1 + + maxMessageSendRetries + This optional parameter specifies the number of successive retries that are attempted for a message if a failure occurs when the message is sent. The default value is zero; no retries are attempted. + true + int32 + 1 + messageAttribute - Name of the attribute for the message. This attribute is required. Default is "message". + Name of the attribute for the message. Default is "message". true rstring 1 - password - Name of the attribute for the key. Default is "key". - false - rstring + messageSendRetryDelay + This optional parameter specifies the time in milliseconds to wait before the next delivery attempt. If the maxMessageSendRetries is specified, you must also specify a value for this parameter. + true + int32 1 - queueName - Name of the attribute for the key. Default is "key". + msgHeaderAttribute + Name of the attribute for the message_header. Schema of type must be Map<ustring,ustring>. Default is "message_header". true rstring 1 + + password + Password for RabbitMQ authentication. + false + rstring + 1 + routingKeyAttribute - Exchange Name. + Name of the attribute for the routing_key. Default is "routing_key". true rstring 1 username - Name of the attribute for the key. Default is "key". + Username for RabbitMQ authentication. false rstring 1 virtualHost - Exchange Name. + Set Virtual Host. Default is null. true rstring 1 diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml index 46db1b6..d90d0a9 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.rabbitmq/RabbitMQSource/RabbitMQSource.xml @@ -1,7 +1,7 @@ - + something @@ -21,42 +21,49 @@ automaticRecovery - Exchange Name. + Have connections to RabbitMQ automatically recovered. Default is true. true boolean 1 exchangeName - Exchange Name. - true + Required attribute. Name of the RabbitMQ exchange. + false rstring 1 hostAndPort - Name of the attribute for the message. This attribute is required. Default is "message". + List of host and port in form: "myhost1:3456","myhost2:3456". false rstring -1 messageAttribute - Name of the attribute for the message. This attribute is required. Default is "message". + Name of the attribute for the message. Default is "message". + true + rstring + 1 + + + msgHeaderAttribute + Name of the attribute for the message_header. Schema of type must be Map<ustring,ustring>. Default is "message_header". true rstring 1 password - Name of the attribute for the key. Default is "key". + Password for RabbitMQ authentication. false rstring 1 queueName - Name of the attribute for the key. Default is "key". + Name of the queue. Main reason to specify is to facilitate parallel consuming. Default is a random queue name.. true rstring 1 @@ -70,21 +77,21 @@ routingKeyAttribute - Exchange Name. + Name of the attribute for the routing_key. Default is "routing_key". true rstring 1 username - Name of the attribute for the key. Default is "key". + Username for RabbitMQ authentication. false rstring 1 virtualHost - Exchange Name. + Set Virtual Host. Default is null. true rstring 1 diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 5c3da1a..39ae943 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -33,19 +33,18 @@ public class RabbitBaseOper extends AbstractOperator { protected Channel channel; protected Connection connection; protected String hostName = "localhost", username = "guest", - password = "guest", exchangeName = "logs", exchangeType = "direct", - queueName = ""; + password = "guest", exchangeName = "", exchangeType = "direct"; + protected List hostAndPortList = new ArrayList(); - protected int portId = 5672; protected Address[] addressArr; private String vHost; private Boolean autoRecovery = true; - protected AttributeHelper topicAH = new AttributeHelper("topic"), + protected AttributeHelper messageHeaderAH = new AttributeHelper("msg_header"), routingKeyAH = new AttributeHelper("routing_key"), messageAH = new AttributeHelper("message"); - protected final Logger trace = Logger.getLogger(RabbitBaseOper.class + private final Logger trace = Logger.getLogger(RabbitBaseOper.class .getCanonicalName()); public synchronized void initialize(OperatorContext context) @@ -95,63 +94,62 @@ public void shutdown() throws IOException, TimeoutException { public void initSchema(StreamSchema ss) throws Exception { Set supportedTypes = new HashSet(); + supportedTypes.add(MetaType.MAP); + messageHeaderAH.initialize(ss, false, supportedTypes); + supportedTypes.remove(MetaType.MAP); + supportedTypes.add(MetaType.RSTRING); supportedTypes.add(MetaType.USTRING); supportedTypes.add(MetaType.BLOB); - routingKeyAH.initialize(ss, true, supportedTypes); + routingKeyAH.initialize(ss, false, supportedTypes); messageAH.initialize(ss, true, supportedTypes); } - @Parameter(optional = false, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") + @Parameter(optional = false, description = "List of host and port in form: \\\"myhost1:3456\\\",\\\"myhost2:3456\\\".") public void setHostAndPort(List value) { hostAndPortList.addAll(value); } - @Parameter(optional = false, description = "Name of the attribute for the key. Default is \\\"key\\\".") + @Parameter(optional = false, description = "Username for RabbitMQ authentication.") public void setUsername(String value) { username = value; } - @Parameter(optional = false, description = "Name of the attribute for the key. Default is \\\"key\\\".") + @Parameter(optional = false, description = "Password for RabbitMQ authentication.") public void setPassword(String value) { password = value; } - @Parameter(optional = true, description = "Exchange Name.") + @Parameter(optional = false, description = "Required attribute. Name of the RabbitMQ exchange.") public void setExchangeName(String value) { exchangeName = value; } - @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".") - public void setQueueName(String value) { - queueName = value; - } - - @Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".") + @Parameter(optional = true, description = "Name of the attribute for the message. Default is \\\"message\\\".") public void setMessageAttribute(String value) { messageAH.setName(value); } - @Parameter(optional = true, description = "Exchange Name.") + @Parameter(optional = true, description = "Name of the attribute for the routing_key. Default is \\\"routing_key\\\".") public void setRoutingKeyAttribute(String value) { routingKeyAH.setName(value); } + + @Parameter(optional = true, description = "Name of the attribute for the message_header. Schema of type must be Map. Default is \\\"message_header\\\".") + public void setMsgHeaderAttribute(String value) { + messageHeaderAH.setName(value); + } - @Parameter(optional = true, description = "Exchange Name.") + @Parameter(optional = true, description = "Set Virtual Host. Default is null.") public void setVirtualHost(String value) { vHost = value; } - @Parameter(optional = true, description = "Exchange Name.") + @Parameter(optional = true, description = "Have connections to RabbitMQ automatically recovered. Default is true.") public void setAutomaticRecovery(Boolean value) { autoRecovery = value; } - -// @Parameter(optional = true, description = "Exchange Name.") -// public void setAutomaticRecovery(Boolean value) { -// autoRecovery = value; -// } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index c6e4bb1..716374c 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -10,17 +10,18 @@ import java.util.Map; import java.util.concurrent.TimeoutException; -import org.slf4j.LoggerFactory; +import java.util.logging.Logger; import com.ibm.streams.operator.OperatorContext; import com.ibm.streams.operator.StreamingData.Punctuation; import com.ibm.streams.operator.StreamingInput; import com.ibm.streams.operator.Tuple; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.model.InputPortSet; import com.ibm.streams.operator.model.InputPorts; +import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.model.PrimitiveOperator; import com.rabbitmq.client.AMQP.BasicProperties; -import com.rabbitmq.client.AlreadyClosedException; /** * This operator was originally contributed by Mohamed-Ali Said @saidmohamedali @@ -51,9 +52,11 @@ @PrimitiveOperator(name = "RabbitMQSink", description = "something") public class RabbitMQSink extends RabbitBaseOper { - private static final org.slf4j.Logger log = LoggerFactory - .getLogger(RabbitMQSink.class); - + private final Logger trace = Logger.getLogger(RabbitMQSink.class + .getCanonicalName()); + Integer deliveryMode = 1; + int maxMessageSendRetries = 0; + int messageSendRetryDelay = 10000; /** * Initialize this operator. Called once before any tuples are processed. * @@ -69,7 +72,7 @@ public synchronized void initialize(OperatorContext context) // Must call super.initialize(context) to correctly setup an operator. super.initialize(context); super.initSchema(getInput(0).getStreamSchema()); - log.trace("Operator " + context.getName() + " initializing in PE: " + trace.log(TraceLevel.INFO, "Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId()); @@ -88,7 +91,7 @@ public synchronized void allPortsReady() throws Exception { // Operators that process incoming tuples generally do not need this // notification. OperatorContext context = getOperatorContext(); - log.trace("Operator " + context.getName() + trace.log(TraceLevel.INFO, "Operator " + context.getName() + " all ports are ready in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId()); @@ -104,42 +107,78 @@ public synchronized void allPortsReady() throws Exception { * @throws Exception * Operator failure, will cause the enclosing PE to terminate. */ + @SuppressWarnings("unchecked") @Override - public void process(StreamingInput stream, Tuple tuple) + public void process(StreamingInput stream, Tuple tuple) throws Exception { String message = tuple.getString(messageAH.getName()); - String routingKey = tuple.getString(routingKeyAH.getName()); - + String routingKey = ""; + Map headers = new HashMap<>(); + + if (routingKeyAH.isAvailable()){ + routingKey = tuple.getString(routingKeyAH.getName()); + } + + BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); + + if (messageHeaderAH.isAvailable()){ + headers = (Map) tuple.getMap(messageHeaderAH.getName()); + propsBuilder.headers(headers); + } + propsBuilder.deliveryMode(deliveryMode); + try { - log.trace("Producing message: " + message + " in thread: " + trace.log(TraceLevel.TRACE, "Producing message: " + message + " in thread: " + Thread.currentThread().getName()); - BasicProperties.Builder propsBuilder = new BasicProperties.Builder(); - Map headers = new HashMap(); - headers.put("test", "myHeader"); - propsBuilder.headers(headers); channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), message.getBytes()); } catch (Exception e) { - log.trace("Exception message:" + e.getMessage() + "\r\n"); + trace.log(TraceLevel.ERROR, "Exception message:" + e.getMessage() + "\r\n"); + Boolean failedToSend = true; + int attemptCount = 0; + while (failedToSend + && attemptCount < maxMessageSendRetries){ + try { + Thread.sleep(messageSendRetryDelay); + trace.log(TraceLevel.ERROR, "Attempting to resend. Try number: " + attemptCount); + channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), + message.getBytes()); + failedToSend = false; + } catch (InterruptedException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + attemptCount++; + } + + //if we still can't send after the number of maxMessageSendRetries, we want to fail + if (failedToSend){ + trace.log(TraceLevel.ERROR, "Failed to send message after " + attemptCount + " attempts."); + } + } } - /** - * Process an incoming punctuation that arrived on the specified port. - * - * @param stream - * Port the punctuation is arriving on. - * @param mark - * The punctuation mark - * @throws Exception - * Operator failure, will cause the enclosing PE to terminate. - */ - @Override - public void processPunctuation(StreamingInput stream, - Punctuation mark) throws Exception { + @Parameter(optional = true, description = "Marks message as persistent(2) or non-persistent(1). Default as 1. ") + public void setDeliveryMode(Integer value) { + deliveryMode = value; + } + + @Parameter(optional = true, description = "This optional parameter specifies the number of successive retries that are attempted for a message if a failure occurs when the message is sent. The default value is zero; no retries are attempted.") + public void setMaxMessageSendRetries(int value) { + maxMessageSendRetries = value; + } + + @Parameter(optional = true, description = "This optional parameter specifies the time in milliseconds to wait before the next delivery attempt. If the maxMessageSendRetries is specified, you must also specify a value for this parameter.") + public void setMessageSendRetryDelay(int value) { + messageSendRetryDelay = value; } + /** * Shutdown this operator. * @@ -150,5 +189,6 @@ public void processPunctuation(StreamingInput stream, public synchronized void shutdown() throws IOException, TimeoutException { super.shutdown(); } + } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index 32804e6..b5c158d 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -29,8 +29,7 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; - -import org.slf4j.LoggerFactory; +import java.util.logging.Logger; /** * This operator was originally contributed by Mohamed-Ali Said @saidmohamedali @@ -62,13 +61,13 @@ public class RabbitMQSource extends RabbitBaseOper { private List routingKeys = new ArrayList(); - private static final org.slf4j.Logger log = LoggerFactory - .getLogger(RabbitMQSource.class); + private final Logger trace = Logger.getLogger(RabbitBaseOper.class + .getCanonicalName()); /** * Thread for calling produceTuples() to produce tuples */ private Thread processThread; - + private String queueName = ""; /** * Initialize this operator. Called once before any tuples are processed. * @@ -82,7 +81,7 @@ public synchronized void initialize(OperatorContext context) throws Exception { super.initialize(context); super.initSchema(getOutput(0).getStreamSchema()); - log.trace(this.getClass().getName() + "Operator " + context.getName() + trace.log(TraceLevel.INFO, this.getClass().getName() + "Operator " + context.getName() + " initializing in PE: " + context.getPE().getPEId() + " in Job: " + context.getPE().getJobId()); @@ -113,6 +112,9 @@ private void initRabbitChannel() throws IOException { if (queueName == "") { queueName = channel.queueDeclare().getQueue(); } + + if (routingKeys.isEmpty()) + routingKeys.add("");//receive all messages by default for (String routingKey : routingKeys){ channel.queueBind(queueName, exchangeName, routingKey); @@ -148,34 +150,34 @@ public void handleDelivery(String consumerTag, Envelope envelope, throws IOException { String message = new String(body, "UTF-8"); StreamingOutput out = getOutput(0); - Map msgHeader = properties.getHeaders(); - Map headers = new HashMap(); - Iterator> it = msgHeader.entrySet().iterator(); - while (it.hasNext()){ - Map.Entry pair = it.next(); - System.out.println("Header: " + pair.getKey() + ":" + pair.getValue().toString()); - headers.put(pair.getKey(), pair.getValue().toString()); - } OutputTuple tuple = out.newTuple(); System.out.println("Schema: " + tuple.getStreamSchema().getAttributeNames().toString()); + tuple.setString(messageAH.getName(), message); + if (routingKeyAH.isAvailable()) { tuple.setString(routingKeyAH.getName(), envelope.getRoutingKey()); System.out.println(routingKeyAH.getName() + ":" + envelope.getRoutingKey()); - } else { - System.out.println("What the hell?? " - + routingKeyAH.toString()); - } - tuple.setString(messageAH.getName(), message); - + } - - tuple.setMap("msgHeader", headers); + if (messageHeaderAH.isAvailable()){ + Map msgHeader = properties.getHeaders(); + if (msgHeader != null && !msgHeader.isEmpty()){ + Map headers = new HashMap(); + Iterator> it = msgHeader.entrySet().iterator(); + while (it.hasNext()){ + Map.Entry pair = it.next(); + System.out.println("Header: " + pair.getKey() + ":" + pair.getValue().toString()); + headers.put(pair.getKey(), pair.getValue().toString()); + } + tuple.setMap(messageHeaderAH.getName(), headers); + } + } System.out.println("message: " + message); // Submit tuple to output stream @@ -189,17 +191,17 @@ public void handleDelivery(String consumerTag, Envelope envelope, }; channel.basicConsume(queueName, true, consumer); } - -// @Parameter(optional = true, description = "Exchange Name.") -// public void setRoutingKey(String value) { -// routingKey = value; -// } @Parameter(optional = true, description = "Exchange Name.") public void setRoutingKey(List values) { if(values!=null) routingKeys.addAll(values); } + + @Parameter(optional = true, description = "Name of the queue. Main reason to specify is to facilitate parallel consuming. Default is a random queue name..") + public void setQueueName(String value) { + queueName = value; + } /** * Shutdown this operator, which will interrupt the thread executing the From af7e51623f78cecb998640b0876d7d9f541c12ed Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Fri, 30 Oct 2015 15:33:58 -0400 Subject: [PATCH 11/12] Remove print statements and replace with trace. Need to come back to refactor logging. --- .../messaging/rabbitmq/RabbitBaseOper.java | 4 ++-- .../streamsx/messaging/rabbitmq/RabbitMQSink.java | 2 -- .../messaging/rabbitmq/RabbitMQSource.java | 15 +++++++-------- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java index 39ae943..8048644 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitBaseOper.java @@ -58,7 +58,7 @@ public synchronized void initialize(OperatorContext context) if (vHost != null) connectionFactory.setVirtualHost(vHost); addressArr = buildAddressArray(hostAndPortList); - System.out.println("Addr Array: " + addressArr[0].getHost() + ":" + addressArr[0].getPort()); + trace.log(TraceLevel.INFO, "Addr Array: " + addressArr[0].getHost() + ":" + addressArr[0].getPort()); connection = connectionFactory.newConnection(addressArr); channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, exchangeType); @@ -75,7 +75,7 @@ private Address[] buildAddressArray(List hostsAndPorts) throws Malformed for (String hostAndPort : hostsAndPorts){ URL tmpURL = new URL("http://" + hostAndPort); addrArr[i++] = new Address(tmpURL.getHost(), tmpURL.getPort()); - System.out.println("Adding: " + tmpURL.getHost() + ":"+ tmpURL.getPort()); + trace.log(TraceLevel.INFO, "Adding: " + tmpURL.getHost() + ":"+ tmpURL.getPort()); } trace.log(TraceLevel.INFO, "Built address array: \n" + addrArr.toString()); diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index 716374c..ba74891 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -9,11 +9,9 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; - import java.util.logging.Logger; import com.ibm.streams.operator.OperatorContext; -import com.ibm.streams.operator.StreamingData.Punctuation; import com.ibm.streams.operator.StreamingInput; import com.ibm.streams.operator.Tuple; import com.ibm.streams.operator.logging.TraceLevel; diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java index b5c158d..61c2d10 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource.java @@ -118,8 +118,7 @@ private void initRabbitChannel() throws IOException { for (String routingKey : routingKeys){ channel.queueBind(queueName, exchangeName, routingKey); - System.out - .println("Queue: " + queueName + " Exchange: " + exchangeName); + trace.log(TraceLevel.INFO, "Queue: " + queueName + " Exchange: " + exchangeName); } } @@ -142,7 +141,7 @@ public synchronized void allPortsReady() throws Exception { * if an error occurs while submitting a tuple */ private void produceTuples() throws Exception { - System.out.println("Producing tuples!"); + trace.log(TraceLevel.INFO, "Producing tuples!"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, @@ -154,14 +153,14 @@ public void handleDelivery(String consumerTag, Envelope envelope, OutputTuple tuple = out.newTuple(); - System.out.println("Schema: " + tuple.getStreamSchema().getAttributeNames().toString()); + trace.log(TraceLevel.INFO, "Schema: " + tuple.getStreamSchema().getAttributeNames().toString()); tuple.setString(messageAH.getName(), message); if (routingKeyAH.isAvailable()) { tuple.setString(routingKeyAH.getName(), envelope.getRoutingKey()); - System.out.println(routingKeyAH.getName() + ":" + trace.log(TraceLevel.INFO, routingKeyAH.getName() + ":" + envelope.getRoutingKey()); } @@ -172,19 +171,19 @@ public void handleDelivery(String consumerTag, Envelope envelope, Iterator> it = msgHeader.entrySet().iterator(); while (it.hasNext()){ Map.Entry pair = it.next(); - System.out.println("Header: " + pair.getKey() + ":" + pair.getValue().toString()); + trace.log(TraceLevel.INFO, "Header: " + pair.getKey() + ":" + pair.getValue().toString()); headers.put(pair.getKey(), pair.getValue().toString()); } tuple.setMap(messageHeaderAH.getName(), headers); } } - System.out.println("message: " + message); + trace.log(TraceLevel.INFO, "message: " + message); // Submit tuple to output stream try { out.submit(tuple); } catch (Exception e) { - System.out.println("Catching submit exception"); + trace.log(TraceLevel.INFO, "Catching submit exception"); e.printStackTrace(); } } From 2ffd81eba1fc9e355c24b283fabacc894d4afa75 Mon Sep 17 00:00:00 2001 From: Alex Cook Date: Fri, 30 Oct 2015 17:46:06 -0400 Subject: [PATCH 12/12] Parameterized call. --- .../src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java index ba74891..67d9339 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink.java @@ -112,7 +112,7 @@ public void process(StreamingInput stream, Tuple tuple) throws Exception String message = tuple.getString(messageAH.getName()); String routingKey = ""; - Map headers = new HashMap<>(); + Map headers = new HashMap(); if (routingKeyAH.isAvailable()){ routingKey = tuple.getString(routingKeyAH.getName());