Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring in changes on data governance for messaging toolkit #162

Merged
merged 4 commits into from
Nov 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.ibm.streamsx.messaging.common;

import java.util.HashMap;
import java.util.Map;

import com.ibm.streams.operator.AbstractOperator;

public class DataGovernanceUtil {

public static void registerForDataGovernance(AbstractOperator operator, String assetName, String assetType, String parentAssetName, String parentAssetType, boolean isInput, String operatorType) {
Map<String, String> properties = new HashMap<String, String>();
if(isInput) {
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_INPUT);
properties.put(IGovernanceConstants.PROPERTY_INPUT_OPERATOR_TYPE, operatorType);
} else {
properties.put(IGovernanceConstants.TAG_REGISTER_TYPE, IGovernanceConstants.TAG_REGISTER_TYPE_OUTPUT);
properties.put(IGovernanceConstants.PROPERTY_OUTPUT_OPERATOR_TYPE, operatorType);
}
properties.put(IGovernanceConstants.PROPERTY_SRC_NAME, assetName);
properties.put(IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_STREAMS_PREFIX + assetType);
if(parentAssetName != null) {
properties.put(IGovernanceConstants.PROPERTY_SRC_PARENT_PREFIX, IGovernanceConstants.PROPERTY_PARENT_PREFIX);
properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_SRC_NAME, parentAssetName);
properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_SRC_TYPE, IGovernanceConstants.ASSET_STREAMS_PREFIX + parentAssetType);
properties.put(IGovernanceConstants.PROPERTY_PARENT_PREFIX + IGovernanceConstants.PROPERTY_PARENT_TYPE, "$" + parentAssetType);
}

operator.setTagData(IGovernanceConstants.TAG_OPERATOR_IGC, properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.ibm.streamsx.messaging.common;

public interface IGovernanceConstants {
public static final String TAG_OPERATOR_IGC = "OperatorIGC";
public static final String TAG_REGISTER_TYPE = "registerType";
public static final String TAG_REGISTER_TYPE_INPUT = "input";
public static final String TAG_REGISTER_TYPE_OUTPUT = "output";

public static final String ASSET_STREAMS_PREFIX = "$Streams-";

public static final String ASSET_JMS_SERVER_TYPE = "JMSServer";

public static final String ASSET_JMS_MESSAGE_TYPE = "JMS";

public static final String ASSET_KAFKA_TOPIC_TYPE = "KafkaTopic";

public static final String ASSET_MQTT_TOPIC_TYPE = "MQTT";
public static final String ASSET_MQTT_SERVER_TYPE = "MQServer";

public static final String PROPERTY_SRC_NAME = "srcName";
public static final String PROPERTY_SRC_TYPE = "srcType";

public static final String PROPERTY_SRC_PARENT_PREFIX = "srcParent";
public static final String PROPERTY_PARENT_TYPE = "parentType";

public static final String PROPERTY_PARENT_PREFIX = "p1";

public static final String PROPERTY_INPUT_OPERATOR_TYPE = "inputOperatorType";
public static final String PROPERTY_OUTPUT_OPERATOR_TYPE = "outputOperatorType";

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.logging.LogLevel;
import com.ibm.streams.operator.logging.LoggerNames;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streams.operator.model.CustomMetric;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;


//The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic.
Expand Down Expand Up @@ -553,6 +556,16 @@ public synchronized void initialize(OperatorContext context)
throw new RuntimeException("No valid message class is specified.");
}

// register for data governance
registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination());

}

private void registerForDataGovernance(String providerURL, String destination) {
logger.log(TraceLevel.INFO, "JMSSink - Registering for data governance with providerURL: " + providerURL
+ " destination: " + destination);
DataGovernanceUtil.registerForDataGovernance(this, destination, IGovernanceConstants.ASSET_JMS_MESSAGE_TYPE,
providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, false, "JMSSink");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.OutputTuple;
import com.ibm.streams.operator.StreamSchema;
import com.ibm.streams.operator.StreamingOutput;
import com.ibm.streams.operator.Type;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.logging.LogLevel;
import com.ibm.streams.operator.logging.LoggerNames;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streams.operator.model.CustomMetric;
import com.ibm.streams.operator.model.Parameter;
Expand All @@ -38,6 +39,8 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streams.operator.types.RString;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

//The JMSSource operator converts a message JMS queue or topic to stream
public class JMSSource extends ProcessTupleProducer implements StateHandler{
Expand Down Expand Up @@ -551,6 +554,17 @@ public synchronized void initialize(OperatorContext context)
default:
throw new RuntimeException("No valid message class is specified.");
}

// register for data governance
registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination());

}

