Skip to content

Commit

Permalink
CXXCBC-64: allow to bind query request to selected node (#115)
Browse files Browse the repository at this point in the history
  • Loading branch information
avsej committed Jan 21, 2022
1 parent b5829df commit ed0c886
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 32 deletions.
1 change: 1 addition & 0 deletions couchbase/io/http_session.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ class http_session : public std::enable_shared_from_this<http_session>
handler({}, std::move(self->parser_.response));
}
self->parser_.reset();
self->reading_ = false;
return;
}
self->reading_ = false;
Expand Down
112 changes: 80 additions & 32 deletions couchbase/io/http_session_manager.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <couchbase/io/http_command.hxx>
#include <couchbase/io/http_context.hxx>
#include <couchbase/io/http_session.hxx>
#include <couchbase/io/http_traits.hxx>
#include <couchbase/metrics/meter.hxx>
#include <couchbase/operations/http_noop.hxx>
#include <couchbase/service_type.hxx>
Expand Down Expand Up @@ -148,46 +149,42 @@ class http_session_manager : public std::enable_shared_from_this<http_session_ma
}
}

std::shared_ptr<http_session> check_out(service_type type, const couchbase::cluster_credentials& credentials)
std::pair<std::error_code, std::shared_ptr<http_session>> check_out(service_type type,
const couchbase::cluster_credentials& credentials,
const std::string& preferred_node)
{
std::scoped_lock lock(sessions_mutex_);
idle_sessions_[type].remove_if([](const auto& s) { return !s; });
busy_sessions_[type].remove_if([](const auto& s) { return !s; });
if (idle_sessions_[type].empty()) {
auto [hostname, port] = next_node(type);
auto [hostname, port] = preferred_node.empty() ? next_node(type) : lookup_node(type, preferred_node);
if (port == 0) {
return nullptr;
return { error::common_errc::service_not_available, nullptr };
}
config_.nodes.size();
std::shared_ptr<http_session> session;
if (options_.enable_tls) {
session = std::make_shared<http_session>(type,
client_id_,
ctx_,
tls_,
credentials,
hostname,
std::to_string(port),
http_context{ config_, options_, query_cache_ });
auto session = bootstrap_session(type, credentials, hostname, port);
busy_sessions_[type].push_back(session);
return { {}, session };
}
std::shared_ptr<http_session> session{};
if (preferred_node.empty()) {
session = idle_sessions_[type].front();
idle_sessions_[type].pop_front();
session->reset_idle();
} else {
auto ptr = std::find_if(idle_sessions_[type].begin(), idle_sessions_[type].end(), [preferred_node](const auto& s) {
return s->remote_address() == preferred_node;
});
if (ptr != idle_sessions_[type].end()) {
session = *ptr;
idle_sessions_[type].erase(ptr);
session->reset_idle();
} else {
session = std::make_shared<http_session>(
type, client_id_, ctx_, credentials, hostname, std::to_string(port), http_context{ config_, options_, query_cache_ });
auto [hostname, port] = split_host_port(preferred_node);
session = bootstrap_session(type, credentials, hostname, port);
}
session->start();

session->on_stop([type, id = session->id(), self = this->shared_from_this()]() {
std::scoped_lock inner_lock(self->sessions_mutex_);
self->busy_sessions_[type].remove_if([&id](const auto& s) { return !s || s->id() == id; });
self->idle_sessions_[type].remove_if([&id](const auto& s) { return !s || s->id() == id; });
});
busy_sessions_[type].push_back(session);
return session;
}
auto session = idle_sessions_[type].front();
idle_sessions_[type].pop_front();
session->reset_idle();
busy_sessions_[type].push_back(session);
return session;
return { {}, session };
}

void check_in(service_type type, std::shared_ptr<http_session> session)
Expand Down Expand Up @@ -220,10 +217,16 @@ class http_session_manager : public std::enable_shared_from_this<http_session_ma
template<typename Request, typename Handler>
void execute(Request request, Handler&& handler, const couchbase::cluster_credentials& credentials)
{
auto session = check_out(request.type, credentials);
if (!session) {
std::string preferred_node;
if constexpr (http_traits::supports_sticky_node_v<Request>) {
if (request.send_to_node) {
preferred_node = *request.send_to_node;
}
}
auto [error, session] = check_out(request.type, credentials, preferred_node);
if (error) {
typename Request::error_context_type ctx{};
ctx.ec = error::common_errc::service_not_available;
ctx.ec = error;
using response_type = typename Request::encoded_response_type;
return handler(request.make_response(std::move(ctx), response_type{}));
}
Expand Down Expand Up @@ -251,6 +254,29 @@ class http_session_manager : public std::enable_shared_from_this<http_session_ma
}

private:
std::shared_ptr<http_session> bootstrap_session(service_type type,
const couchbase::cluster_credentials& credentials,
const std::string& hostname,
std::uint16_t port)
{
std::shared_ptr<http_session> session;
if (options_.enable_tls) {
session = std::make_shared<http_session>(
type, client_id_, ctx_, tls_, credentials, hostname, std::to_string(port), http_context{ config_, options_, query_cache_ });
} else {
session = std::make_shared<http_session>(
type, client_id_, ctx_, credentials, hostname, std::to_string(port), http_context{ config_, options_, query_cache_ });
}
session->start();

session->on_stop([type, id = session->id(), self = this->shared_from_this()]() {
std::scoped_lock inner_lock(self->sessions_mutex_);
self->busy_sessions_[type].remove_if([&id](const auto& s) { return !s || s->id() == id; });
self->idle_sessions_[type].remove_if([&id](const auto& s) { return !s || s->id() == id; });
});
return session;
}

std::pair<std::string, std::uint16_t> next_node(service_type type)
{
auto candidates = config_.nodes.size();
Expand All @@ -266,6 +292,28 @@ class http_session_manager : public std::enable_shared_from_this<http_session_ma
return { "", 0 };
}

std::pair<std::string, std::uint16_t> split_host_port(const std::string& address)
{
auto last_colon = address.find_last_of(':');
if (last_colon == std::string::npos || address.size() - 1 == last_colon) {
return { "", 0 };
}
auto hostname = address.substr(0, last_colon);
auto port = gsl::narrow_cast<uint16_t>(std::stoul(address.substr(last_colon + 1)));
return { hostname, port };
}

std::pair<std::string, std::uint16_t> lookup_node(service_type type, const std::string& preferred_node)
{
auto [hostname, port] = split_host_port(preferred_node);
if (std::none_of(config_.nodes.begin(), config_.nodes.end(), [this, type, &h = hostname, &p = port](const auto& node) {
return node.hostname == h && node.port_or(options_.network, type, options_.enable_tls, 0) == p;
})) {
return { "", 0 };
}
return { hostname, port };
}

std::string client_id_;
asio::io_context& ctx_;
asio::ssl::context& tls_;
Expand Down
30 changes: 30 additions & 0 deletions couchbase/io/http_traits.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2020-2021 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <type_traits>

namespace couchbase::io::http_traits
{
template<typename T>
struct supports_sticky_node : public std::false_type {
};

template<typename T>
inline constexpr bool supports_sticky_node_v = supports_sticky_node<T>::value;
} // namespace couchbase::io::http_traits
1 change: 1 addition & 0 deletions couchbase/operations/document_query.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ query_request::make_response(error_context::query&& ctx, const encoded_response_
query_response response{ std::move(ctx) };
response.ctx.statement = statement;
response.ctx.parameters = body_str;
response.served_by_node = response.ctx.last_dispatched_to.value_or("");
if (!response.ctx.ec) {
try {
response.payload = utils::json::parse(encoded.body.data()).as<query_response_payload>();
Expand Down
10 changes: 10 additions & 0 deletions couchbase/operations/document_query.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <couchbase/error_context/query.hxx>
#include <couchbase/io/http_context.hxx>
#include <couchbase/io/http_message.hxx>
#include <couchbase/io/http_traits.hxx>
#include <couchbase/json_string.hxx>
#include <couchbase/platform/uuid.h>
#include <couchbase/protocol/mutation_token.hxx>
Expand Down Expand Up @@ -66,6 +67,7 @@ namespace couchbase::operations
struct query_response {
error_context::query ctx;
query_response_payload payload{};
std::string served_by_node{};
};

struct query_request {
Expand Down Expand Up @@ -110,6 +112,7 @@ struct query_request {
std::vector<couchbase::json_string> positional_parameters{};
std::map<std::string, couchbase::json_string> named_parameters{};
std::optional<std::function<utils::json::stream_control(std::string)>> row_callback{};
std::optional<std::string> send_to_node{};

[[nodiscard]] std::error_code encode_to(encoded_request_type& encoded, http_context& context);

Expand All @@ -121,3 +124,10 @@ struct query_request {
};

} // namespace couchbase::operations

namespace couchbase::io::http_traits
{
template<>
struct supports_sticky_node<couchbase::operations::query_request> : public std::true_type {
};
} // namespace couchbase::io::http_traits
67 changes: 67 additions & 0 deletions test/test_integration_query.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,70 @@ TEST_CASE("integration: streaming analytics results", "[integration]")
REQUIRE(rows[2] == R"({ "data": { "tech": "Couchbase" } })");
}
}

TEST_CASE("integration: sticking query to the service node", "[integration]")
{
test::utils::integration_test_guard integration;

if (!integration.cluster_version().supports_gcccp()) {
test::utils::open_bucket(integration.cluster, integration.ctx.bucket);
}

std::string node_to_stick_queries;
{
couchbase::operations::query_request req{ R"(SELECT 42 AS answer)" };
auto resp = test::utils::execute(integration.cluster, req);
REQUIRE_FALSE(resp.ctx.ec);
REQUIRE(resp.payload.rows.size() == 1);
REQUIRE(resp.payload.rows[0] == R"({"answer":42})");
REQUIRE_FALSE(resp.served_by_node.empty());
node_to_stick_queries = resp.served_by_node;
}

if (integration.number_of_query_nodes() > 1) {
std::vector<std::string> used_nodes{};
std::mutex used_nodes_mutex{};

std::vector<std::thread> threads;
threads.reserve(10);
for (int i = 0; i < 10; ++i) {
threads.emplace_back([i, &cluster = integration.cluster, node_to_stick_queries, &used_nodes, &used_nodes_mutex]() {
couchbase::operations::query_request req{ fmt::format(R"(SELECT {} AS answer)", i) };
auto resp = test::utils::execute(cluster, req);
if (resp.ctx.ec || resp.served_by_node.empty() || resp.payload.rows.size() != 1 ||
resp.payload.rows[0] != fmt::format(R"({{"answer":{}}})", i)) {
return;
}
std::scoped_lock lock(used_nodes_mutex);
used_nodes.push_back(resp.served_by_node);
});
}
for (auto& thread : threads) {
thread.join();
}
REQUIRE(used_nodes.size() == 10);
REQUIRE(std::set(used_nodes.begin(), used_nodes.end()).size() > 1);

threads.clear();
used_nodes.clear();

for (int i = 0; i < 10; ++i) {
threads.emplace_back([i, &cluster = integration.cluster, node_to_stick_queries, &used_nodes, &used_nodes_mutex]() {
couchbase::operations::query_request req{ fmt::format(R"(SELECT {} AS answer)", i) };
req.send_to_node = node_to_stick_queries;
auto resp = test::utils::execute(cluster, req);
if (resp.ctx.ec || resp.served_by_node.empty() || resp.payload.rows.size() != 1 ||
resp.payload.rows[0] != fmt::format(R"({{"answer":{}}})", i)) {
return;
}
std::scoped_lock lock(used_nodes_mutex);
used_nodes.push_back(resp.served_by_node);
});
}
for (auto& thread : threads) {
thread.join();
}
REQUIRE(used_nodes.size() == 10);
REQUIRE(std::set(used_nodes.begin(), used_nodes.end()).size() == 1);
}
}
8 changes: 8 additions & 0 deletions test/utils/integration_test_guard.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ class integration_test_guard
return has_service(couchbase::service_type::analytics);
}

auto number_of_query_nodes()
{
const auto& ci = load_cluster_info();
return std::count_if(ci.nodes.begin(), ci.nodes.end(), [](const auto& node) {
return std::find(node.services.begin(), node.services.end(), "n1ql") != node.services.end();
});
}

server_version cluster_version();

std::thread io_thread{};
Expand Down

0 comments on commit ed0c886

Please sign in to comment.