Permalink
Browse files

Merge pull request #2107 from rubenk/remotebackend-fixes-for-3.4.2

Remotebackend fixes for 3.4.2
  • Loading branch information...
2 parents 079d797 + 5b58a44 commit cefcf9f3a3418a0ad26a429bb4975715e607e4cd @Habbie Habbie committed Jan 22, 2015
View
@@ -8,10 +8,10 @@ before_script:
- sudo /sbin/ip addr add 1.2.3.4/32 dev lo
- sudo rm /etc/apt/sources.list.d/travis_ci_zeromq3-source.list
- sudo apt-get update
- - sudo apt-get install --no-install-recommends libboost-all-dev libtolua-dev bc libcdb-dev libnet-dns-perl unbound-host ldnsutils dnsutils bind9utils libtool libcdb-dev xmlto links asciidoc ruby-json ruby-sqlite3 rubygems libcurl4-openssl-dev ruby1.9.1 socat time libzmq1 libzmq-dev pkg-config daemontools authbind liblua5.1-posix1 libopendbx1-dev libopendbx1-sqlite3 python-virtualenv libldap2-dev softhsm libp11-kit-dev p11-kit moreutils libgeoip-dev geoip-database
+ - sudo apt-get install --no-install-recommends libboost-all-dev libtolua-dev bc libcdb-dev libnet-dns-perl unbound-host ldnsutils dnsutils bind9utils libtool libcdb-dev xmlto links asciidoc ruby-json ruby-sqlite3 rubygems libcurl4-openssl-dev ruby1.9.1 socat time pkg-config daemontools authbind liblua5.1-posix1 libopendbx1-dev libopendbx1-sqlite3 python-virtualenv libldap2-dev softhsm libp11-kit-dev p11-kit moreutils libgeoip-dev geoip-database
- sudo sh -c 'sed s/precise/trusty/g /etc/apt/sources.list > /etc/apt/sources.list.d/trusty.list'
- sudo apt-get update
- - sudo apt-get install liblmdb0 liblmdb-dev lmdb-utils libyaml-cpp-dev
+ - sudo apt-get install liblmdb0 liblmdb-dev lmdb-utils libyaml-cpp-dev libzmq3-dev
- sudo update-alternatives --set ruby /usr/bin/ruby1.9.1
- sudo touch /etc/authbind/byport/53
- sudo chmod 755 /etc/authbind/byport/53
@@ -18,9 +18,21 @@ AC_DEFUN([PDNS_ENABLE_REMOTEBACKEND_ZEROMQ],[
AC_DEFINE([HAVE_LIBZMQ], [1], [Define to 1 if you have libzmq])
AC_DEFINE([REMOTEBACKEND_ZEROMQ], [1], [Define to 1 if you have the ZeroMQ connector])
REMOTEBACKEND_ZEROMQ=yes
+
],
[AC_MSG_ERROR([Could not find libzmq])]
)]
+
+ old_CXXFLAGS="$CXXFLAGS"
+ old_LDFLAGS="$LDFLAGS"
+ CXXFLAGS="$CFLAGS $LIBZMQ_CFLAGS"
+ LDFLAGS="$LDFLAGS $LIBZMQ_LIBS"
+ AC_CHECK_LIB([zmq], [zmq_msg_send],
+ [
+ AC_DEFINE([HAVE_ZMQ_MSG_SEND], [1], [Define to 1 if the ZeroMQ 3.x or greater API is available])
+ ])
+ CXXFLAGS="$old_CXXFLAGS"
+ LDFLAGS="$old_LDFLAGS"
)
])
@@ -1,5 +1,6 @@
source "https://rubygems.org"
+gem "json"
gem "webrick"
gem "zeromqrb"
gem "sqlite3"
@@ -1,18 +1,22 @@
GEM
remote: https://rubygems.org/
specs:
- ffi (1.9.3)
- ffi-rzmq (1.0.3)
- ffi
- sqlite3 (1.3.8)
+ ffi (1.9.6)
+ ffi-rzmq (2.0.1)
+ ffi-rzmq-core (>= 1.0.1)
+ ffi-rzmq-core (1.0.3)
+ ffi (~> 1.9)
+ json (1.8.1)
+ sqlite3 (1.3.9)
webrick (1.3.1)
- zeromqrb (0.1.1)
- ffi-rzmq (~> 1.0)
+ zeromqrb (0.1.3)
+ ffi-rzmq
PLATFORMS
ruby
DEPENDENCIES
+ json
sqlite3
webrick
zeromqrb
@@ -6,7 +6,7 @@
#include <sstream>
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
-#include "polarssl/ssl.h"
+#include "pdns/lock.hh"
#ifndef UNIX_PATH_MAX
#define UNIX_PATH_MAX 108
@@ -22,6 +22,7 @@ HTTPConnector::HTTPConnector(std::map<std::string,std::string> options) {
this->timeout = 2;
this->d_post = false;
this->d_post_json = false;
+ this->d_socket = NULL;
if (options.find("timeout") != options.end()) {
this->timeout = boost::lexical_cast<int>(options.find("timeout")->second)/1000;
@@ -38,11 +39,11 @@ HTTPConnector::HTTPConnector(std::map<std::string,std::string> options) {
this->d_post_json = true;
}
}
- if (options.find("capath") != options.end()) this->d_capath = options.find("capath")->second;
- if (options.find("cafile") != options.end()) this->d_cafile = options.find("cafile")->second;
}
HTTPConnector::~HTTPConnector() {
+ if (d_socket != NULL)
+ delete d_socket;
}
// converts json value into string
@@ -289,7 +290,7 @@ void HTTPConnector::post_requestbuilder(const rapidjson::Document &input, YaHTTP
}
int HTTPConnector::send_message(const rapidjson::Document &input) {
- int rv,ec;
+ int rv,ec,fd;
std::vector<std::string> members;
std::string method;
@@ -304,34 +305,65 @@ int HTTPConnector::send_message(const rapidjson::Document &input) {
restful_requestbuilder(input["method"].GetString(), input["parameters"], req);
rv = -1;
- req.headers["connection"] = "close"; // make sure the other ends knows we are not going to hang around
+ req.headers["connection"] = "Keep-Alive"; // see if we can streamline requests (not needed, strictly speaking)
out << req;
+ // try sending with current socket, if it fails retry with new socket
+ if (this->d_socket != NULL) {
+ fd = this->d_socket->getHandle();
+ // there should be no data waiting
+ if (waitForRWData(fd, true, 0, 1000) < 1) {
+ try {
+ d_socket->writenWithTimeout(out.str().c_str(), out.str().size(), timeout);
+ rv = 1;
+ } catch (NetworkError& ne) {
+ L<<Logger::Error<<"While writing to HTTP endpoint "<<d_addr.toStringWithPort()<<": "<<ne.what()<<std::endl;
+ } catch (...) {
+ L<<Logger::Error<<"While writing to HTTP endpoint "<<d_addr.toStringWithPort()<<": exception caught"<<std::endl;
+ }
+ }
+ }
+
+ if (rv == 1) return rv;
+
+ delete this->d_socket;
+ this->d_socket = NULL;
+
if (req.url.protocol == "unix") {
// connect using unix socket
} else {
// connect using tcp
- struct addrinfo *gAddr, *gAddrPtr;
+ struct addrinfo *gAddr, *gAddrPtr, hints;
std::string sPort = boost::lexical_cast<std::string>(req.url.port);
- if ((ec = getaddrinfo(req.url.host.c_str(), sPort.c_str(), NULL, &gAddr)) == 0) {
+ memset(&hints,0,sizeof hints);
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_flags = AI_ADDRCONFIG;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = 6; // tcp
+ if ((ec = getaddrinfo(req.url.host.c_str(), sPort.c_str(), &hints, &gAddr)) == 0) {
// try to connect to each address.
gAddrPtr = gAddr;
+
while(gAddrPtr) {
- d_socket = new Socket(gAddrPtr->ai_family, gAddrPtr->ai_socktype, gAddrPtr->ai_protocol);
try {
- ComboAddress addr = *reinterpret_cast<ComboAddress*>(gAddrPtr->ai_addr);
- d_socket->connect(addr);
+ d_socket = new Socket(gAddrPtr->ai_family, gAddrPtr->ai_socktype, gAddrPtr->ai_protocol);
+ d_addr.setSockaddr(gAddrPtr->ai_addr, gAddrPtr->ai_addrlen);
+ d_socket->connect(d_addr);
d_socket->setNonBlocking();
d_socket->writenWithTimeout(out.str().c_str(), out.str().size(), timeout);
rv = 1;
} catch (NetworkError& ne) {
- L<<Logger::Error<<"While writing to HTTP endpoint: "<<ne.what()<<std::endl;
+ L<<Logger::Error<<"While writing to HTTP endpoint "<<d_addr.toStringWithPort()<<": "<<ne.what()<<std::endl;
+ } catch (...) {
+ L<<Logger::Error<<"While writing to HTTP endpoint "<<d_addr.toStringWithPort()<<": exception caught"<<std::endl;
}
+
if (rv > -1) break;
delete d_socket;
d_socket = NULL;
gAddrPtr = gAddrPtr->ai_next;
+
}
freeaddrinfo(gAddr);
} else {
@@ -349,18 +381,38 @@ int HTTPConnector::recv_message(rapidjson::Document &output) {
if (d_socket == NULL ) return -1; // cannot receive :(
char buffer[4096];
int rd = -1;
+ bool fail = false;
+ time_t t0;
arl.initialize(&resp);
- while(arl.ready() == false) {
- rd = d_socket->readWithTimeout(buffer, sizeof(buffer), timeout);
- if (rd<0) {
- delete d_socket;
- d_socket = NULL;
- return -1;
- }
- buffer[rd] = 0;
- arl.feed(std::string(buffer, rd));
+ try {
+ t0 = time((time_t*)NULL);
+ while(arl.ready() == false && (labs(time((time_t*)NULL) - t0) <= timeout/1000)) {
+ rd = d_socket->readWithTimeout(buffer, sizeof(buffer), timeout);
+ if (rd==0)
+ throw NetworkError("EOF while reading");
+ if (rd<0)
+ throw NetworkError(std::string(strerror(rd)));
+ buffer[rd] = 0;
+ arl.feed(std::string(buffer, rd));
+ }
+ // timeout occured.
+ if (arl.ready() == false)
+ throw NetworkError("timeout");
+ } catch (NetworkError &ne) {
+ L<<Logger::Error<<"While reading from HTTP endpoint "<<d_addr.toStringWithPort()<<": "<<ne.what()<<std::endl;
+ delete d_socket;
+ d_socket = NULL;
+ fail = true;
+ } catch (...) {
+ L<<Logger::Error<<"While reading from HTTP endpoint "<<d_addr.toStringWithPort()<<": exception caught"<<std::endl;
+ delete d_socket;
+ fail = true;
+ }
+
+ if (fail) {
+ return -1;
}
arl.finalize();
@@ -380,8 +432,5 @@ int HTTPConnector::recv_message(rapidjson::Document &output) {
else
rv = -1;
- delete d_socket;
- d_socket = NULL;
-
return rv;
}
@@ -1,5 +1,6 @@
-source 'https://rubygems.org'
+source "https://rubygems.org"
-gem 'webrick'
-gem 'sqlite3'
-gem 'zeromqrb'
+gem "json"
+gem "webrick"
+gem "zeromqrb"
+gem "sqlite3"
@@ -1,18 +1,22 @@
GEM
remote: https://rubygems.org/
specs:
- ffi (1.9.3)
- ffi-rzmq (1.0.3)
- ffi
- sqlite3 (1.3.8)
+ ffi (1.9.6)
+ ffi-rzmq (2.0.1)
+ ffi-rzmq-core (>= 1.0.1)
+ ffi-rzmq-core (1.0.3)
+ ffi (~> 1.9)
+ json (1.8.1)
+ sqlite3 (1.3.9)
webrick (1.3.1)
- zeromqrb (0.1.1)
- ffi-rzmq (~> 1.0)
+ zeromqrb (0.1.3)
+ ffi-rzmq
PLATFORMS
ruby
DEPENDENCIES
+ json
sqlite3
webrick
zeromqrb
@@ -14,7 +14,7 @@
begin
context = ZeroMQ::Context.new
socket = context.socket ZMQ::REP
- socket.bind("ipc:///tmp/pdns.0")
+ socket.bind("ipc:///tmp/pdns.0") or raise "Cannot bind to IPC socket"
while(true) do
line = ""
@@ -18,7 +18,13 @@
#include "sstuff.hh"
#ifdef REMOTEBACKEND_ZEROMQ
-#include <zmq.hpp>
+#include <zmq.h>
+
+// If the available ZeroMQ library version is < 2.x, create macros for the zmq_msg_send/recv functions
+#ifndef HAVE_ZMQ_MSG_SEND
+#define zmq_msg_send(msg, socket, flags) zmq_send(socket, msg, flags)
+#define zmq_msg_recv(msg, socket, flags) zmq_recv(socket, msg, flags)
+#endif
#endif
#define JSON_GET(obj,val,def) (obj.HasMember(val)?obj["" val ""]:def)
#define JSON_ADD_MEMBER(obj, name, val, alloc) { rapidjson::Value __xval; __xval = val; obj.AddMember(name, __xval, alloc); }
@@ -75,6 +81,7 @@ class HTTPConnector: public Connector {
void post_requestbuilder(const rapidjson::Document &input, YaHTTP::Request& req);
void addUrlComponent(const rapidjson::Value &parameters, const char *element, std::stringstream& ss);
Socket* d_socket;
+ ComboAddress d_addr;
};
#ifdef REMOTEBACKEND_ZEROMQ
@@ -90,8 +97,8 @@ class ZeroMQConnector: public Connector {
int d_timeout;
int d_timespent;
std::map<std::string,std::string> d_options;
- zmq::context_t d_ctx;
- zmq::socket_t d_sock;
+ void *d_ctx;
+ void *d_sock;
};
#endif
@@ -49,7 +49,7 @@ struct RemotebackendSetup {
new RemoteLoader();
BackendMakers().launch("remote");
// then get us a instance of it
- ::arg().set("remote-connection-string")="zeromq:endpoint=tcp://127.0.0.1:43622";
+ ::arg().set("remote-connection-string")="zeromq:endpoint=ipc:///tmp/remotebackend.0";
::arg().set("remote-dnssec")="yes";
be = BackendMakers().all()[0];
// load few record types to help out
@@ -273,7 +273,7 @@ BOOST_AUTO_TEST_CASE(test_method_feedEnts3) {
BOOST_TEST_MESSAGE("Testing feedEnts3 method");
be->startTransaction("example.com",2);
map<string, bool> nonterm = boost::assign::map_list_of("_udp", true)("_sip._udp", true);
- BOOST_CHECK(be->feedEnts3(2, "example.com", nonterm, 1, "\xaa\xbb\xcc\xdd", 0));
+ BOOST_CHECK(be->feedEnts3(2, "example.com", nonterm, 1, "\u00aa\u00bb\u00cc\u00dd", 0));
be->commitTransaction();
}
@@ -18,7 +18,7 @@
begin
context = ZeroMQ::Context.new
socket = context.socket ZMQ::REP
- socket.bind("tcp://127.0.0.1:43622")
+ socket.bind("ipc:///tmp/remotebackend.0")
print "[#{Time.now.to_s}] ZeroMQ unit test responder running\n"
@@ -43,10 +43,10 @@
else
res, log = h.send(method)
end
- socket.send_string ({:result => res, :log => log}).to_json, 0
+ socket.send_string ({:result => res, :log => log}).to_json + "\n" , 0
f.puts "#{Time.now.to_f} [zmq]: #{({:result => res, :log => log}).to_json}"
rescue JSON::ParserError
- socket.send_string ({:result => false, :log => "Cannot parse input #{line}"}).to_json
+ socket.send_string ({:result => false, :log => "Cannot parse input #{line}"}).to_json + "\n";
f.puts "#{Time.now.to_f} [zmq]: #{({:result => false, :log => "Cannot parse input #{line}"}).to_json}"
next
end
Oops, something went wrong.

0 comments on commit cefcf9f

Please sign in to comment.