Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Moved DiagnosticsHandler into its own class (and moved ProbeHandler i…

…nto it)
  • Loading branch information...
commit 8d395b9304b4cb2d3901245e872c22ed5e192eb1 1 parent 1f84290
Bela Ban authored
273 src/org/jgroups/protocols/TP.java
View
@@ -7,6 +7,7 @@
import org.jgroups.conf.PropertyConverters;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
+import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;
import org.jgroups.util.ThreadFactory;
@@ -448,7 +449,7 @@ public String getTimerClass() {
protected Bundler bundler=null;
protected DiagnosticsHandler diag_handler=null;
- protected final List<ProbeHandler> preregistered_probe_handlers=new LinkedList<ProbeHandler>();
+ protected final List<DiagnosticsHandler.ProbeHandler> preregistered_probe_handlers=new LinkedList<DiagnosticsHandler.ProbeHandler>();
/**
* If singleton_name is enabled, this map is used to de-multiplex incoming messages according to their cluster
@@ -519,18 +520,28 @@ public void resetStats() {
num_oob_msgs_received=num_incoming_msgs_received=0;
}
- public void registerProbeHandler(ProbeHandler handler) {
+ public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
if(diag_handler != null)
diag_handler.registerProbeHandler(handler);
else
preregistered_probe_handlers.add(handler);
}
- public void unregisterProbeHandler(ProbeHandler handler) {
+ public void unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
if(diag_handler != null)
diag_handler.unregisterProbeHandler(handler);
}
+ /**
+ * Sets a {@link DiagnosticsHandler}. Should be set before the stack is started
+ * @param handler
+ */
+ public void setDiagnosticsHandler(DiagnosticsHandler handler) {
+ if(diag_handler != null)
+ diag_handler.stop();
+ diag_handler=handler;
+ }
+
public ThreadGroup getPoolThreadGroup() {
return pool_thread_group;
}
@@ -913,9 +924,52 @@ public void start() throws Exception {
throw new Exception("timer is null");
if(enable_diagnostics) {
- diag_handler=new DiagnosticsHandler();
+ if(diag_handler == null)
+ diag_handler=new DiagnosticsHandler(diagnostics_addr, diagnostics_port, log, getSocketFactory(),
+ getThreadFactory());
+
+ diag_handler.registerProbeHandler(new DiagnosticsHandler.ProbeHandler() {
+ public Map<String, String> handleProbe(String... keys) {
+ Map<String,String> retval=new HashMap<String,String>(2);
+ for(String key: keys) {
+ if(key.equals("dump")) {
+ retval.put("dump", Util.dumpThreads());
+ continue;
+ }
+ if(key.equals("uuids")) {
+ retval.put("uuids", printLogicalAddressCache());
+ if(!isSingleton() && !retval.containsKey("local_addr"))
+ retval.put("local_addr", local_addr != null? local_addr.toString() : null);
+ continue;
+ }
+ if(key.equals("keys")) {
+ StringBuilder sb=new StringBuilder();
+ for(DiagnosticsHandler.ProbeHandler handler: diag_handler.getProbeHandlers()) {
+ String[] tmp=handler.supportedKeys();
+ if(tmp != null && tmp.length > 0) {
+ for(String s: tmp)
+ sb.append(s).append(" ");
+ }
+ }
+ retval.put("keys", sb.toString());
+ }
+ if(key.equals("info")) {
+ if(singleton_name != null && singleton_name.length() > 0)
+ retval.put("singleton_name", singleton_name);
+
+ }
+ }
+ return retval;
+ }
+
+ public String[] supportedKeys() {
+ return new String[]{"dump", "keys", "uuids", "info"};
+ }
+ });
+
diag_handler.start();
- for(ProbeHandler handler: preregistered_probe_handlers)
+
+ for(DiagnosticsHandler.ProbeHandler handler: preregistered_probe_handlers)
diag_handler.registerProbeHandler(handler);
preregistered_probe_handlers.clear();
}
@@ -949,7 +1003,6 @@ public void stop() {
}
-
protected void handleConnect() throws Exception {
connect_count++;
}
@@ -2000,217 +2053,11 @@ private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs)
-
- public interface ProbeHandler {
- /**
- * Handles a probe. For each key that is handled, the key and its result should be in the returned map.
- * @param keys
- * @return Map<String,String>. A map of keys and values. A null return value is permissible.
- */
- Map<String,String> handleProbe(String... keys);
-
- /** Returns a list of supported keys */
- String[] supportedKeys();
- }
-
-// /**
-// * Maps UUIDs to physical addresses
-// */
-// public interface AddressMapper {
-// /**
-// * Given a UUID, pick one physical address from a list. If the UUID is null, the message needs to be sent to
-// * the entire cluster. In UDP, for example, we would pick a multicast address
-// * @param uuid The UUID. Null for a cluster wide destination
-// * @param physical_addrs A list of physical addresses
-// * @return an address from the list
-// */
-// Address pick(UUID uuid, List<Address> physical_addrs);
-// }
-
-
- protected class DiagnosticsHandler implements Runnable {
- public static final String THREAD_NAME = "DiagnosticsHandler";
- private Thread thread=null;
- private MulticastSocket diag_sock=null;
- private final Set<ProbeHandler> handlers=new CopyOnWriteArraySet<ProbeHandler>();
-
- DiagnosticsHandler() {
- }
-
- Thread getThread(){
- return thread;
- }
-
- void registerProbeHandler(ProbeHandler handler) {
- if(handler != null)
- handlers.add(handler);
- }
-
- void unregisterProbeHandler(ProbeHandler handler) {
- if(handler != null)
- handlers.remove(handler);
- }
-
- void start() throws IOException {
-
- registerProbeHandler(new ProbeHandler() {
-
- public Map<String, String> handleProbe(String... keys) {
- Map<String,String> retval=new HashMap<String,String>(2);
- for(String key: keys) {
- if(key.equals("dump")) {
- retval.put("dump", Util.dumpThreads());
- continue;
- }
- if(key.equals("uuids")) {
- retval.put("uuids", printLogicalAddressCache());
- if(!isSingleton() && !retval.containsKey("local_addr"))
- retval.put("local_addr", local_addr != null? local_addr.toString() : null);
- continue;
- }
- if(key.equals("keys")) {
- StringBuilder sb=new StringBuilder();
- for(ProbeHandler handler: handlers) {
- String[] tmp=handler.supportedKeys();
- if(tmp != null && tmp.length > 0) {
- for(String s: tmp)
- sb.append(s).append(" ");
- }
- }
- retval.put("keys", sb.toString());
- }
- if(key.equals("info")) {
- if(singleton_name != null && singleton_name.length() > 0)
- retval.put("singleton_name", singleton_name);
-
- }
- }
- return retval;
- }
-
- public String[] supportedKeys() {
- return new String[]{"dump", "keys", "uuids", "info"};
- }
- });
-
- // https://jira.jboss.org/jira/browse/JGRP-777 - this doesn't work on MacOS, and we don't have
- // cross talking on Windows anyway, so we just do it for Linux. (How about Solaris ?)
- // if(can_bind_to_mcast_addr)
- // diag_sock=Util.createMulticastSocket(getSocketFactory(),
- // Global.TP_DIAG_MCAST_SOCK, diagnostics_addr, diagnostics_port, log);
- //else
- diag_sock=getSocketFactory().createMulticastSocket(Global.TP_DIAG_MCAST_SOCK, diagnostics_port);
-
- List<NetworkInterface> interfaces=Util.getAllAvailableInterfaces();
- bindToInterfaces(interfaces, diag_sock);
-
- if(thread == null || !thread.isAlive()) {
- thread=global_thread_factory.newThread(this, THREAD_NAME);
- thread.setDaemon(true);
- thread.start();
- }
- }
-
- void stop() {
- if(diag_sock != null)
- getSocketFactory().close(diag_sock);
- handlers.clear();
- if(thread != null){
- try{
- thread.join(Global.THREAD_SHUTDOWN_WAIT_TIME);
- }
- catch(InterruptedException e){
- Thread.currentThread().interrupt(); // set interrupt flag
- }
- }
- }
-
- public void run() {
- byte[] buf=new byte[1500]; // requests are small (responses might be bigger)
- DatagramPacket packet;
- while(!diag_sock.isClosed() && Thread.currentThread().equals(thread)) {
- packet=new DatagramPacket(buf, 0, buf.length);
- try {
- diag_sock.receive(packet);
- handleDiagnosticProbe(packet.getSocketAddress(), diag_sock,
- new String(packet.getData(), packet.getOffset(), packet.getLength()));
- }
- catch(IOException socket_ex) {
- }
- catch(Throwable e) {
- if(log.isErrorEnabled())
- log.error("failure handling diagnostics request", e);
- }
- }
- }
-
- private void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String request) {
- StringTokenizer tok=new StringTokenizer(request);
- List<String> list=new ArrayList<String>(10);
-
- while(tok.hasMoreTokens()) {
- String req=tok.nextToken().trim();
- if(req.length() > 0)
- list.add(req);
- }
-
- String[] tokens=new String[list.size()];
- for(int i=0; i < list.size(); i++)
- tokens[i]=list.get(i);
-
-
- for(ProbeHandler handler: handlers) {
- Map<String, String> map=handler.handleProbe(tokens);
- if(map == null || map.isEmpty())
- continue;
- if(!map.containsKey("cluster"))
- map.put("cluster", channel_name != null? channel_name : "n/a");
- StringBuilder info=new StringBuilder();
- for(Map.Entry<String,String> entry: map.entrySet()) {
- info.append(entry.getKey()).append("=").append(entry.getValue()).append("\r\n");
- }
-
- byte[] diag_rsp=info.toString().getBytes();
- if(log.isDebugEnabled())
- log.debug("sending diag response to " + sender);
- try {
- sendResponse(sock, sender, diag_rsp);
- }
- catch(Throwable t) {
- if(log.isErrorEnabled())
- log.error("failed sending diag rsp to " + sender, t);
- }
- }
- }
-
- private void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException {
- DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender);
- sock.send(p);
- }
-
- private void bindToInterfaces(List<NetworkInterface> interfaces, MulticastSocket s) {
- SocketAddress group_addr=new InetSocketAddress(diagnostics_addr, diagnostics_port);
- for(Iterator<NetworkInterface> it=interfaces.iterator(); it.hasNext();) {
- NetworkInterface i=it.next();
- try {
- if (i.getInetAddresses().hasMoreElements()) { // fix for VM crash - suggested by JJalenak@netopia.com
- s.joinGroup(group_addr, i);
- if(log.isTraceEnabled())
- log.trace("joined " + group_addr + " on " + i.getName());
- }
- }
- catch(IOException e) {
- log.warn("failed to join " + group_addr + " on " + i.getName() + ": " + e);
- }
- }
- }
- }
-
/**
* Used when the transport is shared (singleton_name is not null). Maintains the cluster name, local address and
* view
*/
- public static class ProtocolAdapter extends Protocol implements ProbeHandler {
+ public static class ProtocolAdapter extends Protocol implements DiagnosticsHandler.ProbeHandler {
String cluster_name;
final short transport_id;
TpHeader header;
177 src/org/jgroups/stack/DiagnosticsHandler.java
View
@@ -0,0 +1,177 @@
+package org.jgroups.stack;
+
+import org.jgroups.Global;
+import org.jgroups.logging.Log;
+import org.jgroups.util.SocketFactory;
+import org.jgroups.util.ThreadFactory;
+import org.jgroups.util.Util;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @author Bela Ban
+ * @since 3.0
+ */
+
+public class DiagnosticsHandler implements Runnable {
+ public static final String THREAD_NAME = "DiagnosticsHandler";
+ protected Thread thread=null;
+ protected MulticastSocket diag_sock=null;
+ protected InetAddress diagnostics_addr=null;
+ protected int diagnostics_port=7500;
+ protected final Set<ProbeHandler> handlers=new CopyOnWriteArraySet<ProbeHandler>();
+ protected final Log log;
+ protected final SocketFactory socket_factory;
+ protected final ThreadFactory thread_factory;
+
+ public DiagnosticsHandler(InetAddress diagnostics_addr, int diagnostics_port,
+ Log log, SocketFactory socket_factory, ThreadFactory thread_factory) {
+ this.diagnostics_addr=diagnostics_addr;
+ this.diagnostics_port=diagnostics_port;
+ this.log=log;
+ this.socket_factory=socket_factory;
+ this.thread_factory=thread_factory;
+ }
+
+ public Thread getThread(){
+ return thread;
+ }
+
+ public Set<ProbeHandler> getProbeHandlers() {return handlers;}
+
+ public void registerProbeHandler(ProbeHandler handler) {
+ if(handler != null)
+ handlers.add(handler);
+ }
+
+ public void unregisterProbeHandler(ProbeHandler handler) {
+ if(handler != null)
+ handlers.remove(handler);
+ }
+
+ public void start() throws IOException {
+ // https://jira.jboss.org/jira/browse/JGRP-777 - this doesn't work on MacOS, and we don't have
+ // cross talking on Windows anyway, so we just do it for Linux. (How about Solaris ?)
+ // if(can_bind_to_mcast_addr)
+ // diag_sock=Util.createMulticastSocket(getSocketFactory(),
+ // Global.TP_DIAG_MCAST_SOCK, diagnostics_addr, diagnostics_port, log);
+ //else
+ diag_sock=socket_factory.createMulticastSocket(Global.TP_DIAG_MCAST_SOCK,diagnostics_port);
+
+ List<NetworkInterface> interfaces=Util.getAllAvailableInterfaces();
+ bindToInterfaces(interfaces, diag_sock);
+
+ if(thread == null || !thread.isAlive()) {
+ thread=thread_factory.newThread(this, THREAD_NAME);
+ thread.setDaemon(true);
+ thread.start();
+ }
+ }
+
+ public void stop() {
+ if(diag_sock != null)
+ socket_factory.close(diag_sock);
+ handlers.clear();
+ if(thread != null){
+ try{
+ thread.join(Global.THREAD_SHUTDOWN_WAIT_TIME);
+ }
+ catch(InterruptedException e){
+ Thread.currentThread().interrupt(); // set interrupt flag
+ }
+ }
+ }
+
+ public void run() {
+ byte[] buf=new byte[1500]; // requests are small (responses might be bigger)
+ DatagramPacket packet;
+ while(!diag_sock.isClosed() && Thread.currentThread().equals(thread)) {
+ packet=new DatagramPacket(buf, 0, buf.length);
+ try {
+ diag_sock.receive(packet);
+ handleDiagnosticProbe(packet.getSocketAddress(), diag_sock,
+ new String(packet.getData(), packet.getOffset(), packet.getLength()));
+ }
+ catch(IOException socket_ex) {
+ }
+ catch(Throwable e) {
+ if(log.isErrorEnabled())
+ log.error("failure handling diagnostics request", e);
+ }
+ }
+ }
+
+ protected void handleDiagnosticProbe(SocketAddress sender, DatagramSocket sock, String request) {
+ StringTokenizer tok=new StringTokenizer(request);
+ List<String> list=new ArrayList<String>(10);
+
+ while(tok.hasMoreTokens()) {
+ String req=tok.nextToken().trim();
+ if(req.length() > 0)
+ list.add(req);
+ }
+
+ String[] tokens=new String[list.size()];
+ for(int i=0; i < list.size(); i++)
+ tokens[i]=list.get(i);
+
+
+ for(ProbeHandler handler: handlers) {
+ Map<String, String> map=handler.handleProbe(tokens);
+ if(map == null || map.isEmpty())
+ continue;
+ StringBuilder info=new StringBuilder();
+ for(Map.Entry<String,String> entry: map.entrySet()) {
+ info.append(entry.getKey()).append("=").append(entry.getValue()).append("\r\n");
+ }
+
+ byte[] diag_rsp=info.toString().getBytes();
+ if(log.isDebugEnabled())
+ log.debug("sending diag response to " + sender);
+ try {
+ sendResponse(sock, sender, diag_rsp);
+ }
+ catch(Throwable t) {
+ if(log.isErrorEnabled())
+ log.error("failed sending diag rsp to " + sender, t);
+ }
+ }
+ }
+
+ protected static void sendResponse(DatagramSocket sock, SocketAddress sender, byte[] buf) throws IOException {
+ DatagramPacket p=new DatagramPacket(buf, 0, buf.length, sender);
+ sock.send(p);
+ }
+
+ protected void bindToInterfaces(List<NetworkInterface> interfaces, MulticastSocket s) {
+ SocketAddress group_addr=new InetSocketAddress(diagnostics_addr, diagnostics_port);
+ for(Iterator<NetworkInterface> it=interfaces.iterator(); it.hasNext();) {
+ NetworkInterface i=it.next();
+ try {
+ if (i.getInetAddresses().hasMoreElements()) { // fix for VM crash - suggested by JJalenak@netopia.com
+ s.joinGroup(group_addr, i);
+ if(log.isTraceEnabled())
+ log.trace("joined " + group_addr + " on " + i.getName());
+ }
+ }
+ catch(IOException e) {
+ log.warn("failed to join " + group_addr + " on " + i.getName() + ": " + e);
+ }
+ }
+ }
+
+ public interface ProbeHandler {
+ /**
+ * Handles a probe. For each key that is handled, the key and its result should be in the returned map.
+ * @param keys
+ * @return Map<String,String>. A map of keys and values. A null return value is permissible.
+ */
+ Map<String,String> handleProbe(String... keys);
+
+ /** Returns a list of supported keys */
+ String[] supportedKeys();
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.