Skip to content

Commit

Permalink
Merge pull request #28682 from cyx1231st/wip-seastar-msgr-socket2
Browse files Browse the repository at this point in the history
crimson/net: implement preemptive shutdown/close

Reviewed-by: Kefu Chai <kchai@redhat.com>
  • Loading branch information
tchaikov committed Jul 15, 2019
2 parents a62783a + 0bf1ac6 commit e877af0
Show file tree
Hide file tree
Showing 7 changed files with 500 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/crimson/net/Errors.cc
Expand Up @@ -47,6 +47,8 @@ const std::error_category& net_category()
return "invalid argument";
case error::address_in_use:
return "address in use";
case error::broken_pipe:
return "broken pipe";
default:
return "unknown";
}
Expand All @@ -67,6 +69,8 @@ const std::error_category& net_category()
return std::errc::invalid_argument;
case error::address_in_use:
return std::errc::address_in_use;
case error::broken_pipe:
return std::errc::broken_pipe;
default:
return std::error_condition(ev, *this);
}
Expand All @@ -92,6 +96,9 @@ const std::error_category& net_category()
case error::address_in_use:
return cond == std::errc::address_in_use
|| cond == std::error_condition(EADDRINUSE, std::system_category());
case error::broken_pipe:
return cond == std::errc::broken_pipe
|| cond == std::error_condition(EPIPE, std::system_category());
default:
return false;
}
Expand All @@ -117,6 +124,9 @@ const std::error_category& net_category()
case error::address_in_use:
return code == std::errc::address_in_use
|| code == std::error_code(EADDRINUSE, std::system_category());
case error::broken_pipe:
return code == std::errc::broken_pipe
|| code == std::error_code(EPIPE, std::system_category());
default:
return false;
}
Expand Down
1 change: 1 addition & 0 deletions src/crimson/net/Errors.h
Expand Up @@ -31,6 +31,7 @@ enum class error {
corrupted_message,
invalid_argument,
address_in_use,
broken_pipe,
};

/// net error category
Expand Down
8 changes: 4 additions & 4 deletions src/crimson/net/Protocol.cc
Expand Up @@ -55,10 +55,10 @@ seastar::future<> Protocol::close()
assert(!close_ready.valid());

if (socket) {
close_ready = socket->close()
.then([this] {
return pending_dispatch.close();
}).finally(std::move(cleanup));
socket->shutdown();
close_ready = pending_dispatch.close().finally([this] {
return socket->close();
}).finally(std::move(cleanup));
} else {
close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
Expand Down
39 changes: 39 additions & 0 deletions src/crimson/net/Socket.cc
Expand Up @@ -3,12 +3,17 @@

#include "Socket.h"

#include "crimson/common/log.h"
#include "Errors.h"

namespace ceph::net {

namespace {

seastar::logger& logger() {
return ceph::get_logger(ceph_subsys_ms);
}

// an input_stream consumer that reads buffer segments into a bufferlist up to
// the given number of remaining bytes
struct bufferlist_consumer {
Expand Down Expand Up @@ -81,4 +86,38 @@ Socket::read_exactly(size_t bytes) {
});
}

void Socket::shutdown() {
#ifndef NDEBUG
ceph_assert(!down);
down = true;
#endif
socket.shutdown_input();
socket.shutdown_output();
}

static inline seastar::future<> close_and_handle_errors(auto& out) {
return out.close().handle_exception_type([] (const std::system_error& e) {
if (e.code() != error::broken_pipe &&
e.code() != error::connection_reset) {
logger().error("Socket::close(): unexpected error {}", e);
ceph_abort();
}
// can happen when out is already shutdown, ignore
});
}

seastar::future<> Socket::close() {
#ifndef NDEBUG
ceph_assert(!closed);
closed = true;
#endif
return seastar::when_all_succeed(
in.close(),
close_and_handle_errors(out)
).handle_exception([] (auto eptr) {
logger().error("Socket::close(): unexpected exception {}", eptr);
ceph_abort();
});
}

} // namespace ceph::net
29 changes: 24 additions & 5 deletions src/crimson/net/Socket.h
Expand Up @@ -22,6 +22,11 @@ class Socket
seastar::input_stream<char> in;
seastar::output_stream<char> out;

#ifndef NDEBUG
bool down = false;
bool closed = false;
#endif

/// buffer state for read()
struct {
bufferlist buffer;
Expand All @@ -39,6 +44,12 @@ class Socket
// performance. see seastar::net::connected_socket::output()
out(socket.output(65536)) {}

~Socket() {
#ifndef NDEBUG
assert(closed);
#endif
}

Socket(Socket&& o) = delete;

static seastar::future<SocketFRef>
Expand Down Expand Up @@ -79,12 +90,20 @@ class Socket
return out.write(std::move(buf)).then([this] { return out.flush(); });
}

// preemptively disable further reads or writes, can only be shutdown once.
void shutdown();

/// Socket can only be closed once.
seastar::future<> close() {
return seastar::smp::submit_to(sid, [this] {
return seastar::when_all(
in.close(), out.close()).discard_result();
});
seastar::future<> close();

// shutdown input_stream only, for tests
void force_shutdown_in() {
socket.shutdown_input();
}

// shutdown output_stream only, for tests
void force_shutdown_out() {
socket.shutdown_output();
}
};

Expand Down
143 changes: 143 additions & 0 deletions src/test/crimson/test_messenger.cc
Expand Up @@ -434,6 +434,145 @@ static seastar::future<> test_concurrent_dispatch(bool v2)
});
}

