Skip to content

Commit

Permalink
Merge pull request #151 from Alex-Cook4/rabbitmq
Browse files Browse the repository at this point in the history
Rabbitmq
  • Loading branch information
Alex-Cook4 committed Oct 30, 2015
2 parents da44433 + 2ffd81e commit bec2e71
Show file tree
Hide file tree
Showing 8 changed files with 713 additions and 459 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?><operatorModel xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator" xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
<!--DO NOT EDIT THIS FILE - it is machine generated-->
<javaOperatorModel>
<!--Generated from com.ibm.streamsx.messaging.rabbitmq.RabbitMQSink in impl/java/bin/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink$StreamsModel.class at Fri Oct 23 09:32:52 EDT 2015-->
<!--Generated from com.ibm.streamsx.messaging.rabbitmq.RabbitMQSink in impl/java/bin/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSink$StreamsModel.class at Fri Oct 30 15:26:15 EDT 2015-->
<context>
<description>something</description>
<metrics/>
Expand All @@ -20,43 +20,85 @@
</context>
<parameters>
<parameter>
<name>exchangeName</name>
<description>Exchange Name.</description>
<name>automaticRecovery</name>
<description>Have connections to RabbitMQ automatically recovered. Default is true.</description>
<optional>true</optional>
<type>rstring</type>
<type>boolean</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>hostname</name>
<description>Exchange Name.</description>
<name>deliveryMode</name>
<description>Marks message as persistent(2) or non-persistent(1). Default as 1. </description>
<optional>true</optional>
<type>int32</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>exchangeName</name>
<description>Required attribute. Name of the RabbitMQ exchange.</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>password</name>
<description>Exchange Name.</description>
<name>hostAndPort</name>
<description>List of host and port in form: "myhost1:3456","myhost2:3456".</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>-1</cardinality>
</parameter>
<parameter>
<name>maxMessageSendRetries</name>
<description>This optional parameter specifies the number of successive retries that are attempted for a message if a failure occurs when the message is sent. The default value is zero; no retries are attempted.</description>
<optional>true</optional>
<type>int32</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>messageAttribute</name>
<description>Name of the attribute for the message. Default is "message".</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>portId</name>
<description>Exchange Name.</description>
<name>messageSendRetryDelay</name>
<description>This optional parameter specifies the time in milliseconds to wait before the next delivery attempt. If the maxMessageSendRetries is specified, you must also specify a value for this parameter.</description>
<optional>true</optional>
<type>int32</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>routingKey</name>
<description>Exchange Name.</description>
<name>msgHeaderAttribute</name>
<description>Name of the attribute for the message_header. Schema of type must be Map&lt;ustring,ustring&gt;. Default is "message_header".</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>password</name>
<description>Password for RabbitMQ authentication.</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>routingKeyAttribute</name>
<description>Name of the attribute for the routing_key. Default is "routing_key".</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>username</name>
<description>Exchange Name.</description>
<description>Username for RabbitMQ authentication.</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>virtualHost</name>
<description>Set Virtual Host. Default is null.</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?><operatorModel xmlns="http://www.ibm.com/xmlns/prod/streams/spl/operator" xmlns:cmn="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.ibm.com/xmlns/prod/streams/spl/operator operatorModel.xsd">
<!--DO NOT EDIT THIS FILE - it is machine generated-->
<javaOperatorModel>
<!--Generated from com.ibm.streamsx.messaging.rabbitmq.RabbitMQSource in impl/java/bin/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource$StreamsModel.class at Fri Oct 23 09:32:52 EDT 2015-->
<!--Generated from com.ibm.streamsx.messaging.rabbitmq.RabbitMQSource in impl/java/bin/com/ibm/streamsx/messaging/rabbitmq/RabbitMQSource$StreamsModel.class at Fri Oct 30 15:26:15 EDT 2015-->
<context>
<description>something</description>
<metrics/>
Expand All @@ -19,44 +19,79 @@
</libraryDependencies>
</context>
<parameters>
<parameter>
<name>automaticRecovery</name>
<description>Have connections to RabbitMQ automatically recovered. Default is true.</description>
<optional>true</optional>
<type>boolean</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>exchangeName</name>
<description>Exchange Name.</description>
<description>Required attribute. Name of the RabbitMQ exchange.</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>hostAndPort</name>
<description>List of host and port in form: "myhost1:3456","myhost2:3456".</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>-1</cardinality>
</parameter>
<parameter>
<name>messageAttribute</name>
<description>Name of the attribute for the message. Default is "message".</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>hostname</name>
<description>Exchange Name.</description>
<name>msgHeaderAttribute</name>
<description>Name of the attribute for the message_header. Schema of type must be Map&lt;ustring,ustring&gt;. Default is "message_header".</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>password</name>
<description>Exchange Name.</description>
<optional>true</optional>
<description>Password for RabbitMQ authentication.</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>portId</name>
<description>Exchange Name.</description>
<name>queueName</name>
<description>Name of the queue. Main reason to specify is to facilitate parallel consuming. Default is a random queue name..</description>
<optional>true</optional>
<type>int32</type>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>routingKey</name>
<description>Exchange Name.</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>-1</cardinality>
</parameter>
<parameter>
<name>routingKeyAttribute</name>
<description>Name of the attribute for the routing_key. Default is "routing_key".</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>username</name>
<description>Exchange Name.</description>
<description>Username for RabbitMQ authentication.</description>
<optional>false</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>virtualHost</name>
<description>Set Virtual Host. Default is null.</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*******************************************************************************
* Copyright (C) 2015, International Business Machines Corporation
* All Rights Reserved
*******************************************************************************/
package com.ibm.streamsx.messaging.rabbitmq;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Set;

