Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dnsdist: Don't apply QPS to backend server on cache hits #9999

Merged
merged 1 commit into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pdns/dnsdist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders&
addXPF(dq, selectedBackend->xpfRRCode);
}

selectedBackend->queries++;
selectedBackend->incQueriesCount();
return ProcessQueryResult::PassToBackend;
}
catch (const std::exception& e){
Expand Down
65 changes: 42 additions & 23 deletions pdns/dnsdist.hh
Original file line number Diff line number Diff line change
Expand Up @@ -446,25 +446,40 @@ public:
}

bool check(unsigned int rate, unsigned int burst) const // this is not quite fair
{
  /* Test-and-consume in one call: checkOnly() reports whether at least one
     token is available, and addHit() takes it. A refused query does not
     consume anything. */
  const bool allowed = checkOnly(rate, burst);
  if (allowed) {
    addHit();
  }
  return allowed;
}

bool checkOnly(unsigned int rate, unsigned int burst) const // this is not quite fair
{
auto delta = d_prev.udiffAndSet();

if(delta > 0.0) // time, frequently, does go backwards..
if (delta > 0.0) { // time, frequently, does go backwards..
d_tokens += 1.0 * rate * (delta/1000000.0);
}

if(d_tokens > burst) {
if (d_tokens > burst) {
d_tokens = burst;
}

bool ret=false;
if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
ret=true;
--d_tokens;
bool ret = false;
if (d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
ret = true;
}

return ret;
}

/* Consume one token from the bucket.
   check() calls this once checkOnly() has reported that a token is available.
   Virtual so that a derived limiter can decide whether a hit should
   actually be charged. */
virtual void addHit() const
{
--d_tokens;
}

bool seenSince(const struct timespec& cutOff) const
{
return cutOff < d_prev.d_start;
Expand Down Expand Up @@ -492,35 +507,32 @@ public:
return d_passthrough ? 0 : d_rate;
}

int getPassed() const
bool check() const // this is not quite fair
{
return d_passed;
}
if (d_passthrough) {
return true;
}

int getBlocked() const
{
return d_blocked;
return BasicQPSLimiter::check(d_rate, d_burst);
}

bool check() const // this is not quite fair
bool checkOnly() const
{
if (d_passthrough) {
return true;
}

bool ret = BasicQPSLimiter::check(d_rate, d_burst);
if (ret) {
d_passed++;
}
else {
d_blocked++;
}
return BasicQPSLimiter::checkOnly(d_rate, d_burst);
}

return ret;
void addHit() const override
{
  /* in passthrough mode check() always succeeds, so no token is consumed */
  if (d_passthrough) {
    return;
  }

  --d_tokens;
}

private:
mutable unsigned int d_passed{0};
mutable unsigned int d_blocked{0};
unsigned int d_rate;
unsigned int d_burst;
bool d_passthrough{true};
Expand Down Expand Up @@ -994,6 +1006,13 @@ struct DownstreamState
tcpAvgQueriesPerConnection = (99.0 * tcpAvgQueriesPerConnection / 100.0) + (nbQueries / 100.0);
tcpAvgConnectionDuration = (99.0 * tcpAvgConnectionDuration / 100.0) + (durationMs / 100.0);
}

/* Account for one query handed to this backend: increment the `queries`
   counter and record one hit on the backend's QPS limiter (`qps`). */
void incQueriesCount()
{
++queries;
qps.addHit();
}

