Skip to content

Commit

Permalink
https://issues.apache.org/jira/browse/AMQCPP-511
Browse files Browse the repository at this point in the history
  • Loading branch information
tabish121 committed Oct 18, 2013
1 parent e32e2ef commit 08425c7
Show file tree
Hide file tree
Showing 17 changed files with 515 additions and 0 deletions.
4 changes: 4 additions & 0 deletions activemq-cpp/src/main/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ cc_sources = \
activemq/transport/discovery/DiscoveryListener.cpp \
activemq/transport/discovery/DiscoveryTransport.cpp \
activemq/transport/discovery/DiscoveryTransportFactory.cpp \
activemq/transport/discovery/http/HttpDiscoveryAgent.cpp \
activemq/transport/discovery/http/HttpDiscoveryAgentFactory.cpp \
activemq/transport/failover/BackupTransport.cpp \
activemq/transport/failover/BackupTransportPool.cpp \
activemq/transport/failover/CloseTransportsTask.cpp \
Expand Down Expand Up @@ -822,6 +824,8 @@ h_sources = \
activemq/transport/discovery/DiscoveryListener.h \
activemq/transport/discovery/DiscoveryTransport.h \
activemq/transport/discovery/DiscoveryTransportFactory.h \
activemq/transport/discovery/http/HttpDiscoveryAgent.h \
activemq/transport/discovery/http/HttpDiscoveryAgentFactory.h \
activemq/transport/failover/BackupTransport.h \
activemq/transport/failover/BackupTransportPool.h \
activemq/transport/failover/CloseTransportsTask.h \
Expand Down
6 changes: 6 additions & 0 deletions activemq-cpp/src/main/activemq/library/ActiveMQCPP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include <activemq/transport/tcp/TcpTransportFactory.h>
#include <activemq/transport/tcp/SslTransportFactory.h>
#include <activemq/transport/failover/FailoverTransportFactory.h>
#include <activemq/transport/discovery/DiscoveryTransportFactory.h>

#include <activemq/transport/discovery/http/HttpDiscoveryAgentFactory.h>

using namespace activemq;
using namespace activemq::library;
Expand All @@ -40,6 +43,7 @@ using namespace activemq::transport::tcp;
using namespace activemq::transport::mock;
using namespace activemq::transport::failover;
using namespace activemq::transport::discovery;
using namespace activemq::transport::discovery::http;
using namespace activemq::wireformat;

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -108,8 +112,10 @@ void ActiveMQCPP::registerTransports() {
TransportRegistry::getInstance().registerFactory("ssl", new SslTransportFactory());
TransportRegistry::getInstance().registerFactory("mock", new MockTransportFactory());
TransportRegistry::getInstance().registerFactory("failover", new FailoverTransportFactory());
TransportRegistry::getInstance().registerFactory("discovery", new DiscoveryTransportFactory());

// Each discovery agent implemented in this library must be registered here.
DiscoveryAgentRegistry::initialize();

DiscoveryAgentRegistry::getInstance().registerFactory("http", new HttpDiscoveryAgentFactory);
}
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,16 @@ DiscoveryListener* AbstractDiscoveryAgent::getDiscoveryListener() const {
return this->impl->listener;
}

////////////////////////////////////////////////////////////////////////////////
void AbstractDiscoveryAgent::setDiscoveryURI(const URI& discoveryURI) {
impl->discoveryUri = discoveryURI;
}

////////////////////////////////////////////////////////////////////////////////
URI AbstractDiscoveryAgent::getDiscoveryURI() const {
return impl->discoveryUri;
}

