Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/features/atlas' into jira/HZN-701
Browse files Browse the repository at this point in the history
  • Loading branch information
Markus von Rüden committed May 11, 2016
2 parents 04d058f + f8e5108 commit b284885
Show file tree
Hide file tree
Showing 63 changed files with 1,660 additions and 1,370 deletions.
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;


public interface SnmpStrategy {
Expand All @@ -43,6 +44,7 @@ public interface SnmpStrategy {

SnmpValue get(SnmpAgentConfig agentConfig, SnmpObjId oid);
SnmpValue[] get(SnmpAgentConfig agentConfig, SnmpObjId[] oids);
CompletableFuture<SnmpValue[]> getAsync(SnmpAgentConfig agentConfig, SnmpObjId[] oids);

SnmpValue getNext(SnmpAgentConfig agentConfig, SnmpObjId oid);
SnmpValue[] getNext(SnmpAgentConfig agentConfig, SnmpObjId[] oids);
Expand Down
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,6 +87,10 @@ public static SnmpValue[] get(SnmpAgentConfig agentConfig, SnmpObjId[] oids) {
return getStrategy().get(agentConfig, oids);
}

public static CompletableFuture<SnmpValue[]> getAsync(SnmpAgentConfig agentConfig, SnmpObjId[] oids) {
return getStrategy().getAsync(agentConfig, oids);
}

public static SnmpValue getNext(SnmpAgentConfig agentConfig, SnmpObjId oid) {
return getStrategy().getNext(agentConfig, oid);
}
Expand Down
@@ -0,0 +1,45 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2016 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2016 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.netmgt.snmp;

/**
* Callback used for asynchronous SNMP WALKs.
*
* @author jwhite
*/
public interface SnmpWalkCallback {

/**
* Called when the walk, issued by the given tracker is complete.
*
* @param tracker
* @param t null if the request was completed successfully
*/
public void complete(SnmpWalker tracker, Throwable t);
}
Expand Up @@ -61,7 +61,9 @@ protected WalkerPduBuilder(int maxVarsPerPdu) {
private boolean m_error = false;
private String m_errorMessage = "";
private Throwable m_errorThrowable = null;


private SnmpWalkCallback m_callback;

protected SnmpWalker(InetAddress address, String name, int maxVarsPerPdu, int maxRepetitions, CollectionTracker tracker) {
m_address = address;
m_signal = new CountDownLatch(1);
Expand All @@ -74,6 +76,16 @@ protected SnmpWalker(InetAddress address, String name, int maxVarsPerPdu, int ma
m_maxVarsPerPdu = maxVarsPerPdu;
}

/**
* Sets an (optional) callback that will be triggered when the walk was successfully completed,
* or failed due to some error.
*
* @param callback the callback
*/
public void setCallback(SnmpWalkCallback callback) {
m_callback = callback;
}

protected abstract WalkerPduBuilder createPduBuilder(int maxVarsPerPdu);

public void start() {
Expand Down Expand Up @@ -162,6 +174,18 @@ private void finish() {
} catch (IOException e) {
LOG.error("{}: Unexpected Error occured closing SNMP session for: {}", getName(), m_address, e);
}
// Trigger the callback after the latch was decreased and the session was closed.
if (m_callback != null) {
Throwable t = null;
if (failed()) {
t = getErrorThrowable();
if (t == null) {
// Not all of the failures provide an exception, so we generate one if necessary
t = new Exception(getErrorMessage());
}
}
m_callback.complete(this, t);
}
}

@Override
Expand Down
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.opennms.netmgt.snmp.CollectionTracker;
import org.opennms.netmgt.snmp.InetAddrUtils;
Expand Down Expand Up @@ -155,7 +156,13 @@ public SnmpValue[] get(SnmpAgentConfig snmpAgentConfig, SnmpObjId[] oids) {
}
return values;
}


@Override
public CompletableFuture<SnmpValue[]> getAsync(SnmpAgentConfig agentConfig, SnmpObjId[] oids) {
LOG.warn("The JoeSnmpStrategy does not support asynchronous SNMP GET requests.");
return CompletableFuture.completedFuture(get(agentConfig, oids));
}

@Override
public SnmpValue getNext(SnmpAgentConfig snmpAgentConfig, SnmpObjId oid) {
SnmpObjId[] oids = { oid };
Expand Down Expand Up @@ -429,5 +436,4 @@ public SnmpV3TrapBuilder getV3InformBuilder() {
public byte[] getLocalEngineID() {
throw new UnsupportedOperationException();
}

}
Expand Up @@ -36,6 +36,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.opennms.netmgt.snmp.CollectionTracker;
import org.opennms.netmgt.snmp.InetAddrUtils;
Expand Down Expand Up @@ -128,6 +129,11 @@ public SnmpValue[] get(final SnmpAgentConfig agentConfig, final SnmpObjId[] oids
return values.toArray(EMPTY_SNMP_VALUE_ARRAY);
}

@Override
public CompletableFuture<SnmpValue[]> getAsync(SnmpAgentConfig agentConfig, SnmpObjId[] oids) {
return CompletableFuture.completedFuture(get(agentConfig, oids));
}

@Override
public SnmpValue getNext(final SnmpAgentConfig agentConfig, final SnmpObjId oid) {
final PropertyOidContainer oidContainer = getOidContainer(agentConfig);
Expand Down
Expand Up @@ -36,6 +36,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.opennms.netmgt.snmp.CollectionTracker;
import org.opennms.netmgt.snmp.SnmpAgentConfig;
Expand Down Expand Up @@ -63,6 +65,7 @@
import org.snmp4j.Snmp;
import org.snmp4j.TransportMapping;
import org.snmp4j.event.ResponseEvent;
import org.snmp4j.event.ResponseListener;
import org.snmp4j.mp.MPv3;
import org.snmp4j.mp.MessageProcessingModel;
import org.snmp4j.mp.PduHandle;
Expand Down Expand Up @@ -194,7 +197,19 @@ public SnmpValue[] get(SnmpAgentConfig agentConfig, SnmpObjId[] oids) {

return buildAndSendPdu(agentConfig, PDU.GET, oids, null);
}


@Override
public CompletableFuture<SnmpValue[]> getAsync(SnmpAgentConfig agentConfig, SnmpObjId[] oids) {
final CompletableFuture<SnmpValue[]> future = new CompletableFuture<>();
final Snmp4JAgentConfig snmp4jAgentConfig = new Snmp4JAgentConfig(agentConfig);
final PDU pdu = buildPdu(snmp4jAgentConfig, PDU.GET, oids, null);
if (pdu == null) {
future.completeExceptionally(new Exception("Invalid PDU for OIDs: " + Arrays.toString(oids)));
}
send(snmp4jAgentConfig, pdu, true, future);
return future;
}

/**
* SNMP4J getNext implementation
*
Expand Down Expand Up @@ -242,45 +257,65 @@ private SnmpValue[] buildAndSendPdu(SnmpAgentConfig agentConfig, int type, SnmpO
* adapted from default SnmpAgentConfig values to those compatible with the SNMP4J library.
*/
protected SnmpValue[] send(Snmp4JAgentConfig agentConfig, PDU pdu, boolean expectResponse) {
final CompletableFuture<SnmpValue[]> future = new CompletableFuture<>();
send(agentConfig, pdu, expectResponse, future);
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
LOG.error(e.getMessage(), e);
return new SnmpValue[] { null };
}
}

private void send(Snmp4JAgentConfig agentConfig, PDU pdu, boolean expectResponse, CompletableFuture<SnmpValue[]> future) {
Snmp session;

try {
session = agentConfig.createSnmpSession();
} catch (IOException e) {
LOG.error("send: Could not create SNMP session for agent {}", agentConfig, e);
return new SnmpValue[] { null };
future.completeExceptionally(new Exception("Could not create SNMP session for agent"));
return;
}

try {
if (expectResponse) {
try {
session.listen();
} catch (IOException e) {
LOG.error("send: error setting up listener for SNMP responses", e);
return new SnmpValue[] { null };
}
}

if (expectResponse) {
try {
final ResponseEvent responseEvent = session.send(pdu, agentConfig.getTarget());
session.listen();
} catch (IOException e) {
LOG.error("send: error setting up listener for SNMP responses", e);
future.completeExceptionally(new Exception("error setting up listener for SNMP responses"));
return;
}
}

if (expectResponse) {
return processResponse(agentConfig, responseEvent);
} else {
return null;
}
} catch (final IOException e) {
LOG.error("send: error during SNMP operation", e);
return new SnmpValue[] { null };
} catch (final RuntimeException e) {
LOG.error("send: unexpected error during SNMP operation", e);
return new SnmpValue[] { null };
try {
if (expectResponse) {
session.send(pdu, agentConfig.getTarget(), null, new ResponseListener() {
@Override
public void onResponse(ResponseEvent responseEvent) {
if (expectResponse) {
try {
future.complete(processResponse(agentConfig, responseEvent));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
closeQuietly(session);
}
});
} else {
session.send(pdu, agentConfig.getTarget());
future.complete(null);
closeQuietly(session);
}
} finally {
closeQuietly(session);
} catch (final IOException e) {
LOG.error("send: error during SNMP operation", e);
future.completeExceptionally(e);
} catch (final RuntimeException e) {
LOG.error("send: unexpected error during SNMP operation", e);
future.completeExceptionally(e);
}
}


protected PDU buildPdu(Snmp4JAgentConfig agentConfig, int pduType, SnmpObjId[] oids, SnmpValue[] values) {
PDU pdu = agentConfig.createPdu(pduType);
Expand Down Expand Up @@ -611,5 +646,4 @@ private void closeQuietly(Snmp session) {
public byte[] getLocalEngineID() {
return MPv3.createLocalEngineID();
}

}
5 changes: 5 additions & 0 deletions core/snmp/integration-tests/pom.xml
Expand Up @@ -88,5 +88,10 @@
<artifactId>opennms-dao</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -63,11 +63,6 @@ public void rowCompleted(SnmpRowResult result) {
m_receivedRows.add(result);
System.err.println("Received Row: "+result);
}

public void reset() {
m_rowCount = 0;
}

}

public SnmpValue value(String val) {
Expand Down
Expand Up @@ -28,6 +28,9 @@

package org.opennms.netmgt.snmp;

import static com.jayway.awaitility.Awaitility.await;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -247,10 +250,9 @@ public void testSendV1Trap() throws Exception {
trap.setSpecific(1);
trap.setTimeStamp(8640000);
trap.send(getAgentAddress().getHostAddress(), 9162, "public");
Thread.sleep(1000);
assertEquals("Unexpected number of traps Received", 1, m_trapListener.getReceivedTrapCount());
await().atMost(5, SECONDS).until(() -> m_trapListener.getReceivedTrapCount(), equalTo(1));
}

@Test
public void testSendV2Trap() throws Exception {
assumeTrue(m_trapsSupported);
Expand All @@ -266,10 +268,9 @@ public void testSendV2Trap() throws Exception {
pdu.addVarBind(SnmpObjId.get(".1.3.6.1.6.3.1.1.4.3.0"), SnmpUtils.getValueFactory().getObjectId(enterpriseId));

pdu.send(getAgentAddress().getHostAddress(), 9162, "public");
Thread.sleep(1000);
assertEquals("Unexpected number of traps Received", 1, m_trapListener.getReceivedTrapCount());
await().atMost(5, SECONDS).until(() -> m_trapListener.getReceivedTrapCount(), equalTo(1));
}

@Override
public TrapProcessor createTrapProcessor() {
return new TestTrapProcessor();
Expand Down

0 comments on commit b284885

Please sign in to comment.