private:
std::string name;
std::string nameWithAddr;
Expand Down
3 changes: 2 additions & 1 deletion pdns/dnsdistdist/dnsdist-lbpolicies.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ shared_ptr<DownstreamState> leastOutstanding(const ServerPolicy::NumberedServerV
shared_ptr<DownstreamState> firstAvailable(const ServerPolicy::NumberedServerVector& servers, const DNSQuestion* dq)
{
for(auto& d : servers) {
if(d.second->isUp() && d.second->qps.check())
if (d.second->isUp() && d.second->qps.checkOnly()) {
return d.second;
}
}
return leastOutstanding(servers, dq);
}
Expand Down
37 changes: 37 additions & 0 deletions pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,43 @@ BOOST_AUTO_TEST_CASE(test_firstAvailable) {
benchPolicy(pol);
}

BOOST_AUTO_TEST_CASE(test_firstAvailableWithOrderAndQPS) {
  /* firstAvailable has to honour a backend's QPS limit: once the preferred
     backend has used up its budget, further queries go to the next one. */
  auto dq = getDQ();
  size_t qpsLimit = 10;

  ServerPolicy pol{"firstAvailable", firstAvailable, false};
  ServerPolicy::NumberedServerVector servers;
  servers.push_back({ 1, std::make_shared<DownstreamState>(ComboAddress("192.0.2.1:53")) });
  servers.push_back({ 2, std::make_shared<DownstreamState>(ComboAddress("192.0.2.2:53")) });

  /* both backends are in the vector now, so these references stay valid */
  const auto& preferred = servers.at(0).second;
  const auto& fallback = servers.at(1).second;

  /* The fallback server has a higher order, so most queries should be routed
     to the preferred one (remember that the vector has to be kept ordered!).
     The preferred server is however limited to 10 qps, so every query above
     that limit should end up on the fallback server. */
  preferred->order = 1;
  fallback->order = 2;
  preferred->qps = QPSLimiter(qpsLimit, qpsLimit);

  /* mark the servers as 'up' */
  preferred->setUp();
  fallback->setUp();

  /* as long as we stay under the QPS limit, the preferred server is picked */
  for (size_t sent = 0; sent < qpsLimit; ++sent) {
    auto selected = pol.getSelectedBackend(servers, dq);
    BOOST_REQUIRE(selected != nullptr);
    BOOST_CHECK(selected == preferred);
    selected->incQueriesCount();
  }

  /* after that, everything goes to the fallback server */
  for (size_t sent = 0; sent < 100; ++sent) {
    auto selected = pol.getSelectedBackend(servers, dq);
    BOOST_REQUIRE(selected != nullptr);
    BOOST_CHECK(selected == fallback);
    selected->incQueriesCount();
  }
}

BOOST_AUTO_TEST_CASE(test_roundRobin) {
auto dq = getDQ();

Expand Down
116 changes: 116 additions & 0 deletions regression-tests.dnsdist/test_Routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,122 @@ def testOrder(self):
self.assertEquals(self._responsesCounter['TCP Responder'], 0)
self.assertEquals(self._responsesCounter['TCP Responder 2'], numberOfQueries)

class TestFirstAvailableQPSPacketCacheHits(DNSDistTest):
    """
    firstAvailable routing with a QPS-limited backend and a packet cache.

    Answers served from the packet cache must not be counted against the
    QPS limit of the backend.
    """

    _verboseMode = True
    _testServer2Port = 5351
    _config_params = ['_testServerPort', '_testServer2Port']
    _config_template = """
setServerPolicy(firstAvailable)
s1 = newServer{address="127.0.0.1:%s", order=2}
s1:setUp()
s2 = newServer{address="127.0.0.1:%s", order=1, qps=10}
s2:setUp()
pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
getPool(""):setCache(pc)
"""

    @classmethod
    def startResponders(cls):
        """Start one UDP and one TCP responder thread on each of the two backend ports."""
        print("Launching responders..")
        # Thread.setDaemon() is deprecated, the 'daemon' attribute is the supported spelling
        cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
        cls._UDPResponder.daemon = True
        cls._UDPResponder.start()
        cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
        cls._UDPResponder2.daemon = True
        cls._UDPResponder2.start()

        cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
        cls._TCPResponder.daemon = True
        cls._TCPResponder.start()

        cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
        cls._TCPResponder2.daemon = True
        cls._TCPResponder2.start()

    def testOrderQPSCacheHits(self):
        """
        Routing: firstAvailable policy with QPS limit and packet cache

        Send 50 A queries for "order-qps-cache.routing.tests.powerdns.com.",
        then 10 A queries for "order-qps-cache-2.routing.tests.powerdns.com." (uncached)
        check that dnsdist routes all of the (uncached) queries to the second backend, because it has the lower order value,
        and the QPS should only be counted for cache misses.
        """
        senders = ("sendUDPQuery", "sendTCPQuery")
        rounds = (
            ('order-qps-cache.routing.tests.powerdns.com.', 50),
            ('order-qps-cache-2.routing.tests.powerdns.com.', 10),
        )

        for name, numberOfQueries in rounds:
            query = dns.message.make_query(name, 'A', 'IN')
            response = dns.message.make_response(query)
            rrset = dns.rrset.from_text(name,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1')
            response.answer.append(rrset)

            # first queries to fill the cache, one over UDP and one over TCP:
            # these are the only ones expected to reach a backend
            for method in senders:
                sender = getattr(self, method)
                (receivedQuery, receivedResponse) = sender(query, response)
                self.assertTrue(receivedQuery)
                self.assertTrue(receivedResponse)
                receivedQuery.id = query.id
                self.assertEqual(query, receivedQuery)
                self.assertEqual(receivedResponse, response)

            # every following query is sent without queueing a backend response
            # and must still receive the same answer
            for _ in range(numberOfQueries):
                for method in senders:
                    sender = getattr(self, method)
                    (_, receivedResponse) = sender(query, response=None, useQueue=False)
                    self.assertEqual(receivedResponse, response)

        # 4 queries should have made it through, 2 UDP and 2 TCP
        for k, v in self._responsesCounter.items():
            print(k)
            print(v)

        if 'UDP Responder' in self._responsesCounter:
            self.assertEqual(self._responsesCounter['UDP Responder'], 0)
            self.assertEqual(self._responsesCounter['UDP Responder 2'], 2)
        if 'TCP Responder' in self._responsesCounter:
            self.assertEqual(self._responsesCounter['TCP Responder'], 0)
            self.assertEqual(self._responsesCounter['TCP Responder 2'], 2)

class TestRoutingNoServer(DNSDistTest):

_config_template = """
Expand Down