////////////////////////////////////////////////////////////////////////////////
void AbstractDiscoveryAgent::setServiceName(const std::string& name) {
impl->selfService = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ namespace discovery {
virtual void setDiscoveryListener(DiscoveryListener* listener);
virtual DiscoveryListener* getDiscoveryListener() const;

virtual void setDiscoveryURI(const decaf::net::URI& discoveryURI);
virtual decaf::net::URI getDiscoveryURI() const;

/**
* @returns true if this agent is currently started.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Pointer<DiscoveryAgent> AbstractDiscoveryAgentFactory::createAgent(const URI& ag

Pointer<AbstractDiscoveryAgent> agent = this->doCreateAgent();

agent->setDiscoveryURI(agentURI);
Properties options = URISupport::parseParameters(agentURI);
doConfigureAgent(agent, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ namespace discovery {

virtual ~DiscoveryAgent();

/**
* Sets the URI that was used to create this discovery agent. This URI can
* be used to configure the discovery agent.
*/
virtual void setDiscoveryURI(const decaf::net::URI& discoveryURI) = 0;

/**
* Sets the URI that was used to create this discovery agent. This URI can
* be used to configure the discovery agent.
*/
virtual decaf::net::URI getDiscoveryURI() const = 0;

/**
* Sets the discovery listener which will be notified on the add or remove of
* a discovered service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <activemq/transport/discovery/http/HttpDiscoveryAgent.h>

#include <decaf/lang/Long.h>
#include <decaf/net/URI.h>
#include <decaf/util/HashSet.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/net/SocketFactory.h>
#include <decaf/net/Socket.h>
#include <decaf/io/InputStream.h>
#include <decaf/io/OutputStream.h>
#include <decaf/io/DataInputStream.h>
#include <decaf/io/DataOutputStream.h>
#include <decaf/io/BufferedInputStream.h>
#include <decaf/io/BufferedOutputStream.h>

using namespace activemq;
using namespace activemq::util;
using namespace activemq::transport;
using namespace activemq::transport::discovery;
using namespace activemq::transport::discovery::http;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::net;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;

////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace transport {
namespace discovery {
namespace http {

enum UpdateState {
SUSPENDED,
RESUMING,
RESUMED
};

class HttpDiscoveryAgentImpl {
private:

HttpDiscoveryAgentImpl(const HttpDiscoveryAgentImpl&);
HttpDiscoveryAgentImpl& operator= (const HttpDiscoveryAgentImpl&);

public:

UpdateState updateState;
Mutex updateLock;
long long updateInterval;
URI registryUrl;

public:

HttpDiscoveryAgentImpl() : updateState(RESUMED),
updateLock(),
updateInterval(10 * 1000),
registryUrl() {
}

HashSet<std::string> doLookup() {

HashSet<std::string> result;
try {
return result;
} catch (Exception& e) {
std::cout << "Caught exception: " << e.getMessage() << std::endl;
}

return result;
}
};

}}}}

////////////////////////////////////////////////////////////////////////////////
HttpDiscoveryAgent::HttpDiscoveryAgent() : AbstractDiscoveryAgent(), impl(new HttpDiscoveryAgentImpl) {
}

////////////////////////////////////////////////////////////////////////////////
HttpDiscoveryAgent::~HttpDiscoveryAgent() {
try {
delete this->impl;
}
DECAF_CATCHALL_NOTHROW()
}

////////////////////////////////////////////////////////////////////////////////
std::string HttpDiscoveryAgent::toString() const {
return "HttpDiscoveryAgent";
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::suspend() {
synchronized(&impl->updateLock) {
impl->updateState = SUSPENDED;
}
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::resume() {
synchronized(&impl->updateLock) {
impl->updateState = RESUMING;
impl->updateLock.notify();
}
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::doStart() {

if (impl->registryUrl.toString().empty()) {
impl->registryUrl = getDiscoveryURI();
}

resume();
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::doStop() {
suspend();
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::doAdvertizeSelf() {

}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::setUpdateInterval(long long updateInterval) {
impl->updateInterval = updateInterval;
}

////////////////////////////////////////////////////////////////////////////////
long long HttpDiscoveryAgent::getUpdateInterval() const {
return impl->updateInterval;
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::setRegistryURL(const std::string& registryUrl) {
impl->registryUrl.create(registryUrl);
}

////////////////////////////////////////////////////////////////////////////////
std::string HttpDiscoveryAgent::getRegistryURL() const{
return impl->registryUrl.toString();
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::doDiscovery() {
try {
updateServices();
synchronized(&impl->updateLock) {
do {
if (impl->updateState == RESUMING) {
impl->updateState = RESUMED;
} else {
impl->updateLock.wait(impl->updateInterval);
}
} while (impl->updateState == SUSPENDED && isStarted());
}
} catch (InterruptedException& e) {
return;
}
}

////////////////////////////////////////////////////////////////////////////////
void HttpDiscoveryAgent::updateServices() {
DiscoveryListener* discoveryListener = getDiscoveryListener();
if (discoveryListener != NULL) {
HashSet<std::string> activeServices = impl->doLookup();
if (activeServices.isEmpty()) {
Pointer< Iterator<std::string> > discovered(activeServices.iterator());
while (discovered->hasNext()) {
std::string service = discovered->next();
processLiveService("", service);
}
}
}
}
Loading

0 comments on commit 08425c7

Please sign in to comment.