Skip to content

Commit

Permalink
Fix #6617: fixing startup NPE and synchronization on mListenerMasks
Browse files Browse the repository at this point in the history
  • Loading branch information
svatoun committed Mar 3, 2019
1 parent 40dc1f5 commit 62b0ce2
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 57 deletions.
4 changes: 3 additions & 1 deletion java/src/jmri/jmrix/AbstractMRTrafficController.java
Expand Up @@ -128,7 +128,7 @@ protected boolean getSynchronizeRx() {

// The methods to implement the abstract Interface

protected Vector<AbstractMRListener> cmdListeners = new Vector<AbstractMRListener>();
protected final Vector<AbstractMRListener> cmdListeners = new Vector<AbstractMRListener>();

protected synchronized void addListener(AbstractMRListener l) {
// add only if not already registered
Expand Down Expand Up @@ -158,6 +158,7 @@ protected void notifyMessage(AbstractMRMessage m, AbstractMRListener notMe) {
// make a copy of the listener vector to synchronized not needed for transmit
Vector<AbstractMRListener> v;
synchronized (this) {
// FIXME: unnecessary synchronized; the Vector IS already thread-safe.
v = (Vector<AbstractMRListener>) cmdListeners.clone();
}
// forward to all listeners
Expand Down Expand Up @@ -271,6 +272,7 @@ protected void notifyReply(AbstractMRReply r, AbstractMRListener dest) {
// make a copy of the listener vector to synchronized (not needed for transmit?)
Vector<AbstractMRListener> v;
synchronized (this) {
// FIXME: unnecessary synchronized; the Vector IS already thread-safe.
v = (Vector<AbstractMRListener>) cmdListeners.clone();
}
// forward to all listeners
Expand Down
112 changes: 56 additions & 56 deletions java/src/jmri/jmrix/lenz/XNetTrafficController.java
Expand Up @@ -20,7 +20,9 @@
*/
public abstract class XNetTrafficController extends AbstractMRTrafficController implements XNetInterface {

protected HashMap<XNetListener, Integer> mListenerMasks;
// @GuardedBy(this)
// PENDING: the field should be probably made private w/ accessor to force proper synchronization for reading.
protected final HashMap<XNetListener, Integer> mListenerMasks;

/**
* Create a new XNetTrafficController.
Expand Down Expand Up @@ -90,67 +92,65 @@ public void forwardReply(AbstractMRListener client, AbstractMRReply m) {
if (!((XNetReply) m).checkParity()) {
log.warn("Ignore packet with bad checksum: {}", (m));
} else {
try {
int mask = (mListenerMasks.get(client));
if (mask == XNetInterface.ALL) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.COMMINFO)
== XNetInterface.COMMINFO
&& (((XNetReply) m).getElement(0)
== XNetConstants.LI_MESSAGE_RESPONSE_HEADER)) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.CS_INFO)
== XNetInterface.CS_INFO
&& (((XNetReply) m).getElement(0)
== XNetConstants.CS_INFO
|| ((XNetReply) m).getElement(0)
== XNetConstants.CS_SERVICE_MODE_RESPONSE
|| ((XNetReply) m).getElement(0)
== XNetConstants.CS_REQUEST_RESPONSE
|| ((XNetReply) m).getElement(0)
== XNetConstants.BC_EMERGENCY_STOP)) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.FEEDBACK)
== XNetInterface.FEEDBACK
&& (((XNetReply) m).isFeedbackMessage()
|| ((XNetReply) m).isFeedbackBroadcastMessage())) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.THROTTLE)
== XNetInterface.THROTTLE
&& ((XNetReply) m).isThrottleMessage()) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.CONSIST)
== XNetInterface.CONSIST
&& ((XNetReply) m).isConsistMessage()) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.INTERFACE)
== XNetInterface.INTERFACE
&& (((XNetReply) m).getElement(0)
== XNetConstants.LI_VERSION_RESPONSE
|| ((XNetReply) m).getElement(0)
== XNetConstants.LI101_REQUEST)) {
((XNetListener) client).message((XNetReply) m);
}
} catch (NullPointerException e) {
// catch null pointer exceptions, caused by a client
// that sent a message without being a registered listener
int mask;
synchronized (this) {
mask = mListenerMasks.getOrDefault(client, XNetInterface.ALL);
}
if (mask == XNetInterface.ALL) {
// Note: also executing this branch, if the client is not registered at all.
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.COMMINFO)
== XNetInterface.COMMINFO
&& (((XNetReply) m).getElement(0)
== XNetConstants.LI_MESSAGE_RESPONSE_HEADER)) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.CS_INFO)
== XNetInterface.CS_INFO
&& (((XNetReply) m).getElement(0)
== XNetConstants.CS_INFO
|| ((XNetReply) m).getElement(0)
== XNetConstants.CS_SERVICE_MODE_RESPONSE
|| ((XNetReply) m).getElement(0)
== XNetConstants.CS_REQUEST_RESPONSE
|| ((XNetReply) m).getElement(0)
== XNetConstants.BC_EMERGENCY_STOP)) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.FEEDBACK)
== XNetInterface.FEEDBACK
&& (((XNetReply) m).isFeedbackMessage()
|| ((XNetReply) m).isFeedbackBroadcastMessage())) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.THROTTLE)
== XNetInterface.THROTTLE
&& ((XNetReply) m).isThrottleMessage()) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.CONSIST)
== XNetInterface.CONSIST
&& ((XNetReply) m).isConsistMessage()) {
((XNetListener) client).message((XNetReply) m);
} else if ((mask & XNetInterface.INTERFACE)
== XNetInterface.INTERFACE
&& (((XNetReply) m).getElement(0)
== XNetConstants.LI_VERSION_RESPONSE
|| ((XNetReply) m).getElement(0)
== XNetConstants.LI101_REQUEST)) {
((XNetListener) client).message((XNetReply) m);
}
}
}

// We use the pollMessage routines for high priority messages.
// This means responses to time critical messages (turnout off messages).
LinkedBlockingQueue<XNetMessage> highPriorityQueue = null;
LinkedBlockingQueue<XNetListener> highPriorityListeners = null;

public void sendHighPriorityXNetMessage(XNetMessage m, XNetListener reply) {
try {
highPriorityQueue.put(m);
highPriorityListeners.put(reply);
} catch (java.lang.InterruptedException ie) {
log.error("Interrupted while adding High Priority Message to Queue");
}
// PENDING: these fields should be probably made private w/ accessor to force proper synchronization for reading.
final LinkedBlockingQueue<XNetMessage> highPriorityQueue;
final LinkedBlockingQueue<XNetListener> highPriorityListeners;

public synchronized void sendHighPriorityXNetMessage(XNetMessage m, XNetListener reply) {
// using offer as the queue is unbounded and should never block on write.
// Note: the message should be inserted LAST, as the message is tested/acquired first
// by the reader; serves a a guard for next item processing.
highPriorityListeners.offer(reply);
highPriorityQueue.offer(m);
}

@Override
Expand Down Expand Up @@ -283,7 +283,7 @@ protected void handleTimeout(AbstractMRMessage msg, AbstractMRListener l) {
/**
* Reference to the command station in communication here.
*/
LenzCommandStation mCommandStation;
final LenzCommandStation mCommandStation;

/**
* Get access to communicating command station object.
Expand Down
1 change: 1 addition & 0 deletions java/src/jmri/jmrix/srcp/SRCPTrafficController.java
Expand Up @@ -281,6 +281,7 @@ protected void notifyReply(SimpleNode r, AbstractMRListener dest) {
// make a copy of the listener vector to synchronized (not needed for transmit?)
Vector<AbstractMRListener> v;
synchronized (this) {
// FIXME: unnecessary synchronized; the Vector IS already thread-safe.
v = (Vector<AbstractMRListener>) cmdListeners.clone();
}
// forward to all listeners
Expand Down
Expand Up @@ -27,12 +27,14 @@ class OurScaffold extends TrafficControllerScaffold {
* Forward CanMessage to object under test
*/
public void testMessage(CanMessage f) {
// FIXME: must clone, iterator is not threadsafe.
for (jmri.jmrix.AbstractMRListener c : cmdListeners) {
((CanListener) c).message(f);
}
}

public void testReply(CanReply f) {
// FIXME: must clone, iterator is not threadsafe.
for (jmri.jmrix.AbstractMRListener c : cmdListeners) {
((CanListener) c).reply(f);
}
Expand Down

0 comments on commit 62b0ce2

Please sign in to comment.