Skip to content

Commit

Permalink
NMS-7974: Upgrade RequestTracker to v0.7 in order to fix a race condi…
Browse files Browse the repository at this point in the history
…tion with reply processing.
  • Loading branch information
Jesse White committed Nov 16, 2015
1 parent 5efc952 commit eac9330
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 17 deletions.
2 changes: 1 addition & 1 deletion dependencies/tracker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<dependency>
<groupId>org.opennms.lib</groupId>
<artifactId>org.opennms.lib.tracker</artifactId>
<version>0.6</version>
<version>0.7</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
package org.opennms.netmgt.icmp.jna;

import java.net.InetAddress;
import java.util.Queue;

import org.opennms.netmgt.icmp.EchoPacket;
import org.opennms.netmgt.icmp.IcmpMessengerIOException;
import org.opennms.protocols.rt.Messenger;
import org.opennms.protocols.rt.ReplyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,7 +49,7 @@ public class JnaIcmpMessenger implements Messenger<JnaPingRequest, JnaPingReply>

private V4Pinger m_v4;
private V6Pinger m_v6;
private Queue<JnaPingReply> pendingReplies = null;
private ReplyHandler<JnaPingReply> m_callback = null;

public JnaIcmpMessenger(final int pingerId) throws Exception {
Throwable error = null;
Expand Down Expand Up @@ -98,15 +98,15 @@ public void sendRequest(final JnaPingRequest request) {
}

@Override
public void start(final Queue<JnaPingReply> replyQueue) {
pendingReplies = replyQueue;
public void start(ReplyHandler<JnaPingReply> callback) {
m_callback = callback;
m_v4.start();
m_v6.start();
}

@Override
public void onPingReply(final InetAddress address, final EchoPacket packet) {
pendingReplies.offer(new JnaPingReply(address, packet));
m_callback.handleReply(new JnaPingReply(address, packet));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@

import java.io.IOException;
import java.net.DatagramPacket;
import java.util.Queue;

import org.opennms.core.logging.Logging;
import org.opennms.protocols.icmp.ICMPEchoPacket;
import org.opennms.protocols.icmp.ICMPHeader;
import org.opennms.protocols.icmp.IcmpSocket;
import org.opennms.protocols.rt.Messenger;
import org.opennms.protocols.rt.ReplyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,7 +62,7 @@ public JniIcmpMessenger(int pingerId) throws IOException {
m_socket = new IcmpSocket();
}

void processPackets(Queue<JniPingResponse> pendingReplies) {
void processPackets(ReplyHandler<JniPingResponse> callback) {
final int pingerId = m_pingerId;
while (true) {
try {
Expand All @@ -74,7 +73,7 @@ void processPackets(Queue<JniPingResponse> pendingReplies) {
if (reply.isEchoReply() && reply.getIdentifier() == pingerId) {
// Remove this so we don't send a lot of time in this method when we should be processing packets
// LogUtils.debugf(this, "Found an echo packet addr = %s, port = %d, length = %d, created reply %s", packet.getAddress(), packet.getPort(), packet.getLength(), reply);
pendingReplies.offer(reply);
callback.handleReply(reply);
}
} catch (IOException e) {
LOG.error("I/O Error occurred reading from ICMP Socket", e);
Expand All @@ -101,13 +100,13 @@ public void sendRequest(JniPingRequest request) {

/** {@inheritDoc} */
@Override
public void start(final Queue<JniPingResponse> responseQueue) {
public void start(final ReplyHandler<JniPingResponse> callback) {
final Thread socketReader = new Thread("JNI-ICMP-"+m_pingerId+"-Socket-Reader") {
@Override
public void run() {
Logging.putPrefix("icmp");
try {
processPackets(responseQueue);
processPackets(callback);
} catch (Throwable t) {
LOG.error("Unexpected exception on Thread {}!", this, t);
}
Expand Down Expand Up @@ -149,4 +148,5 @@ public static JniPingResponse createPingResponse(DatagramPacket packet) {
//
return new JniPingResponse(packet.getAddress(), pkt);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.Inet6Address;
import java.util.Queue;

import org.opennms.core.logging.Logging;
import org.opennms.protocols.icmp6.ICMPv6EchoReply;
import org.opennms.protocols.icmp6.ICMPv6Packet;
import org.opennms.protocols.icmp6.ICMPv6Packet.Type;
import org.opennms.protocols.icmp6.ICMPv6Socket;
import org.opennms.protocols.rt.Messenger;
import org.opennms.protocols.rt.ReplyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -65,15 +65,15 @@ public Jni6IcmpMessenger(int pingerId) throws IOException {
m_socket = new ICMPv6Socket();
}

void processPackets(Queue<Jni6PingResponse> pendingReplies) {
void processPackets(ReplyHandler<Jni6PingResponse> callback) {
while (true) {
try {
DatagramPacket packet = m_socket.receive();

Jni6PingResponse reply = Jni6IcmpMessenger.createPingResponse(packet);

if (reply != null && reply.getIdentifier() == m_pingerId) {
pendingReplies.offer(reply);
callback.handleReply(reply);
}


Expand Down Expand Up @@ -103,14 +103,14 @@ public void sendRequest(Jni6PingRequest request) {

/** {@inheritDoc} */
@Override
public void start(final Queue<Jni6PingResponse> responseQueue) {
public void start(final ReplyHandler<Jni6PingResponse> callback) {
Thread socketReader = new Thread("JNI-ICMP-"+m_pingerId+"-Socket-Reader") {

@Override
public void run() {
Logging.putPrefix("icmp");
try {
processPackets(responseQueue);
processPackets(callback);
} catch (Throwable t) {
LOG.error("Unexpected exception on Thread {}!", this, t);
}
Expand Down

0 comments on commit eac9330

Please sign in to comment.