seastar::future<> test_preemptive_shutdown(bool v2) {
struct test_state {
class Server final
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Server> {
ceph::net::Messenger *msgr = nullptr;
ceph::auth::DummyAuthClientServer dummy_auth;

seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
return c->send(MessageRef{new MPing, false});
}

public:
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce,
const entity_addr_t& addr) {
return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
).then([this, addr](ceph::net::Messenger *messenger) {
return container().invoke_on_all([messenger](auto& server) {
server.msgr = messenger->get_local_shard();
server.msgr->set_default_policy(ceph::net::SocketPolicy::stateless_server(0));
server.msgr->set_auth_client(&server.dummy_auth);
server.msgr->set_auth_server(&server.dummy_auth);
}).then([messenger, addr] {
return messenger->bind(entity_addrvec_t{addr});
}).then([this, messenger] {
return messenger->start(this);
});
});
}
entity_addr_t get_addr() const {
return msgr->get_myaddr();
}
seastar::future<> shutdown() {
return msgr->shutdown();
}
Dispatcher* get_local_shard() override {
return &(container().local());
}
seastar::future<> stop() {
return seastar::now();
}
};

class Client final
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Client> {
ceph::net::Messenger *msgr = nullptr;
ceph::auth::DummyAuthClientServer dummy_auth;

bool stop_send = false;
seastar::promise<> stopped_send_promise;

seastar::future<> ms_dispatch(ceph::net::Connection* c,
MessageRef m) override {
return seastar::now();
}

public:
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce) {
return ceph::net::Messenger::create(name, lname, nonce, seastar::engine().cpu_id()
).then([this](ceph::net::Messenger *messenger) {
return container().invoke_on_all([messenger](auto& client) {
client.msgr = messenger->get_local_shard();
client.msgr->set_default_policy(ceph::net::SocketPolicy::lossy_client(0));
client.msgr->set_auth_client(&client.dummy_auth);
client.msgr->set_auth_server(&client.dummy_auth);
}).then([this, messenger] {
return messenger->start(this);
});
});
}
seastar::future<> send_pings(const entity_addr_t& addr) {
return msgr->connect(addr, entity_name_t::TYPE_OSD
).then([this](ceph::net::ConnectionXRef conn) {
seastar::do_until(
[this] { return stop_send; },
[this, conn = &**conn] {
return conn->send(MessageRef{new MPing, false}).then([] {
return seastar::sleep(0ms);
});
}
).then_wrapped([this, conn] (auto fut) {
fut.forward_to(std::move(stopped_send_promise));
});
});
}
seastar::future<> shutdown() {
return msgr->shutdown().then([this] {
stop_send = true;
return stopped_send_promise.get_future();
});
}
Dispatcher* get_local_shard() override {
return &(container().local());
}
seastar::future<> stop() {
return seastar::now();
}
};
};

logger().info("test_preemptive_shutdown(v2={}):", v2);
return seastar::when_all_succeed(
ceph::net::create_sharded<test_state::Server>(),
ceph::net::create_sharded<test_state::Client>()
).then([v2](test_state::Server *server,
test_state::Client *client) {
entity_addr_t addr;
addr.parse("127.0.0.1:9010", nullptr);
if (v2) {
addr.set_type(entity_addr_t::TYPE_MSGR2);
} else {
addr.set_type(entity_addr_t::TYPE_LEGACY);
}
addr.set_family(AF_INET);
return seastar::when_all_succeed(
server->init(entity_name_t::OSD(6), "server4", 7, addr),
client->init(entity_name_t::OSD(7), "client4", 8)
).then([server, client] {
return client->send_pings(server->get_addr());
}).then([] {
return seastar::sleep(100ms);
}).then([client] {
logger().info("client shutdown...");
return client->shutdown();
}).finally([server] {
logger().info("server shutdown...");
return server->shutdown();
}).finally([] {
logger().info("test_preemptive_shutdown() done!\n");
});
});
}

}

int main(int argc, char** argv)
Expand All @@ -458,6 +597,10 @@ int main(int argc, char** argv)
return test_concurrent_dispatch(false);
}).then([] {
return test_concurrent_dispatch(true);
}).then([] {
return test_preemptive_shutdown(false);
}).then([] {
return test_preemptive_shutdown(true);
}).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {
Expand Down

0 comments on commit e877af0

Please sign in to comment.