import com.ibm.streams.operator.Attribute;
import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamSchema;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.types.Blob;
import com.ibm.streams.operator.types.ValueFactory;

//Helper to check if attributes have been specified explicitly
class AttributeHelper {
static final Charset CS = Charset.forName("UTF-8");
private boolean wasSet = false, isAvailable = false;
private MetaType mType = null;
private String name = null;
private boolean isString = false;

AttributeHelper(String n) {
this.name = n;
}

boolean isWasSet() {
return wasSet;
}

boolean isAvailable() {
return isAvailable;
}

String getName() {
return name;
}

void setName(String name) {
this.name = name;
wasSet = true;
}

void initialize(StreamSchema ss, boolean required, Set<MetaType> supportedTypes) throws Exception {
Attribute a = ss.getAttribute(name);
if(a == null) {
if(wasSet)
throw new IllegalArgumentException("Attribute \"" + name + "\" not available.");
if(required)
throw new IllegalArgumentException("Attribute not found for \"" + name + "\".");
return;
}
this.mType = a.getType().getMetaType();
isString = mType == MetaType.RSTRING || mType == MetaType.USTRING;

if(!supportedTypes.contains(mType)){
throw new Exception("Attribute \"" + name + "\" must be one of: " + supportedTypes);
}
isAvailable = true;
}
boolean isString() {
return isString;
}

void setValue(OutputTuple otup, String value) {
if(!isAvailable) return;
if(isString) {
if (value == null)
value = "";
otup.setString(name, value);
} else
otup.setBlob(name, ValueFactory.newBlob(value.getBytes(CS)));
}
void setValue(OutputTuple otup, byte[] value) {
if(!isAvailable) return;
if(isString) {
if (value == null)
otup.setString(name,"");
else
otup.setString(name, new String(value, CS));
}
else
otup.setBlob(name, ValueFactory.newBlob(value));
}

String getString(Tuple tuple) throws IOException {
if(!isAvailable) return null;
if(isString)
return tuple.getString(name);
return new String(getBytesFromBlob(tuple, name));
}
byte[] getBytes(Tuple tuple) throws IOException {
if(!isAvailable) return null;
if(isString)
return tuple.getString(name).getBytes(CS);
return getBytesFromBlob(tuple, name);
}
private static byte[] getBytesFromBlob(Tuple tuple, String name) throws IOException {
Blob blockMsg = tuple.getBlob(name);
InputStream inputStream = blockMsg.getInputStream();
int length = (int) blockMsg.getLength();
byte[] byteArray = new byte[length];
inputStream.read(byteArray, 0, length);
return byteArray;
}
}
Loading

0 comments on commit bec2e71

Please sign in to comment.