diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 678516f9f9638..d237cddbac694 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -512,7 +512,10 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { m->get_payload(), m->get_middle(), m->get_data()); - connection->outgoing_bl.append(message.get_buffer(session_stream_handlers)); + if (!append_frame(message)) { + m->put(); + return -EILSEQ; + } ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; @@ -542,15 +545,17 @@ ssize_t ProtocolV2::write_message(Message *m, bool more) { return rc; } -void ProtocolV2::append_keepalive() { - ldout(cct, 10) << __func__ << dendl; - auto keepalive_frame = KeepAliveFrame::Encode(); - connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers)); -} - -void ProtocolV2::append_keepalive_ack(utime_t ×tamp) { - auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp); - connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers)); +template +bool ProtocolV2::append_frame(F& frame) { + ceph::bufferlist bl; + try { + bl = frame.get_buffer(session_stream_handlers); + } catch (ceph::crypto::onwire::TxHandlerError &e) { + ldout(cct, 1) << __func__ << " " << e.what() << dendl; + return false; + } + connection->outgoing_bl.append(bl); + return true; } void ProtocolV2::handle_message_ack(uint64_t seq) { @@ -586,7 +591,15 @@ void ProtocolV2::write_event() { connection->write_lock.lock(); if (can_write) { if (keepalive) { - append_keepalive(); + ldout(cct, 10) << __func__ << " appending keepalive" << dendl; + auto keepalive_frame = KeepAliveFrame::Encode(); + if (!append_frame(keepalive_frame)) { + connection->write_lock.unlock(); + connection->lock.lock(); + fault(); + connection->lock.unlock(); + return; + } keepalive = false; } @@ -630,13 +643,16 @@ void ProtocolV2::write_event() { if (left) { ceph_le64 s; s = in_seq; - auto ack = AckFrame::Encode(in_seq); - connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers)); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; - ack_left -= left; - left = ack_left; - r = connection->_try_send(left); + auto ack_frame = AckFrame::Encode(in_seq); + if (append_frame(ack_frame)) { + ack_left -= left; + left = ack_left; + r = connection->_try_send(left); + } else { + r = -EILSEQ; + } } else if (is_queued()) { r = connection->_try_send(); } @@ -736,7 +752,13 @@ template CtPtr ProtocolV2::write(const std::string &desc, CONTINUATION_TYPE &next, F &frame) { - ceph::bufferlist bl = frame.get_buffer(session_stream_handlers); + ceph::bufferlist bl; + try { + bl = frame.get_buffer(session_stream_handlers); + } catch (ceph::crypto::onwire::TxHandlerError &e) { + ldout(cct, 1) << __func__ << " " << e.what() << dendl; + return _fault(); + } return write(desc, next, bl); } @@ -1631,7 +1653,11 @@ CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload) ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; connection->write_lock.lock(); - append_keepalive_ack(keepalive_frame.timestamp()); + auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp()); + if (!append_frame(keepalive_ack_frame)) { + connection->write_lock.unlock(); + return _fault(); + } connection->write_lock.unlock(); ldout(cct, 20) << __func__ << " got KEEPALIVE2 " diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 69bff1b90dd49..61ec4968725bb 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -129,6 +129,9 @@ class ProtocolV2 : public Protocol { CONTINUATION_TYPE &next, bufferlist &buffer); + template + bool append_frame(F& frame); + void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); void reset_recv_state(); @@ -139,8 +142,6 @@ class ProtocolV2 : public Protocol { void prepare_send_message(uint64_t features, Message *m); out_queue_entry_t _get_next_outgoing(); ssize_t write_message(Message *m, bool more); - void append_keepalive(); - void append_keepalive_ack(utime_t ×tamp); void handle_message_ack(uint64_t seq); CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner); diff --git a/src/msg/async/crypto_onwire.cc b/src/msg/async/crypto_onwire.cc index acf3f66689e14..07e7fe6553c2b 100644 --- a/src/msg/async/crypto_onwire.cc +++ b/src/msg/async/crypto_onwire.cc @@ -22,6 +22,10 @@ static constexpr const std::size_t AESGCM_BLOCK_LEN{16}; struct nonce_t { std::uint32_t random_seq; std::uint64_t random_rest; + + bool operator==(const nonce_t& rhs) const { + return !memcmp(this, &rhs, sizeof(*this)); + } } __attribute__((packed)); static_assert(sizeof(nonce_t) == AESGCM_IV_LEN); @@ -35,7 +39,8 @@ class AES128GCM_OnWireTxHandler : public ceph::crypto::onwire::TxHandler { CephContext* const cct; std::unique_ptr ectx; ceph::bufferlist buffer; - nonce_t nonce; + nonce_t nonce, initial_nonce; + bool used_initial_nonce; static_assert(sizeof(nonce) == AESGCM_IV_LEN); public: @@ -44,7 +49,7 @@ class AES128GCM_OnWireTxHandler : public ceph::crypto::onwire::TxHandler { const nonce_t& nonce) : cct(cct), ectx(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free), - nonce(nonce) { + nonce(nonce), initial_nonce(nonce), used_initial_nonce(false) { ceph_assert_always(ectx); ceph_assert_always(key.size() * CHAR_BIT == 128); @@ -61,6 +66,7 @@ class AES128GCM_OnWireTxHandler : public ceph::crypto::onwire::TxHandler { ~AES128GCM_OnWireTxHandler() override { ::ceph::crypto::zeroize_for_security(&nonce, sizeof(nonce)); + ::ceph::crypto::zeroize_for_security(&initial_nonce, sizeof(initial_nonce)); } std::uint32_t calculate_segment_size(std::uint32_t size) override @@ -78,6 +84,13 @@ class AES128GCM_OnWireTxHandler : public ceph::crypto::onwire::TxHandler { void AES128GCM_OnWireTxHandler::reset_tx_handler( std::initializer_list update_size_sequence) { + if (nonce == initial_nonce) { + if (used_initial_nonce) { + throw ceph::crypto::onwire::TxHandlerError("out of nonces"); + } + used_initial_nonce = true; + } + if(1 != EVP_EncryptInit_ex(ectx.get(), nullptr, nullptr, nullptr, reinterpret_cast(&nonce))) { throw std::runtime_error("EVP_EncryptInit_ex failed"); diff --git a/src/msg/async/crypto_onwire.h b/src/msg/async/crypto_onwire.h index bd682e8c71cde..0c544f205aca4 100644 --- a/src/msg/async/crypto_onwire.h +++ b/src/msg/async/crypto_onwire.h @@ -45,6 +45,11 @@ struct MsgAuthError : public std::runtime_error { } }; +struct TxHandlerError : public std::runtime_error { + TxHandlerError(const char* what) + : std::runtime_error(std::string("tx handler error: ") + what) {} +}; + struct TxHandler { virtual ~TxHandler() = default;