diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/DataGovernanceUtil.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/DataGovernanceUtil.java new file mode 100644 index 0000000..98fcc03 --- /dev/null +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/DataGovernanceUtil.java @@ -0,0 +1,30 @@ +package com.ibm.streamsx.messaging.common; + +import java.util.HashMap; +import java.util.Map; + +import com.ibm.streams.operator.AbstractOperator; + +public class DataGovernanceUtil { + + public static void registerForDataGovernance(AbstractOperator operator, String assetName, String assetType, String parentAssetName, String parentAssetType, boolean isInput, String operatorType) { + Map properties = new HashMap(); + if(isInput) { + properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT); + properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, operatorType); + } else { + properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT); + properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, operatorType); + } + properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, assetName); + properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_STREAMS_PREFIX + assetType); + if(parentAssetName != null) { + properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, IGovernanceConstants.PROPERTY_PARENT_PREFIX); + properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_SRC_NAME, parentAssetName); + properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_STREAMS_PREFIX + parentAssetType); + properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_PARENT_TYPE, "$" + parentAssetType); + } + + operator.setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties); + } +} diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/IGovernanceConstants.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/IGovernanceConstants.java new file mode 100644 index 0000000..c797546 --- /dev/null +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/IGovernanceConstants.java @@ -0,0 +1,31 @@ +package com.ibm.streamsx.messaging.common; + +public interface IGovernanceConstants { + public static final String TAG_OPERATOR_IGC = "OperatorIGC"; + public static final String TAG_REGISTER_TYPE = "registerType"; + public static final String TAG_REGISTER_TYPE_INPUT = "input"; + public static final String TAG_REGISTER_TYPE_OUTPUT = "output"; + + public static final String ASSET_STREAMS_PREFIX = "$Streams-"; + + public static final String ASSET_JMS_SERVER_TYPE = "JMSServer"; + + public static final String ASSET_JMS_MESSAGE_TYPE = "JMS"; + + public static final String ASSET_KAFKA_TOPIC_TYPE = "KafkaTopic"; + + public static final String ASSET_MQTT_TOPIC_TYPE = "MQTT"; + public static final String ASSET_MQTT_SERVER_TYPE = "MQServer"; + + public static final String PROPERTY_SRC_NAME = "srcName"; + public static final String PROPERTY_SRC_TYPE = "srcType"; + + public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent"; + public static final String PROPERTY_PARENT_TYPE = "parentType"; + + public static final String PROPERTY_PARENT_PREFIX = "p1"; + + public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType"; + public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType"; + +} diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 507d00c..652865f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -29,12 +29,15 @@ import com.ibm.streams.operator.compile.OperatorContextChecker; import com.ibm.streams.operator.logging.LogLevel; import com.ibm.streams.operator.logging.LoggerNames; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.metrics.Metric; import com.ibm.streams.operator.model.CustomMetric; import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.state.Checkpoint; import com.ibm.streams.operator.state.ConsistentRegionContext; import com.ibm.streams.operator.state.StateHandler; +import com.ibm.streamsx.messaging.common.DataGovernanceUtil; +import com.ibm.streamsx.messaging.common.IGovernanceConstants; //The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic. @@ -553,6 +556,16 @@ public synchronized void initialize(OperatorContext context) throw new RuntimeException("No valid message class is specified."); } + // register for data governance + registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination()); + + } + + private void registerForDataGovernance(String providerURL, String destination) { + logger.log(TraceLevel.INFO, "JMSSink - Registering for data governance with providerURL: " + providerURL + + " destination: " + destination); + DataGovernanceUtil.registerForDataGovernance(this, destination, IGovernanceConstants.ASSET_JMS_MESSAGE_TYPE, + providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, false, "JMSSink"); } @Override diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index 7a7bc9e..1ea3c98 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -22,14 +22,15 @@ import com.ibm.streams.operator.OperatorContext; import com.ibm.streams.operator.OperatorContext.ContextCheck; -import com.ibm.streams.operator.Type.MetaType; import com.ibm.streams.operator.OutputTuple; import com.ibm.streams.operator.StreamSchema; import com.ibm.streams.operator.StreamingOutput; import com.ibm.streams.operator.Type; +import com.ibm.streams.operator.Type.MetaType; import com.ibm.streams.operator.compile.OperatorContextChecker; import com.ibm.streams.operator.logging.LogLevel; import com.ibm.streams.operator.logging.LoggerNames; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.metrics.Metric; import com.ibm.streams.operator.model.CustomMetric; import com.ibm.streams.operator.model.Parameter; @@ -38,6 +39,8 @@ import com.ibm.streams.operator.state.ConsistentRegionContext; import com.ibm.streams.operator.state.StateHandler; import com.ibm.streams.operator.types.RString; +import com.ibm.streamsx.messaging.common.DataGovernanceUtil; +import com.ibm.streamsx.messaging.common.IGovernanceConstants; //The JMSSource operator converts a message JMS queue or topic to stream public class JMSSource extends ProcessTupleProducer implements StateHandler{ @@ -551,6 +554,17 @@ public synchronized void initialize(OperatorContext context) default: throw new RuntimeException("No valid message class is specified."); } + + // register for data governance + registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination()); + + } + + private void registerForDataGovernance(String providerURL, String destination) { + logger.log(TraceLevel.INFO, "JMSSource - Registering for data governance with providerURL: " + providerURL + + " destination: " + destination); + DataGovernanceUtil.registerForDataGovernance(this, destination, IGovernanceConstants.ASSET_JMS_MESSAGE_TYPE, + providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, true, "JMSSource"); } @Override diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java index d0f33a0..ec5934c 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java @@ -20,6 +20,8 @@ import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.model.PrimitiveOperator; import com.ibm.streams.operator.state.ConsistentRegionContext; +import com.ibm.streamsx.messaging.common.DataGovernanceUtil; +import com.ibm.streamsx.messaging.common.IGovernanceConstants; @InputPorts(@InputPortSet(cardinality=1, optional=false, description="The tuples arriving on this port are expected to contain three attributes \\\"key\\\", \\\"topic\\\" and \\\"message\\\". " + @@ -81,6 +83,24 @@ public void initialize(OperatorContext context) //TODO: check for minimum properties trace.log(TraceLevel.INFO, "Initializing producer"); client.initProducer(); + + // register for data governance + // only register user specified topic in param + registerForDataGovernance(); + } + + private void registerForDataGovernance() { + trace.log(TraceLevel.INFO, "KafkaSink -- Registering for data governance"); + + if (!topics.isEmpty()) { + for (String topic : topics) { + trace.log(TraceLevel.INFO, OPER_NAME + " -- data governance - topic to register: " + topic); + DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_KAFKA_TOPIC_TYPE, + null, null, false, "KafkaSink"); + } + } else { + trace.log(TraceLevel.INFO, "KafkaSink -- Registering for data governance -- topics is empty"); + } } @Override diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java index 1c4bef4..7fd4a94 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java @@ -7,21 +7,23 @@ import java.io.IOException; -import java.util.List; -import java.util.logging.Logger; - -import com.ibm.streams.operator.OperatorContext; +import java.util.List; +import java.util.logging.Logger; + +import com.ibm.streams.operator.OperatorContext; import com.ibm.streams.operator.OperatorContext.ContextCheck; import com.ibm.streams.operator.compile.OperatorContextChecker; -import com.ibm.streams.operator.logging.TraceLevel; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.model.Icons; -import com.ibm.streams.operator.model.OutputPortSet; -import com.ibm.streams.operator.model.OutputPorts; -import com.ibm.streams.operator.model.Parameter; -import com.ibm.streams.operator.model.PrimitiveOperator; +import com.ibm.streams.operator.model.OutputPortSet; +import com.ibm.streams.operator.model.OutputPorts; +import com.ibm.streams.operator.model.Parameter; +import com.ibm.streams.operator.model.PrimitiveOperator; import com.ibm.streams.operator.state.Checkpoint; import com.ibm.streams.operator.state.ConsistentRegionContext; import com.ibm.streams.operator.state.StateHandler; +import com.ibm.streamsx.messaging.common.DataGovernanceUtil; +import com.ibm.streamsx.messaging.common.IGovernanceConstants; @OutputPorts(@OutputPortSet(cardinality=1, optional=false, description="Messages received from Kafka are sent on this output port.")) @@ -100,6 +102,23 @@ public void initialize(OperatorContext context) } } + + // register for data governance + registerForDataGovernance(); + + } + + private void registerForDataGovernance() { + trace.log(TraceLevel.INFO, "KafkaSource - Registering for data governance"); + if (topics != null) { + for (String topic : topics) { + trace.log(TraceLevel.INFO, "KafkaSource - data governance - topic: " + topic); + DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_KAFKA_TOPIC_TYPE, + null, null, true, "KafkaSource"); + } + } else { + trace.log(TraceLevel.INFO, "KafkaSource - Registering for data governance -- topics is empty"); + } } @Override diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java index 1755044..594e378 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java @@ -50,6 +50,8 @@ import com.ibm.streams.operator.state.StateHandler; import com.ibm.streams.operator.types.Blob; import com.ibm.streams.operator.types.RString; +import com.ibm.streamsx.messaging.common.DataGovernanceUtil; +import com.ibm.streamsx.messaging.common.IGovernanceConstants; /** * Class for an operator that consumes tuples and does not produce an output stream. @@ -461,7 +463,44 @@ public synchronized void initialize(OperatorContext context) initRelaunching(context); // do not connect here... connection is done on the publish thread when a message // is ready to be published - } + + // register for data governance + // if static topic, then register topic, else only register the server + if (topicAttributeName == null) { + registerForDataGovernance(); + } else { + // register the "server" for governance + registerServerForDataGovernance(); + } + } + + private void registerForDataGovernance() { + String uri = getServerUri(); + String topic = getTopics(); + TRACE.log(TraceLevel.INFO, + "MQTTSink - Registering for data governance with server uri: " + uri + " and topic: " + topic); + + if (topic != null && !topic.isEmpty() && uri != null && !uri.isEmpty()) { + DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_MQTT_TOPIC_TYPE, uri, + IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, false, "MQTTSink"); + } else { + TRACE.log(TraceLevel.INFO, + "MQTTSink - Registering for data governance -- aborted. topic and/or url is null"); + } + } + + private void registerServerForDataGovernance() { + String uri = getServerUri(); + TRACE.log(TraceLevel.INFO, "MQTTSource - Registering only server for data governance with server uri: " + uri); + + if (uri != null && !uri.isEmpty()) { + DataGovernanceUtil.registerForDataGovernance(this, uri, IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, null, + null, false, "MQTTSink"); + } else { + TRACE.log(TraceLevel.INFO, + "MQTTSource - Registering only server for data governance -- aborted. uri is null"); + } + } /** * Notification that initialization is complete and all input and output ports diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java index 1629db1..2a35c3a 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java @@ -47,6 +47,8 @@ import com.ibm.streams.operator.state.ConsistentRegionContext; import com.ibm.streams.operator.types.RString; import com.ibm.streams.operator.types.ValueFactory; +import com.ibm.streamsx.messaging.common.DataGovernanceUtil; +import com.ibm.streamsx.messaging.common.IGovernanceConstants; import com.ibm.streamsx.messaging.mqtt.MqttClientRequest.MqttClientRequestType; /** @@ -306,6 +308,9 @@ public synchronized void initialize(OperatorContext context) mqttWrapper.setClientID(getClientID()); mqttWrapper.setCommandTimeout(getCommandTimeout()); mqttWrapper.setKeepAliveInterval(getKeepAliveInterval()); + + // register for data governance + registerForDataGovernance(); /* * Create the thread for producing tuples. @@ -356,6 +361,21 @@ public void run() { clientRequestThread.setDaemon(true); } + private void registerForDataGovernance() { + String uri = getServerUri(); + List topics = getTopics(); + TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance with server uri: " + uri + " and topics: " + topics.toArray().toString()); + + if(topics != null && uri != null && !uri.isEmpty()) { + for (String topic : topics) { + TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance with server uri: " + uri + " and topic: " + topic); + DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_MQTT_TOPIC_TYPE, uri, IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, true, "MQTTSource"); + } + } else { + TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance -- aborted. topic and/or uri is null"); + } + } + protected void handleClientRequests() { while (!shutdown) { diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 3b2762e..b038f7f 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -681,7 +681,7 @@ The <attribute> element has three possible attributes: * xml 3.0.0 - 4.0.0.0 + 4.1.0.0