Skip to content

Commit

Permalink
Merge pull request #105 from Cinimex-Informatica/develop
Browse files Browse the repository at this point in the history
Release 0.2.1-rc has been tested. No bugs have been found. Merging into the master branch to release 0.2.1-alfa.
  • Loading branch information
sberdyshev committed Mar 29, 2019
2 parents d070dcd + 0caec3d commit d0ee2d4
Show file tree
Hide file tree
Showing 19 changed files with 953 additions and 591 deletions.
475 changes: 379 additions & 96 deletions README.md

Large diffs are not rendered by default.

Binary file added docs/images/metric_naming_example_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/metric_naming_example_2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 4 additions & 3 deletions src/main/java/ru/cinimex/exporter/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;

/**
* Class is used for parsing config file.
Expand Down Expand Up @@ -81,11 +82,11 @@ public boolean usePCFWildcards() {
return usePCFWildcards;
}

public ArrayList<String> getChannels() {
public List<String> getChannels() {
return channels;
}

public ArrayList<String> getListeners() {
public List<String> getListeners() {
return listeners;
}

Expand Down Expand Up @@ -121,7 +122,7 @@ public String getEndpURL() {
return endpURL;
}

public ArrayList<String> getQueues() {
public List<String> getQueues() {
return queues;
}

Expand Down
21 changes: 11 additions & 10 deletions src/main/java/ru/cinimex/exporter/ExporterLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
* Main class of mq exporter tool. Parses config, scans topics, starts subscribers.
*/
public class ExporterLauncher {
private static final Logger logger = LogManager.getLogger(ExporterLauncher.class);
private static final String topicString = "$SYS/MQ/INFO/QMGR/%s/Monitor/METADATA/CLASSES";
private static final int getMsgOpt = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG | MQConstants.MQGMO_SYNCPOINT;
private static final String TOPIC_STRING = "$SYS/MQ/INFO/QMGR/%s/Monitor/METADATA/CLASSES";
private static final int GMO = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_COMPLETE_MSG | MQConstants.MQGMO_SYNCPOINT;

public static void main(String[] args) {
if (args.length == 0) {
Expand All @@ -42,19 +43,19 @@ public static void main(String[] args) {
ArrayList<MQObject.MQType> monitoringTypes = new ArrayList<>();
ArrayList<MQObject> objects = new ArrayList<>();

if (config.getQueues() != null && config.getQueues().size() > 0) {
if (config.getQueues() != null && !config.getQueues().isEmpty()) {
monitoringTypes.add(MQObject.MQType.QUEUE);
for (String queueName : config.getQueues()) {
objects.add(new MQObject(queueName, MQObject.MQType.QUEUE));
}
}
if (config.getChannels() != null && config.getChannels().size() > 0) {
if (config.getChannels() != null && !config.getChannels().isEmpty()) {
monitoringTypes.add(MQObject.MQType.CHANNEL);
for (String channelName : config.getChannels()) {
objects.add(new MQObject(channelName, MQObject.MQType.CHANNEL));
}
}
if (config.getListeners() != null && config.getListeners().size() > 0) {
if (config.getListeners() != null && !config.getListeners().isEmpty()) {
monitoringTypes.add(MQObject.MQType.LISTENER);
for (String listenerName : config.getListeners()) {
objects.add(new MQObject(listenerName, MQObject.MQType.LISTENER));
Expand All @@ -80,23 +81,23 @@ public static void main(String[] args) {
private static ArrayList<PCFElement> getAllPublishedMetrics(Config config) {
MQConnection connection = new MQConnection();
MQTopic topic = null;
ArrayList<PCFElement> elements = new ArrayList<PCFElement>();
ArrayList<PCFElement> elements = new ArrayList<>();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = getMsgOpt;
gmo.options = GMO;
gmo.waitInterval = 30000;
try {
connection.establish(config.getQmgrHost(), config.getQmgrPort(), config.getQmgrChannel(), config.getQmgrName(), config.getUser(), config.getPassword(), config.useMqscp());
topic = connection.createTopic(String.format(topicString, config.getQmgrName()));
topic = connection.createTopic(String.format(TOPIC_STRING, config.getQmgrName()));
MQMessage msg = getEmptyMessage();
topic.get(msg, gmo);
PCFMessage pcfResponse = new PCFMessage(msg);
ArrayList<PCFClass> classes = PCFDataParser.getPCFClasses(pcfResponse);
List<PCFClass> classes = PCFDataParser.getPCFClasses(pcfResponse);
for (PCFClass pcfClass : classes) {
topic = connection.createTopic(pcfClass.getTopicString());
msg = getEmptyMessage();
topic.get(msg, gmo);
pcfResponse = new PCFMessage(msg);
ArrayList<PCFType> types = PCFDataParser.getPCFTypes(pcfResponse);
List<PCFType> types = PCFDataParser.getPCFTypes(pcfResponse);
for (PCFType type : types) {
topic = connection.createTopic(type.getTopicString());
msg = getEmptyMessage();
Expand Down
13 changes: 2 additions & 11 deletions src/main/java/ru/cinimex/exporter/mq/MQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,8 @@
*/
public class MQConnection {
private static final Logger logger = LogManager.getLogger(MQConnection.class);
private Hashtable<String, Object> connectionProperties;
private MQQueueManager queueManager;

/**
* Default constructor.
*/
public MQConnection() {

}

/**
* Method creates connection properties Hashtable from connection parameters.
*
Expand All @@ -37,7 +29,7 @@ public MQConnection() {
* @return - returns prepared structure with all parameters transformed into queue manager's format.
*/
protected static Hashtable<String, Object> createMQConnectionParams(String host, int port, String channel, String user, String password, boolean useMQCSP) {
Hashtable<String, Object> properties = new Hashtable<String, Object>();
Hashtable<String, Object> properties = new Hashtable<>();
properties.put(MQConstants.TRANSPORT_PROPERTY, host == null ? MQConstants.TRANSPORT_MQSERIES_BINDINGS : MQConstants.TRANSPORT_MQSERIES_CLIENT);
if (host != null) properties.put(MQConstants.HOST_NAME_PROPERTY, host);
if (port != 0) properties.put(MQConstants.PORT_PROPERTY, port);
Expand All @@ -62,11 +54,10 @@ protected static Hashtable<String, Object> createMQConnectionParams(String host,
* @param useMQCSP - flag, which indicates, if MQCSP auth should be used.
*/
public void establish(String host, int port, String channel, String qmName, String user, String password, boolean useMQCSP) throws MQException {
connectionProperties = createMQConnectionParams(host, port, channel, user, password, useMQCSP);
Hashtable<String, Object> connectionProperties = createMQConnectionParams(host, port, channel, user, password, useMQCSP);
queueManager = new MQQueueManager(qmName, connectionProperties);
}


/**
* Method establishes connection with queue manager.
*
Expand Down
34 changes: 16 additions & 18 deletions src/main/java/ru/cinimex/exporter/mq/MQObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public class MQObject {
private static final Logger logger = LogManager.getLogger(MQObject.class);
private String name;
private MQType type;
private PCFMessage PCFCmd;
private int PCFHeader;
private PCFMessage pcfCmd;
private int pcfHeader;

/**
* MQObject constructor.
Expand All @@ -29,26 +29,24 @@ public MQObject(String name, MQType type) {
*/
switch (type) {
case QUEUE:
PCFCmd = new PCFMessage(MQConstants.MQCMD_INQUIRE_Q); //if object type is queue, exporter would inquire it.
PCFCmd.addParameter(MQConstants.MQCA_Q_NAME, name); //PCF command would try to retrieve statistics about queue with specific name
PCFCmd.addParameter(MQConstants.MQIA_Q_TYPE, MQConstants.MQQT_LOCAL); // and specific type
PCFHeader = MQConstants.MQIA_MAX_Q_DEPTH; //the only statistics we want to know about queue is it's max depth.
pcfCmd = new PCFMessage(MQConstants.MQCMD_INQUIRE_Q); //if object type is queue, exporter would inquire it.
pcfCmd.addParameter(MQConstants.MQCA_Q_NAME, name); //PCF command would try to retrieve statistics about queue with specific name
pcfCmd.addParameter(MQConstants.MQIA_Q_TYPE, MQConstants.MQQT_LOCAL); // and specific type
pcfHeader = MQConstants.MQIA_MAX_Q_DEPTH; //the only statistics we want to know about queue is it's max depth.
break;
case LISTENER:
PCFCmd = new PCFMessage(MQConstants.MQCMD_INQUIRE_LISTENER_STATUS); //if object type is listener, exporter would inquire it.
PCFCmd.addParameter(MQConstants.MQCACH_LISTENER_NAME, name);//PCF command would try to retrieve statistics about listener with specific name
PCFHeader = MQConstants.MQIACH_LISTENER_STATUS;//the only statistics we want to know about listener is it's status.
pcfCmd = new PCFMessage(MQConstants.MQCMD_INQUIRE_LISTENER_STATUS); //if object type is listener, exporter would inquire it.
pcfCmd.addParameter(MQConstants.MQCACH_LISTENER_NAME, name);//PCF command would try to retrieve statistics about listener with specific name
pcfHeader = MQConstants.MQIACH_LISTENER_STATUS;//the only statistics we want to know about listener is it's status.
break;
case CHANNEL:
PCFCmd = new PCFMessage(MQConstants.MQCMD_INQUIRE_CHANNEL_STATUS); //if object type is channel, exporter would inquire it.
PCFCmd.addParameter(MQConstants.MQCACH_CHANNEL_NAME, name); //PCF command would try to retrieve statistics about channel with specific name
PCFHeader = MQConstants.MQIACH_CHANNEL_STATUS;//the only statistics we want to know about channel is it's status.
pcfCmd = new PCFMessage(MQConstants.MQCMD_INQUIRE_CHANNEL_STATUS); //if object type is channel, exporter would inquire it.
pcfCmd.addParameter(MQConstants.MQCACH_CHANNEL_NAME, name); //PCF command would try to retrieve statistics about channel with specific name
pcfHeader = MQConstants.MQIACH_CHANNEL_STATUS;//the only statistics we want to know about channel is it's status.
break;
default: {
default:
logger.error("Unknown type for MQObject: {}", type.name());
throw new RuntimeException("Unable to create new MQObject. Received unexpected MQObject type: " + type.name());
}

}
}

Expand Down Expand Up @@ -90,7 +88,7 @@ public String getName() {
* @return - MQConstant integer code.
*/
public int getPCFHeader() {
return PCFHeader;
return pcfHeader;
}

/**
Expand All @@ -107,8 +105,8 @@ public MQType getType() {
*
* @return - prepared PCF command object.
*/
public PCFMessage getPCFCmd() {
return PCFCmd;
public PCFMessage getPcfCmd() {
return pcfCmd;
}

/**
Expand Down
57 changes: 34 additions & 23 deletions src/main/java/ru/cinimex/exporter/mq/MQPCFSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

/**
* MQPCFSubscriber is technically not a subscriber, but a runnable object, which sends PCFCommands every n seconds to
Expand All @@ -24,7 +25,7 @@ public class MQPCFSubscriber implements Runnable {
private String queueManagerName;
private MQObject object;
private PCFMessageAgent agent;
private ArrayList<MQObject> objects;
private List<MQObject> objects;

/**
* MQPCFSubscriber constructor which is used, when exporter is configured to use 1 MQPCFSubscriber per 1 MQObject.
Expand All @@ -43,9 +44,9 @@ public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connec
*
* @param queueManagerName - queue manager name.
* @param connectionProperties - connection properties.
* @param objects - Array with all MQObjects.
* @param objects - List with all MQObjects.
*/
public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connectionProperties, ArrayList<MQObject> objects) {
public MQPCFSubscriber(String queueManagerName, Hashtable<String, Object> connectionProperties, List<MQObject> objects) {
establishMQConnection(queueManagerName, connectionProperties);
this.objects = objects;
this.object = new MQObject("*", objects.get(0).getType());
Expand Down Expand Up @@ -90,39 +91,45 @@ private void updateMetricWithoutWildcards(PCFMessage response, String objectName
private void updateMetricsWithWildcards(PCFMessage[] pcfResponse) {
ArrayList<String> objectNames = new ArrayList<>();
//copy all objects names to temporary array
for (MQObject object : objects) {
objectNames.add(object.getName());
for (MQObject monitoredObject : objects) {
objectNames.add(monitoredObject.getName());
}
for (PCFMessage response : pcfResponse) {
String objectName = (String) response.getParameterValue(MQObject.objectNameCode(object.getType()));
objectName = objectName.trim();
//if temporary array contains metric, then remove it from temporary array and update metric
if (objectNames.contains(objectName)) {
objectNames.remove(objectName);
Object result = response.getParameterValue(object.getPCFHeader());
double prometheusValue = MetricsReference.getMetricValue(object.getType(), (Integer) result);
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), prometheusValue, queueManagerName, objectName);
updateMetricWithoutWildcards(response, objectName);
}
}

//There are some objects in temporary array? It means that "*" wildcard didn't return all values.
//Are there any objects left in temporary array? It means that "*" wildcard didn't return all values.
//There are multiple reasons why it could happen. For example, MQ channel has status "inactive".
//Then we send direct PCF command for specific object. If some error occurs, we have custom processing for it.
if (objectNames.size() > 0) {
for (String objectName : objectNames) {
MQObject directObject = new MQObject(objectName, object.getType());
try {
PCFMessage[] directPCFResponse = agent.send(directObject.getPCFCmd());
updateMetricWithoutWildcards(directPCFResponse[0], objectName);
} catch (PCFException e) {
//This error means, that channel has status "inactive".
updateWithDirectPCFCommand(objectNames);
}

/**
* Retrieves info about all objects from input array via direct pcf commands.
*
* @param objectNames - input array with objects.
*/
private void updateWithDirectPCFCommand(ArrayList<String> objectNames) {
for (String objectName : objectNames) {
MQObject directObject = new MQObject(objectName, object.getType());
try {
PCFMessage[] directPCFResponse = agent.send(directObject.getPcfCmd());
updateMetricWithoutWildcards(directPCFResponse[0], objectName);
} catch (PCFException e) {
//This error means, that channel has status "inactive".
if (e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
logger.warn("Channel {} is possibly inactive.", objectName);
if (e.reasonCode == MQConstants.MQRCCF_CHL_STATUS_NOT_FOUND) {
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, objectName);
}
} catch (IOException | MQException e) {
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, objectName);
} else {
logger.error("Error occurred during sending PCF command: ", e);
}
} catch (IOException | MQException e) {
logger.error("Error occurred during sending PCF command: ", e);
}
}
}
Expand All @@ -131,7 +138,7 @@ private void updateMetricsWithWildcards(PCFMessage[] pcfResponse) {
public void run() {
try {
logger.debug("Sending PCF command for object type {} with name {}...", object.getType(), object.getName());
PCFMessage[] pcfResponse = agent.send(object.getPCFCmd());
PCFMessage[] pcfResponse = agent.send(object.getPcfCmd());
if (!objects.isEmpty()) {
updateMetricsWithWildcards(pcfResponse);
} else {
Expand All @@ -145,6 +152,10 @@ public void run() {
logger.warn("Channel {} is possibly inactive.", object.getName());
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQCHS_INACTIVE), queueManagerName, object.getName());
}
if (object.getType() == MQObject.MQType.LISTENER && e.reasonCode == MQConstants.MQRC_UNKNOWN_OBJECT_NAME) {
MetricsManager.updateMetric(MetricsReference.getMetricName(object.getType()), MetricsReference.getMetricValue(object.getType(), MQConstants.MQSVC_STATUS_STOPPED), queueManagerName, object.getName());
logger.warn("Listener {} is possibly stopped.", object.getName());
}
} catch (MQException | IOException e) {
logger.error("Error occurred during sending PCF command: ", e);
}
Expand Down
Loading

0 comments on commit d0ee2d4

Please sign in to comment.