Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

https://issues.apache.org/jira/browse/AMQCPP-511

Implements most of the basic bits of a Discovery Agent in an Abstract
class so that the agents can be simpler drop ins.
  • Loading branch information...
commit f714910919e68cb27909c20dcb3b5bcd63ce0cac 1 parent d25147a
@tabish121 tabish121 authored
View
2  activemq-cpp/src/main/Makefile.am
@@ -155,6 +155,7 @@ cc_sources = \
activemq/transport/TransportFilter.cpp \
activemq/transport/TransportRegistry.cpp \
activemq/transport/correlator/ResponseCorrelator.cpp \
+ activemq/transport/discovery/AbstractDiscoveryAgent.cpp \
activemq/transport/discovery/DiscoveredBrokerData.cpp \
activemq/transport/discovery/DiscoveryAgent.cpp \
activemq/transport/discovery/DiscoveryAgentFactory.cpp \
@@ -811,6 +812,7 @@ h_sources = \
activemq/transport/TransportListener.h \
activemq/transport/TransportRegistry.h \
activemq/transport/correlator/ResponseCorrelator.h \
+ activemq/transport/discovery/AbstractDiscoveryAgent.h \
activemq/transport/discovery/DiscoveredBrokerData.h \
activemq/transport/discovery/DiscoveryAgent.h \
activemq/transport/discovery/DiscoveryAgentFactory.h \
View
503 activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.cpp
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/transport/discovery/AbstractDiscoveryAgent.h>
+
+#include <activemq/transport/discovery/DiscoveredBrokerData.h>
+#include <activemq/transport/discovery/DiscoveryListener.h>
+
+#include <decaf/net/URI.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/util/HashMap.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+
+using namespace activemq;
+using namespace activemq::commands;
+using namespace activemq::transport;
+using namespace activemq::transport::discovery;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+const int AbstractDiscoveryAgent::DEFAULT_INITIAL_RECONNECT_DELAY = 5000;
+const int AbstractDiscoveryAgent::DEFAULT_BACKOFF_MULTIPLIER = 2;
+const int AbstractDiscoveryAgent::DEFAULT_MAX_RECONNECT_DELAY = 30000;
+const int AbstractDiscoveryAgent::WORKER_KILL_TIME_SECONDS = 1000;
+const int AbstractDiscoveryAgent::HEARTBEAT_MISS_BEFORE_DEATH = 10;
+const int AbstractDiscoveryAgent::DEFAULT_KEEPALIVE_INTERVAL = 500;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace transport {
+namespace discovery {
+
+ class AbstractDiscoveryAgentImpl {
+ private:
+
+ AbstractDiscoveryAgentImpl(const AbstractDiscoveryAgentImpl&);
+ AbstractDiscoveryAgentImpl& operator=(const AbstractDiscoveryAgentImpl&);
+
+ public:
+
+ long long initialReconnectDelay;
+ long long maxReconnectDelay;
+ long long backOffMultiplier;
+ bool useExponentialBackOff;
+ int maxReconnectAttempts;
+ long long keepAliveInterval;
+
+ AtomicBoolean started;
+ Pointer<Thread> worker;
+ Pointer<ThreadPoolExecutor> executor;
+
+ HashMap<std::string, Pointer<DiscoveredBrokerData> > discoveredServices;
+ Mutex discoveredServicesLock;
+
+ URI discoveryUri;
+ std::string selfService;
+ std::string group;
+ DiscoveryListener* listener;
+ long long lastAdvertizeTime;
+ bool reportAdvertizeFailed;
+
+ public:
+
+ AbstractDiscoveryAgentImpl() : initialReconnectDelay(AbstractDiscoveryAgent::DEFAULT_INITIAL_RECONNECT_DELAY),
+ maxReconnectDelay(AbstractDiscoveryAgent::DEFAULT_MAX_RECONNECT_DELAY),
+ backOffMultiplier(AbstractDiscoveryAgent::DEFAULT_BACKOFF_MULTIPLIER),
+ useExponentialBackOff(false),
+ maxReconnectAttempts(0),
+ keepAliveInterval(AbstractDiscoveryAgent::DEFAULT_KEEPALIVE_INTERVAL),
+ started(),
+ worker(),
+ executor(),
+ discoveredServices(),
+ discoveredServicesLock(),
+ discoveryUri(),
+ selfService(),
+ group("default"),
+ listener(),
+ lastAdvertizeTime(0),
+ reportAdvertizeFailed(true)
+ {}
+
+ Executor& getExecutor() {
+ if (executor == NULL) {
+ synchronized(&discoveredServicesLock) {
+ if (executor == NULL) {
+ executor.reset(
+ new ThreadPoolExecutor(1, 1, 45, TimeUnit::SECONDS,
+ new LinkedBlockingQueue<Runnable*>()));
+ }
+ }
+ }
+ return *executor;
+ }
+
+ /**
+ * Returns true if this Broker has been marked as failed and it is now time to
+ * start a recovery attempt.
+ */
+ bool isTimeForRecovery(Pointer<DiscoveredBrokerData> service) {
+ synchronized(&discoveredServicesLock) {
+
+ if (!service->isFailed()) {
+ return false;
+ }
+
+ int maxReconnectAttempts = maxReconnectAttempts;
+
+ // Are we done trying to recover this guy?
+ if (maxReconnectAttempts > 0 && service->getFailureCount() > maxReconnectAttempts) {
+ return false;
+ }
+
+ // Is it not yet time?
+ if (System::currentTimeMillis() < service->getNextRecoveryTime()) {
+ return false;
+ }
+
+ service->setFailed(false);
+ return true;
+ }
+
+ return false;
+ }
+
+ void updateHeartBeat(Pointer<DiscoveredBrokerData> service) {
+ synchronized(&discoveredServicesLock) {
+
+ service->setLastHeartBeatTime(System::currentTimeMillis());
+
+ // Consider that the broker recovery has succeeded if it has not failed in 60 seconds.
+ if (!service->isFailed() && service->getFailureCount() > 0 &&
+ (service->getLastHeartBeatTime() - service->getNextRecoveryTime()) > TimeUnit::MINUTES.toSeconds(60)) {
+
+ service->setFailureCount(0);
+ service->setNextRecoveryTime(System::currentTimeMillis());
+ }
+ }
+ }
+
+ bool markFailed(Pointer<DiscoveredBrokerData> service) {
+ synchronized(&discoveredServicesLock) {
+
+ if (!service->isFailed()) {
+ service->setFailed(true);
+ service->setFailureCount(service->getFailureCount() + 1);
+
+ long
+ reconnectDelay = 0;
+ if (!useExponentialBackOff) {
+ reconnectDelay = initialReconnectDelay;
+ } else {
+ reconnectDelay = (long) Math::pow((double)backOffMultiplier, (double)service->getFailureCount());
+ reconnectDelay = Math::min(reconnectDelay, maxReconnectDelay);
+ }
+
+ service->setNextRecoveryTime(System::currentTimeMillis() + reconnectDelay);
+ return true;
+ }
+ }
+ return false;
+ }
+ };
+
+ class ServiceAddedRunnable : public Runnable {
+ private:
+
+ AbstractDiscoveryAgent* agent;
+ Pointer<DiscoveredBrokerData> event;
+
+ public:
+
+ ServiceAddedRunnable(AbstractDiscoveryAgent* agent, Pointer<DiscoveredBrokerData> event) :
+ Runnable(), agent(agent), event(event) {
+ }
+ virtual ~ServiceAddedRunnable();
+
+ virtual void run() {
+ DiscoveryListener* listener = agent->getDiscoveryListener();
+ if (listener != NULL) {
+ listener->onServiceAdd(event.get());
+ }
+ }
+ };
+
+ class ServiceRemovedRunnable : public Runnable {
+ private:
+
+ AbstractDiscoveryAgent* agent;
+ Pointer<DiscoveredBrokerData> event;
+
+ public:
+
+ ServiceRemovedRunnable(AbstractDiscoveryAgent* agent, Pointer<DiscoveredBrokerData> event) :
+ Runnable(), agent(agent), event(event) {}
+ virtual ~ServiceRemovedRunnable();
+
+ virtual void run() {
+ DiscoveryListener* listener = agent->getDiscoveryListener();
+ if (listener != NULL) {
+ listener->onServiceRemove(event.get());
+ }
+ }
+ };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractDiscoveryAgent::AbstractDiscoveryAgent() : DiscoveryAgent(), impl(new AbstractDiscoveryAgentImpl) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AbstractDiscoveryAgent::~AbstractDiscoveryAgent() {
+ try {
+ delete this->impl;
+ }
+ DECAF_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::start() {
+ if (impl->started.compareAndSet(false, true)) {
+ doStart();
+
+ if (impl->worker == NULL) {
+ impl->worker.reset(new Thread(this));
+ impl->worker->start();
+ }
+
+ doAdvertizeSelf();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::stop() {
+ // Changing the isStarted flag will signal the thread that it needs to shut down.
+ if (impl->started.compareAndSet(true, false)) {
+ doStop();
+
+ if (impl->worker == NULL) {
+ impl->worker->join(WORKER_KILL_TIME_SECONDS);
+
+ if (!impl->worker->isAlive()) {
+ impl->worker->interrupt();
+ }
+
+ impl->worker.reset(NULL);
+ }
+
+ impl->executor->shutdown();
+ impl->executor->awaitTermination(1, TimeUnit::MINUTES);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::run() {
+
+ Thread::currentThread()->setName("Discovery Agent Thread.");
+
+ while (impl->started.get()) {
+ doTimeKeepingServices();
+ try {
+ doDiscovery();
+ } catch (InterruptedException& ex) {
+ return;
+ } catch (Exception& ignore) {
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::registerService(const std::string& name) {
+ impl->selfService = name;
+ if (impl->started.get()) {
+ try {
+ doAdvertizeSelf();
+ } catch (Exception& e) {
+ // If a the advertise fails, chances are all subsequent sends will fail
+ // too.. No need to keep reporting the same error over and over.
+ if (impl->reportAdvertizeFailed) {
+ impl->reportAdvertizeFailed = false;
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::serviceFailed(const activemq::commands::DiscoveryEvent& event) {
+
+ Pointer<DiscoveredBrokerData> service;
+ synchronized(&impl->discoveredServicesLock) {
+ try {
+ service = impl->discoveredServices.get(event.getServiceName());
+ } catch (NoSuchElementException& ex) {}
+ }
+
+ if (service != NULL && impl->markFailed(service)) {
+ fireServiceRemovedEvent(service);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setDiscoveryListener(DiscoveryListener* listener) {
+ this->impl->listener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+DiscoveryListener* AbstractDiscoveryAgent::getDiscoveryListener() const {
+ return this->impl->listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setServiceName(const std::string& name) {
+ impl->selfService = name;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string AbstractDiscoveryAgent::getServiceName() const {
+ return impl->selfService;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setKeepAliveInterval(long long interval) {
+ impl->keepAliveInterval = interval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long AbstractDiscoveryAgent::getKeepAliveInterval() const {
+ return impl->keepAliveInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setInitialReconnectDelay(long long initialReconnectDelay) {
+ impl->initialReconnectDelay = initialReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long AbstractDiscoveryAgent::getInitialReconnectDelay() const {
+ return impl->initialReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setMaxReconnectAttempts(int maxReconnectAttempts) {
+ impl->maxReconnectAttempts = maxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int AbstractDiscoveryAgent::getMaxReconnectAttempts() const {
+ return impl->maxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setMaxReconnectDelay(long long maxReconnectDelay) {
+ impl->maxReconnectDelay = maxReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long AbstractDiscoveryAgent::getMaxReconnectDelay() const {
+ return impl->maxReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setUseExponentialBackOff(bool useExponentialBackOff) {
+ impl->useExponentialBackOff = useExponentialBackOff;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool AbstractDiscoveryAgent::isUseExponentialBackOff() const {
+ return impl->useExponentialBackOff;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::setGroup(const std::string& group) {
+ impl->group = group;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string AbstractDiscoveryAgent::getGroup() const {
+ return impl->group;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::fireServiceRemovedEvent(Pointer<DiscoveredBrokerData> event) {
+ if (impl->listener != NULL && impl->started.get()) {
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ impl->getExecutor().execute(new ServiceRemovedRunnable(this, event));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::fireServiceAddedEvent(Pointer<DiscoveredBrokerData> event) {
+ if (impl->listener != NULL && impl->started.get()) {
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ impl->getExecutor().execute(new ServiceAddedRunnable(this, event));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::processLiveService(const std::string& brokerName, const std::string& service) {
+
+ if (getServiceName().empty() || service != getServiceName()) {
+ Pointer<DiscoveredBrokerData> remoteBroker;
+ synchronized(&impl->discoveredServicesLock) {
+ try {
+ remoteBroker = impl->discoveredServices.get(service);
+ } catch (NoSuchElementException& ignored) {
+ }
+ }
+
+ if (remoteBroker == NULL) {
+ remoteBroker.reset(new DiscoveredBrokerData(brokerName, service));
+ impl->discoveredServices.put(service, remoteBroker);
+ fireServiceAddedEvent(remoteBroker);
+ doAdvertizeSelf();
+ } else {
+ impl->updateHeartBeat(remoteBroker);
+ if (impl->isTimeForRecovery(remoteBroker)) {
+ fireServiceAddedEvent(remoteBroker);
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::processDeadService(const std::string& service) {
+
+ if (service != getServiceName()) {
+
+ Pointer<DiscoveredBrokerData> remoteBroker;
+ synchronized(&impl->discoveredServicesLock) {
+ try {
+ remoteBroker = impl->discoveredServices.get(service);
+ } catch (NoSuchElementException& ignored) {
+ }
+ }
+
+ if (remoteBroker != NULL && !remoteBroker->isFailed()) {
+ fireServiceRemovedEvent(remoteBroker);
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::doTimeKeepingServices() {
+
+ if (impl->started.get()) {
+ long long currentTime = System::currentTimeMillis();
+ if (currentTime < impl->lastAdvertizeTime ||
+ ((currentTime - impl->keepAliveInterval) > impl->lastAdvertizeTime)) {
+
+ doAdvertizeSelf();
+ impl->lastAdvertizeTime = currentTime;
+ }
+ doExpireOldServices();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AbstractDiscoveryAgent::doExpireOldServices() {
+ long long expireTime = System::currentTimeMillis() -
+ (impl->keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
+
+ std::vector< Pointer<DiscoveredBrokerData> > services;
+ synchronized(&impl->discoveredServicesLock) {
+ services = impl->discoveredServices.values().toArray();
+ }
+
+ std::vector< Pointer<DiscoveredBrokerData> >::iterator iter = services.begin();
+ for (; iter != services.end(); ++iter) {
+ Pointer<DiscoveredBrokerData> service = *iter;
+ if (service->getLastHeartBeatTime() < expireTime) {
+ processDeadService(service->getServiceName());
+ }
+ }
+}
View
178 activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.h
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_DISCOVERY_ABSTRACTDISCOVERYAGENT_H_
+#define _ACTIVEMQ_TRANSPORT_DISCOVERY_ABSTRACTDISCOVERYAGENT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/transport/discovery/DiscoveryAgent.h>
+#include <activemq/transport/discovery/DiscoveredBrokerData.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace transport {
+namespace discovery {
+
+ class AbstractDiscoveryAgentImpl;
+
+ /**
+ * Abstract base class that provides all the basic implementation needed to create
+ * a DiscoveryAgent instance without needing to implement a lot of boilerplate code.
+ *
+ * @since 3.9.0
+ */
+ class AMQCPP_API AbstractDiscoveryAgent : public DiscoveryAgent, public decaf::lang::Runnable {
+ private:
+
+ AbstractDiscoveryAgentImpl* impl;
+
+ public:
+
+ static const int DEFAULT_INITIAL_RECONNECT_DELAY;
+ static const int DEFAULT_BACKOFF_MULTIPLIER;
+ static const int DEFAULT_MAX_RECONNECT_DELAY;
+ static const int WORKER_KILL_TIME_SECONDS;
+ static const int HEARTBEAT_MISS_BEFORE_DEATH;
+ static const int DEFAULT_KEEPALIVE_INTERVAL;
+
+ private:
+
+ AbstractDiscoveryAgent(const AbstractDiscoveryAgent&);
+ AbstractDiscoveryAgent& operator= (const AbstractDiscoveryAgent&);
+
+ public:
+
+ AbstractDiscoveryAgent();
+ virtual ~AbstractDiscoveryAgent();
+
+ virtual void start();
+ virtual void stop();
+
+ virtual void registerService(const std::string& name);
+ virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event);
+
+ virtual void setDiscoveryListener(DiscoveryListener* listener);
+ virtual DiscoveryListener* getDiscoveryListener() const;
+
+ /**
+ * Sets the service that is publish by this agent if it supports publishing.
+ *
+ * @param name
+ * The service name to publish, typically the URI.
+ */
+ void setServiceName(const std::string& name);
+
+ /**
+ * Gets the configured service to publish, not all agents can publish so this value
+ * may not mean that an actual service advertisement is ever done.
+ *
+ * @returns the configured service to publish.
+ */
+ std::string getServiceName() const;
+
+ /**
+ * Sets the keep alive interval used to control how long an service that has not been
+ * seen is kept in the list of discovered services before being idle to long. Also this
+ * value controls how often this service will advertise itself if it supports that.
+ *
+ * @param interval
+ * Time in milliseconds for the keep alive interval.
+ */
+ void setKeepAliveInterval(long long interval);
+
+ /**
+ * Gets the keep alive interval used to control how long an service that has not been
+ * seen is kept in the list of discovered services before being idle to long. Also this
+ * value controls how often this service will advertise itself if it supports that.
+ *
+ * @returns Time in milliseconds for the keep alive interval.
+ */
+ long long getKeepAliveInterval() const;
+
+ /**
+ * Sets the agents reconnect backoff multiplier.
+ *
+ * @param multiplier
+ * The back multiplier to use when calculating the next recovery time.
+ */
+ void getBackOffMultiplier(long long multiplier);
+
+ /**
+ * Gets the configured backoff multiplier for calculating the next recovery time.
+ *
+ * @returns the configured backoff multiplier for calculating the next recovery time.
+ */
+ long long getBackOffMultiplier() const;
+
+ void setInitialReconnectDelay(long long initialReconnectDelay);
+
+ long long getInitialReconnectDelay() const;
+
+ void setMaxReconnectAttempts(int maxReconnectAttempts);
+
+ int getMaxReconnectAttempts() const;
+
+ void setMaxReconnectDelay(long long maxReconnectDelay);
+
+ long long getMaxReconnectDelay() const;
+
+ void setUseExponentialBackOff(bool useExponentialBackOff);
+
+ bool isUseExponentialBackOff() const;
+
+ void setGroup(const std::string& group);
+
+ std::string getGroup() const;
+
+ protected:
+
+ /**
+ * Default implementation of the DiscoveryAgent's background worker thread processing.
+ */
+ virtual void run();
+
+ virtual void processLiveService(const std::string& brokerName, const std::string& service);
+ virtual void processDeadService(const std::string& service);
+
+ virtual void fireServiceAddedEvent(decaf::lang::Pointer<DiscoveredBrokerData> event);
+ virtual void fireServiceRemovedEvent(decaf::lang::Pointer<DiscoveredBrokerData> event);
+
+ protected:
+
+ /**
+ * The real agent will implement this method to perform any necessary resource allocation
+ * prior to the completion of the start call.
+ */
+ virtual void doStart() = 0;
+
+ virtual void doStop() = 0;
+ virtual void doAdvertizeSelf() = 0;
+ virtual void doDiscovery() = 0;
+
+ private:
+
+ void doExpireOldServices();
+ void doTimeKeepingServices();
+
+ };
+
+}}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_DISCOVERY_ABSTRACTDISCOVERYAGENT_H_ */
View
12 activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.cpp
@@ -31,5 +31,17 @@ DiscoveredBrokerData::DiscoveredBrokerData() : DiscoveryEvent(),
}
////////////////////////////////////////////////////////////////////////////////
+DiscoveredBrokerData::DiscoveredBrokerData(const std::string& brokerName, const std::string& service) :
+ DiscoveryEvent(),
+ lastHeartBeatTime(0),
+ nextRecoveryTime(0),
+ failureCount(0),
+ failed(false) {
+
+ setBrokerName(brokerName);
+ setServiceName(service);
+}
+
+////////////////////////////////////////////////////////////////////////////////
DiscoveredBrokerData::~DiscoveredBrokerData() {
}
View
5 activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.h
@@ -18,6 +18,7 @@
#ifndef _ACTIVEMQ_TRANSPORT_DISCOVERY_DISCOVEREDBROKERDATA_H_
#define _ACTIVEMQ_TRANSPORT_DISCOVERY_DISCOVEREDBROKERDATA_H_
+#include <activemq/util/Config.h>
#include <activemq/commands/DiscoveryEvent.h>
namespace activemq {
@@ -30,7 +31,7 @@ namespace discovery {
*
* @since 3.9.0
*/
- class DiscoveredBrokerData : public activemq::commands::DiscoveryEvent {
+ class AMQCPP_API DiscoveredBrokerData : public activemq::commands::DiscoveryEvent {
private:
long long lastHeartBeatTime;
@@ -41,6 +42,8 @@ namespace discovery {
public:
DiscoveredBrokerData();
+ DiscoveredBrokerData(const std::string& brokerName, const std::string& service);
+
virtual ~DiscoveredBrokerData();
/**
View
7 activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgent.h
@@ -69,6 +69,13 @@ namespace discovery {
*/
virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event) = 0;
+ /**
+ * Returns a descriptive string that represents this discovery agent.
+ *
+ * @return a string that descibes this discovery agent.
+ */
+ virtual std::string toString() const = 0;
+
};
}}}
View
2  activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgentFactory.h
@@ -35,7 +35,7 @@ namespace discovery {
*
* @since 3.9.0
*/
- class DiscoveryAgentFactory {
+ class AMQCPP_API DiscoveryAgentFactory {
public:
virtual ~DiscoveryAgentFactory();
View
4 activemq-cpp/src/main/activemq/transport/discovery/DiscoveryListener.h
@@ -37,7 +37,7 @@ namespace discovery {
* @param event
* A DiscoveryEvent that contains information on the newly discovered service.
*/
- virtual void onServiceAdd(const activemq::commands::DiscoveryEvent& event) = 0;
+ virtual void onServiceAdd(const activemq::commands::DiscoveryEvent* event) = 0;
/**
* Called when an discovery agent determines that a service is no longer available.
@@ -45,7 +45,7 @@ namespace discovery {
* @param event
* A DiscoveryEvent that contains information on the removed service.
*/
- virtual void onServiceRemove(const activemq::commands::DiscoveryEvent& event) = 0;
+ virtual void onServiceRemove(const activemq::commands::DiscoveryEvent* event) = 0;
};
View
10 activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.cpp
@@ -170,14 +170,14 @@ Properties DiscoveryTransport::getParameters() const {
}
////////////////////////////////////////////////////////////////////////////////
-void DiscoveryTransport::onServiceAdd(const DiscoveryEvent& event) {
- std::string url = event.getServiceName();
+void DiscoveryTransport::onServiceAdd(const DiscoveryEvent* event) {
+ std::string url = event->getServiceName();
if (!url.empty()) {
try {
URI uri(url);
uri = URISupport::applyParameters(uri, this->impl->parameters, DISCOVERED_OPTION_PREFIX);
synchronized(&this->impl->lock) {
- this->impl->serviceURIs.put(event.getServiceName(), uri);
+ this->impl->serviceURIs.put(event->getServiceName(), uri);
}
LinkedList<URI> uris;
uris.add(uri);
@@ -188,11 +188,11 @@ void DiscoveryTransport::onServiceAdd(const DiscoveryEvent& event) {
}
////////////////////////////////////////////////////////////////////////////////
-void DiscoveryTransport::onServiceRemove(const DiscoveryEvent& event) {
+void DiscoveryTransport::onServiceRemove(const DiscoveryEvent* event) {
try {
URI uri;
synchronized(&this->impl->lock) {
- uri = this->impl->serviceURIs.get(event.getServiceName());
+ uri = this->impl->serviceURIs.get(event->getServiceName());
}
LinkedList<URI> uris;
uris.add(uri);
View
4 activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.h
@@ -88,9 +88,9 @@ namespace discovery {
public:
- virtual void onServiceAdd(const activemq::commands::DiscoveryEvent& event);
+ virtual void onServiceAdd(const activemq::commands::DiscoveryEvent* event);
- virtual void onServiceRemove(const activemq::commands::DiscoveryEvent& event);
+ virtual void onServiceRemove(const activemq::commands::DiscoveryEvent* event);
virtual void transportInterrupted();
View
1  activemq-cpp/src/test/activemq/transport/discovery/DiscoveryAgentRegistryTest.cpp
@@ -43,6 +43,7 @@ namespace {
virtual void setDiscoveryListener(DiscoveryListener* listener) {}
virtual void registerService(const std::string& name) {}
virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event) {}
+ virtual std::string toString() const { return "MockDiscoveryAgent"; }
};
class MockDiscoveryAgentFactory : public DiscoveryAgentFactory {
View
1  activemq-cpp/src/test/activemq/transport/discovery/DiscoveryTransportFactoryTest.cpp
@@ -47,6 +47,7 @@ namespace {
virtual void setDiscoveryListener(DiscoveryListener* listener) {}
virtual void registerService(const std::string& name) {}
virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event) {}
+ virtual std::string toString() const { return "MockDiscoveryAgent"; }
};
class MockDiscoveryAgentFactory : public DiscoveryAgentFactory {
Please sign in to comment.
Something went wrong with that request. Please try again.