Skip to content

Commit

Permalink
Merge pull request #1159 from OpenNMS/jira/HZN-955
Browse files Browse the repository at this point in the history
Merging to develop.
  • Loading branch information
soleger committed Nov 28, 2016
2 parents fb4ba84 + c680f6c commit f6c8ad8
Show file tree
Hide file tree
Showing 22 changed files with 351 additions and 193 deletions.
@@ -0,0 +1,44 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2016 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2016 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.core.rpc.api;

/**
* Thrown when no response was received from a remote system
* before the request's time to live expired.
*
* @author jesse
*/
public class RequestTimedOutException extends Exception {

private static final long serialVersionUID = -6129623493993788846L;

public RequestTimedOutException(Throwable cause) {
super(cause);
}
}
Expand Up @@ -33,8 +33,10 @@
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spi.Synchronization;
import org.opennms.core.rpc.api.RequestTimedOutException;
import org.opennms.core.rpc.api.RpcClient;
import org.opennms.core.rpc.api.RpcClientFactory;
import org.opennms.core.rpc.api.RpcModule;
Expand Down Expand Up @@ -74,7 +76,13 @@ public void onComplete(Exchange exchange) {
}
@Override
public void onFailure(Exchange exchange) {
future.completeExceptionally(exchange.getException());
// Wrap timeout exceptions within a RequestTimedOutException
final ExchangeTimedOutException timeoutException = exchange.getException(ExchangeTimedOutException.class);
if (timeoutException != null) {
future.completeExceptionally(new RequestTimedOutException(exchange.getException()));
} else {
future.completeExceptionally(exchange.getException());
}
}
});
return future;
Expand Down
Expand Up @@ -84,16 +84,8 @@ public class EchoRpcBlueprintIT extends CamelBlueprintTest {
@Override
protected void addServicesOnStartup(Map<String, KeyValueHolder<Object, Dictionary>> services) {
services.put(MinionIdentity.class.getName(),
new KeyValueHolder<Object, Dictionary>(new MinionIdentity() {
@Override
public String getId() {
return "0";
}
@Override
public String getLocation() {
return REMOTE_LOCATION_NAME;
}
}, new Properties()));
new KeyValueHolder<Object, Dictionary>(new MockMinionIdentity(REMOTE_LOCATION_NAME),
new Properties()));

Properties props = new Properties();
props.setProperty("alias", "opennms.broker");
Expand Down
@@ -1,43 +1,67 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2016 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2016 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.core.rpc.camel;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.UnitOfWork;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opennms.core.rpc.api.RpcModule;
import org.opennms.core.rpc.api.RpcRequest;
import org.opennms.core.rpc.api.RpcResponse;
import org.opennms.core.rpc.api.RequestTimedOutException;
import org.opennms.core.rpc.echo.EchoClient;
import org.opennms.core.rpc.echo.EchoRequest;
import org.opennms.core.rpc.echo.EchoResponse;
import org.opennms.core.rpc.echo.EchoRpcModule;
import org.opennms.core.test.OpenNMSJUnit4ClassRunner;
import org.opennms.core.test.activemq.ActiveMQBroker;
import org.opennms.minion.core.api.MinionIdentity;
import org.opennms.netmgt.model.OnmsDistPoller;
import org.opennms.test.JUnitConfigurationEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.annotation.Repeat;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.support.DirtiesContextTestExecutionListener;

@RunWith(OpenNMSJUnit4ClassRunner.class)
@ContextConfiguration(locations={
Expand All @@ -49,6 +73,8 @@
"classpath:/META-INF/opennms/applicationContext-rpc-echo.xml"
})
@JUnitConfigurationEnvironment
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
@TestExecutionListeners({DirtiesContextTestExecutionListener.class})
public class EchoRpcIT {

private static final String REMOTE_LOCATION_NAME = "remote";
Expand All @@ -61,7 +87,7 @@ public class EchoRpcIT {

@Autowired
@Qualifier("queuingservice")
private Component queuingservice;
private ActiveMQComponent queuingservice;

@Autowired
private EchoClient echoClient;
Expand All @@ -83,17 +109,8 @@ public void canExecuteRpcViaAnotherLocation() throws Exception {
CamelContext context = new DefaultCamelContext(registry);
context.addComponent("queuingservice", queuingservice);

CamelRpcServerRouteManager routeManager = new CamelRpcServerRouteManager(context, new MinionIdentity() {
@Override
public String getId() {
return "0";
}

@Override
public String getLocation() {
return REMOTE_LOCATION_NAME;
}
});
CamelRpcServerRouteManager routeManager = new CamelRpcServerRouteManager(context,
new MockMinionIdentity(REMOTE_LOCATION_NAME));
routeManager.bind(echoRpcModule);

EchoRequest request = new EchoRequest("HELLO!!!");
Expand Down Expand Up @@ -164,4 +181,34 @@ public void checkZeroTimeout() throws Exception {

assertEquals(CamelRpcClientPreProcessor.CAMEL_JMS_REQUEST_TIMEOUT_DEFAULT, defaultExchange.getIn().getHeader(CamelRpcConstants.CAMEL_JMS_REQUEST_TIMEOUT_HEADER));
}

@Test(timeout=CamelRpcClientPreProcessor.CAMEL_JMS_REQUEST_TIMEOUT_DEFAULT * 4)
public void throwsRequestTimedOutExceptionOnTimeout() throws Exception {
assertNotEquals(REMOTE_LOCATION_NAME, identity.getLocation());
EchoRpcModule echoRpcModule = new EchoRpcModule();

SimpleRegistry registry = new SimpleRegistry();
CamelContext context = new DefaultCamelContext(registry);
context.getShutdownStrategy().setTimeout(5);
context.getShutdownStrategy().setTimeUnit(TimeUnit.SECONDS);
context.addComponent("queuingservice", queuingservice);

CamelRpcServerRouteManager routeManager = new CamelRpcServerRouteManager(context,
new MockMinionIdentity(REMOTE_LOCATION_NAME));
routeManager.bind(echoRpcModule);

EchoRequest request = new EchoRequest("HELLO!!!");
request.setLocation(REMOTE_LOCATION_NAME);
request.setDelay(CamelRpcClientPreProcessor.CAMEL_JMS_REQUEST_TIMEOUT_DEFAULT * 2);

try {
echoClient.execute(request).get();
fail("Did not get ExecutionException");
} catch (ExecutionException e) {
assertTrue("Cause is not of type RequestTimedOutException: " + ExceptionUtils.getStackTrace(e), e.getCause() instanceof RequestTimedOutException);
}

routeManager.unbind(echoRpcModule);
context.stop();
}
}
@@ -0,0 +1,50 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2016 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2016 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/

package org.opennms.core.rpc.camel;

import org.opennms.minion.core.api.MinionIdentity;

public class MockMinionIdentity implements MinionIdentity {

private final String location;

public MockMinionIdentity(String location) {
this.location = location;
}

@Override
public String getId() {
return "0";
}

@Override
public String getLocation() {
return location;
}
}
Expand Up @@ -43,10 +43,15 @@ public class EchoRequest implements RpcRequest {

@XmlAttribute(name="message")
private String message;

@XmlAttribute(name="location")
private String location;

@XmlAttribute(name="delay")
private Long delay;

private Long timeToLiveMs;

public EchoRequest() { }

public EchoRequest(String message) {
Expand All @@ -70,14 +75,26 @@ public String getLocation() {
return location;
}

public void setTimeToLiveMs(Long timeToLiveMs) {
this.timeToLiveMs = timeToLiveMs;
}

@Override
public Long getTimeToLiveMs() {
return null;
return timeToLiveMs;
}

public void setDelay(Long delay) {
this.delay = delay;
}

public Long getDelay() {
return delay;
}

@Override
public int hashCode() {
return Objects.hash(message);
return Objects.hash(message, delay);
}

@Override
Expand All @@ -89,11 +106,12 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
final EchoRequest other = (EchoRequest) obj;
return Objects.equals(this.message, other.message);
return Objects.equals(this.message, other.message) &&
Objects.equals(this.delay, other.delay);
}

@Override
public String toString() {
return String.format("EchoRequest[message=%s]", message);
return String.format("EchoRequest[message=%s, delay=%s]", message, delay);
}
}
Expand Up @@ -42,7 +42,22 @@ public EchoRpcModule() {

@Override
public CompletableFuture<EchoResponse> execute(EchoRequest request) {
return CompletableFuture.completedFuture(new EchoResponse(request.getMessage()));
final CompletableFuture<EchoResponse> future = new CompletableFuture<>();
new Thread(new Runnable() {
@Override
public void run() {
if (request.getDelay() != null) {
try {
Thread.sleep(request.getDelay());
} catch (InterruptedException e) {
future.completeExceptionally(e);
return;
}
}
future.complete(new EchoResponse(request.getMessage()));
}
}).start();
return future;
}

@Override
Expand Down
Expand Up @@ -49,10 +49,11 @@ public class ActiveMQBroker extends ExternalResource {
@Override
public void before() throws Exception {
m_temporaryDirectory = Files.createTempDirectory("activemq-data");
m_broker.setPersistent(false);
m_broker.setDataDirectory(m_temporaryDirectory.toString());
m_broker.start();
if (!m_broker.waitUntilStarted()) {
LOG.warn("ActiveMQ broker was not started or stopped unexpectedly");
throw new Exception("ActiveMQ broker was not started or stopped unexpectedly. Error: " + m_broker.getStartException());
}
};

Expand Down

0 comments on commit f6c8ad8

Please sign in to comment.