diff --git a/java/src/jmri/jmrix/AbstractMRTrafficController.java b/java/src/jmri/jmrix/AbstractMRTrafficController.java index d84a7fdbe55..50f523451e4 100644 --- a/java/src/jmri/jmrix/AbstractMRTrafficController.java +++ b/java/src/jmri/jmrix/AbstractMRTrafficController.java @@ -128,7 +128,7 @@ protected boolean getSynchronizeRx() { // The methods to implement the abstract Interface - protected Vector cmdListeners = new Vector(); + protected final Vector cmdListeners = new Vector(); protected synchronized void addListener(AbstractMRListener l) { // add only if not already registered @@ -158,6 +158,7 @@ protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) { // make a copy of the listener vector to synchronized not needed for transmit Vector v; synchronized (this) { + // FIXME: unnecessary synchronized; the Vector IS already thread-safe. v = (Vector) cmdListeners.clone(); } // forward to all listeners @@ -271,6 +272,7 @@ protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) { // make a copy of the listener vector to synchronized (not needed for transmit?) Vector v; synchronized (this) { + // FIXME: unnecessary synchronized; the Vector IS already thread-safe. v = (Vector) cmdListeners.clone(); } // forward to all listeners diff --git a/java/src/jmri/jmrix/lenz/XNetTrafficController.java b/java/src/jmri/jmrix/lenz/XNetTrafficController.java index ba594b6fbe6..0076131a445 100644 --- a/java/src/jmri/jmrix/lenz/XNetTrafficController.java +++ b/java/src/jmri/jmrix/lenz/XNetTrafficController.java @@ -20,7 +20,9 @@ */ public abstract class XNetTrafficController extends AbstractMRTrafficController implements XNetInterface { - protected HashMap mListenerMasks; + // @GuardedBy(this) + // PENDING: the field should be probably made private w/ accessor to force proper synchronization for reading. + protected final HashMap mListenerMasks; /** * Create a new XNetTrafficController. @@ -90,50 +92,48 @@ public void forwardReply(AbstractMRListener client, AbstractMRReply m) { if (!((XNetReply) m).checkParity()) { log.warn("Ignore packet with bad checksum: {}", (m)); } else { - try { - int mask = (mListenerMasks.get(client)); - if (mask == XNetInterface.ALL) { - ((XNetListener) client).message((XNetReply) m); - } else if ((mask & XNetInterface.COMMINFO) - == XNetInterface.COMMINFO - && (((XNetReply) m).getElement(0) - == XNetConstants.LI_MESSAGE_RESPONSE_HEADER)) { - ((XNetListener) client).message((XNetReply) m); - } else if ((mask & XNetInterface.CS_INFO) - == XNetInterface.CS_INFO - && (((XNetReply) m).getElement(0) - == XNetConstants.CS_INFO - || ((XNetReply) m).getElement(0) - == XNetConstants.CS_SERVICE_MODE_RESPONSE - || ((XNetReply) m).getElement(0) - == XNetConstants.CS_REQUEST_RESPONSE - || ((XNetReply) m).getElement(0) - == XNetConstants.BC_EMERGENCY_STOP)) { - ((XNetListener) client).message((XNetReply) m); - } else if ((mask & XNetInterface.FEEDBACK) - == XNetInterface.FEEDBACK - && (((XNetReply) m).isFeedbackMessage() - || ((XNetReply) m).isFeedbackBroadcastMessage())) { - ((XNetListener) client).message((XNetReply) m); - } else if ((mask & XNetInterface.THROTTLE) - == XNetInterface.THROTTLE - && ((XNetReply) m).isThrottleMessage()) { - ((XNetListener) client).message((XNetReply) m); - } else if ((mask & XNetInterface.CONSIST) - == XNetInterface.CONSIST - && ((XNetReply) m).isConsistMessage()) { - ((XNetListener) client).message((XNetReply) m); - } else if ((mask & XNetInterface.INTERFACE) - == XNetInterface.INTERFACE - && (((XNetReply) m).getElement(0) - == XNetConstants.LI_VERSION_RESPONSE - || ((XNetReply) m).getElement(0) - == XNetConstants.LI101_REQUEST)) { - ((XNetListener) client).message((XNetReply) m); - } - } catch (NullPointerException e) { - // catch null pointer exceptions, caused by a client - // that sent a message without being a registered listener + int mask; + synchronized (this) { + mask = mListenerMasks.getOrDefault(client, XNetInterface.ALL); + } + if (mask == XNetInterface.ALL) { + // Note: also executing this branch, if the client is not registered at all. + ((XNetListener) client).message((XNetReply) m); + } else if ((mask & XNetInterface.COMMINFO) + == XNetInterface.COMMINFO + && (((XNetReply) m).getElement(0) + == XNetConstants.LI_MESSAGE_RESPONSE_HEADER)) { + ((XNetListener) client).message((XNetReply) m); + } else if ((mask & XNetInterface.CS_INFO) + == XNetInterface.CS_INFO + && (((XNetReply) m).getElement(0) + == XNetConstants.CS_INFO + || ((XNetReply) m).getElement(0) + == XNetConstants.CS_SERVICE_MODE_RESPONSE + || ((XNetReply) m).getElement(0) + == XNetConstants.CS_REQUEST_RESPONSE + || ((XNetReply) m).getElement(0) + == XNetConstants.BC_EMERGENCY_STOP)) { + ((XNetListener) client).message((XNetReply) m); + } else if ((mask & XNetInterface.FEEDBACK) + == XNetInterface.FEEDBACK + && (((XNetReply) m).isFeedbackMessage() + || ((XNetReply) m).isFeedbackBroadcastMessage())) { + ((XNetListener) client).message((XNetReply) m); + } else if ((mask & XNetInterface.THROTTLE) + == XNetInterface.THROTTLE + && ((XNetReply) m).isThrottleMessage()) { + ((XNetListener) client).message((XNetReply) m); + } else if ((mask & XNetInterface.CONSIST) + == XNetInterface.CONSIST + && ((XNetReply) m).isConsistMessage()) { + ((XNetListener) client).message((XNetReply) m); + } else if ((mask & XNetInterface.INTERFACE) + == XNetInterface.INTERFACE + && (((XNetReply) m).getElement(0) + == XNetConstants.LI_VERSION_RESPONSE + || ((XNetReply) m).getElement(0) + == XNetConstants.LI101_REQUEST)) { ((XNetListener) client).message((XNetReply) m); } } @@ -141,16 +141,16 @@ public void forwardReply(AbstractMRListener client, AbstractMRReply m) { // We use the pollMessage routines for high priority messages. // This means responses to time critical messages (turnout off messages). - LinkedBlockingQueue highPriorityQueue = null; - LinkedBlockingQueue highPriorityListeners = null; - - public void sendHighPriorityXNetMessage(XNetMessage m, XNetListener reply) { - try { - highPriorityQueue.put(m); - highPriorityListeners.put(reply); - } catch (java.lang.InterruptedException ie) { - log.error("Interrupted while adding High Priority Message to Queue"); - } + // PENDING: these fields should be probably made private w/ accessor to force proper synchronization for reading. + final LinkedBlockingQueue highPriorityQueue; + final LinkedBlockingQueue highPriorityListeners; + + public synchronized void sendHighPriorityXNetMessage(XNetMessage m, XNetListener reply) { + // using offer as the queue is unbounded and should never block on write. + // Note: the message should be inserted LAST, as the message is tested/acquired first + // by the reader; serves a a guard for next item processing. + highPriorityListeners.offer(reply); + highPriorityQueue.offer(m); } @Override @@ -283,7 +283,7 @@ protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) { /** * Reference to the command station in communication here. */ - LenzCommandStation mCommandStation; + final LenzCommandStation mCommandStation; /** * Get access to communicating command station object. diff --git a/java/src/jmri/jmrix/srcp/SRCPTrafficController.java b/java/src/jmri/jmrix/srcp/SRCPTrafficController.java index 9bd681cd3d4..1f76629b391 100644 --- a/java/src/jmri/jmrix/srcp/SRCPTrafficController.java +++ b/java/src/jmri/jmrix/srcp/SRCPTrafficController.java @@ -281,6 +281,7 @@ protected void notifyReply(SimpleNode r, AbstractMRListener dest) { // make a copy of the listener vector to synchronized (not needed for transmit?) Vector v; synchronized (this) { + // FIXME: unnecessary synchronized; the Vector IS already thread-safe. v = (Vector) cmdListeners.clone(); } // forward to all listeners diff --git a/java/test/jmri/jmrix/openlcb/swing/monitor/MonitorFrameDemo.java b/java/test/jmri/jmrix/openlcb/swing/monitor/MonitorFrameDemo.java index 90bd155eeed..5663aa733b1 100644 --- a/java/test/jmri/jmrix/openlcb/swing/monitor/MonitorFrameDemo.java +++ b/java/test/jmri/jmrix/openlcb/swing/monitor/MonitorFrameDemo.java @@ -27,12 +27,14 @@ class OurScaffold extends TrafficControllerScaffold { * Forward CanMessage to object under test */ public void testMessage(CanMessage f) { + // FIXME: must clone, iterator is not threadsafe. for (jmri.jmrix.AbstractMRListener c : cmdListeners) { ((CanListener) c).message(f); } } public void testReply(CanReply f) { + // FIXME: must clone, iterator is not threadsafe. for (jmri.jmrix.AbstractMRListener c : cmdListeners) { ((CanListener) c).reply(f); }