private void registerForDataGovernance(String providerURL, String destination) {
logger.log(TraceLevel.INFO, "JMSSource - Registering for data governance with providerURL: " + providerURL
+ " destination: " + destination);
DataGovernanceUtil.registerForDataGovernance(this, destination, IGovernanceConstants.ASSET_JMS_MESSAGE_TYPE,
providerURL, IGovernanceConstants.ASSET_JMS_SERVER_TYPE, true, "JMSSource");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

@InputPorts(@InputPortSet(cardinality=1, optional=false,
description="The tuples arriving on this port are expected to contain three attributes \\\"key\\\", \\\"topic\\\" and \\\"message\\\". " +
Expand Down Expand Up @@ -81,6 +83,24 @@ public void initialize(OperatorContext context)
//TODO: check for minimum properties
trace.log(TraceLevel.INFO, "Initializing producer");
client.initProducer();

// register for data governance
// only register user specified topic in param
registerForDataGovernance();
}

private void registerForDataGovernance() {
trace.log(TraceLevel.INFO, "KafkaSink -- Registering for data governance");

if (!topics.isEmpty()) {
for (String topic : topics) {
trace.log(TraceLevel.INFO, OPER_NAME + " -- data governance - topic to register: " + topic);
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_KAFKA_TOPIC_TYPE,
null, null, false, "KafkaSink");
}
} else {
trace.log(TraceLevel.INFO, "KafkaSink -- Registering for data governance -- topics is empty");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@


import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;

import com.ibm.streams.operator.OperatorContext;
import java.util.List;
import java.util.logging.Logger;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.OperatorContext.ContextCheck;
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.model.Icons;
import com.ibm.streams.operator.model.OutputPortSet;
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.model.OutputPortSet;
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

@OutputPorts(@OutputPortSet(cardinality=1, optional=false,
description="Messages received from Kafka are sent on this output port."))
Expand Down Expand Up @@ -100,6 +102,23 @@ public void initialize(OperatorContext context)
}

}

// register for data governance
registerForDataGovernance();

}

private void registerForDataGovernance() {
trace.log(TraceLevel.INFO, "KafkaSource - Registering for data governance");
if (topics != null) {
for (String topic : topics) {
trace.log(TraceLevel.INFO, "KafkaSource - data governance - topic: " + topic);
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_KAFKA_TOPIC_TYPE,
null, null, true, "KafkaSource");
}
} else {
trace.log(TraceLevel.INFO, "KafkaSource - Registering for data governance -- topics is empty");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streams.operator.types.Blob;
import com.ibm.streams.operator.types.RString;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;

/**
* Class for an operator that consumes tuples and does not produce an output stream.
Expand Down Expand Up @@ -461,7 +463,44 @@ public synchronized void initialize(OperatorContext context)
initRelaunching(context);
// do not connect here... connection is done on the publish thread when a message
// is ready to be published
}

// register for data governance
// if static topic, then register topic, else only register the server
if (topicAttributeName == null) {
registerForDataGovernance();
} else {
// register the "server" for governance
registerServerForDataGovernance();
}
}

private void registerForDataGovernance() {
String uri = getServerUri();
String topic = getTopics();
TRACE.log(TraceLevel.INFO,
"MQTTSink - Registering for data governance with server uri: " + uri + " and topic: " + topic);

if (topic != null && !topic.isEmpty() && uri != null && !uri.isEmpty()) {
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_MQTT_TOPIC_TYPE, uri,
IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, false, "MQTTSink");
} else {
TRACE.log(TraceLevel.INFO,
"MQTTSink - Registering for data governance -- aborted. topic and/or url is null");
}
}

private void registerServerForDataGovernance() {
String uri = getServerUri();
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering only server for data governance with server uri: " + uri);

if (uri != null && !uri.isEmpty()) {
DataGovernanceUtil.registerForDataGovernance(this, uri, IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, null,
null, false, "MQTTSink");
} else {
TRACE.log(TraceLevel.INFO,
"MQTTSource - Registering only server for data governance -- aborted. uri is null");
}
}

/**
* Notification that initialization is complete and all input and output ports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.types.RString;
import com.ibm.streams.operator.types.ValueFactory;
import com.ibm.streamsx.messaging.common.DataGovernanceUtil;
import com.ibm.streamsx.messaging.common.IGovernanceConstants;
import com.ibm.streamsx.messaging.mqtt.MqttClientRequest.MqttClientRequestType;

/**
Expand Down Expand Up @@ -306,6 +308,9 @@ public synchronized void initialize(OperatorContext context)
mqttWrapper.setClientID(getClientID());
mqttWrapper.setCommandTimeout(getCommandTimeout());
mqttWrapper.setKeepAliveInterval(getKeepAliveInterval());

// register for data governance
registerForDataGovernance();

/*
* Create the thread for producing tuples.
Expand Down Expand Up @@ -356,6 +361,21 @@ public void run() {
clientRequestThread.setDaemon(true);
}

private void registerForDataGovernance() {
String uri = getServerUri();
List<String> topics = getTopics();
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance with server uri: " + uri + " and topics: " + topics.toArray().toString());

if(topics != null && uri != null && !uri.isEmpty()) {
for (String topic : topics) {
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance with server uri: " + uri + " and topic: " + topic);
DataGovernanceUtil.registerForDataGovernance(this, topic, IGovernanceConstants.ASSET_MQTT_TOPIC_TYPE, uri, IGovernanceConstants.ASSET_MQTT_SERVER_TYPE, true, "MQTTSource");
}
} else {
TRACE.log(TraceLevel.INFO, "MQTTSource - Registering for data governance -- aborted. topic and/or uri is null");
}
}

protected void handleClientRequests() {
while (!shutdown)
{
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ The &lt;attribute&gt; element has three possible attributes:
* xml
</info:description>
<info:version>3.0.0</info:version>
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
<info:requiredProductVersion>4.1.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
<info:resources>
Expand Down