diff --git a/folly/io/async/AsyncSocket.cpp b/folly/io/async/AsyncSocket.cpp index cb22f1f67c1..9f9d1f54ffe 100644 --- a/folly/io/async/AsyncSocket.cpp +++ b/folly/io/async/AsyncSocket.cpp @@ -395,6 +395,7 @@ void AsyncSocket::init() { maxReadsPerEvent_ = 16; connectCallback_ = nullptr; errMessageCallback_ = nullptr; + readAncillaryDataCallback_ = nullptr; readCallback_ = nullptr; writeReqHead_ = nullptr; writeReqTail_ = nullptr; @@ -640,6 +641,7 @@ void AsyncSocket::connect( // yet, so we don't have to register for any events at the moment. VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this; assert(errMessageCallback_ == nullptr); + assert(readAncillaryDataCallback_ == nullptr); assert(readCallback_ == nullptr); assert(writeReqHead_ == nullptr); if (state_ != StateEnum::FAST_OPEN) { @@ -808,6 +810,19 @@ AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const { return errMessageCallback_; } +void AsyncSocket::setReadAncillaryDataCB(ReadAncillaryDataCallback* callback) { + VLOG(6) << "AsyncSocket::setReadAncillaryDataCB() this=" << this + << ", fd=" << fd_ << ", callback=" << callback + << ", state=" << state_; + + readAncillaryDataCallback_ = callback; +} + +AsyncSocket::ReadAncillaryDataCallback* +AsyncSocket::getReadAncillaryDataCallback() const { + return readAncillaryDataCallback_; +} + void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) { sendMsgParamCallback_ = callback; } @@ -1951,7 +1966,40 @@ AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) { return ReadResult(len); } - ssize_t bytes = netops::recv(fd_, *buf, *buflen, MSG_DONTWAIT); + ssize_t bytes = 0; + + // No callback to read ancillary data was set + if (readAncillaryDataCallback_ == nullptr) { + bytes = netops::recv(fd_, *buf, *buflen, MSG_DONTWAIT); + } else { + struct msghdr msg; + struct iovec iov; + + // Ancillary data buffer and length + msg.msg_control = + readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().data(); + msg.msg_controllen = + readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().size(); + + // Dest address info + msg.msg_name = nullptr; + msg.msg_namelen = 0; + + // Array of data buffers (scatter/gather) + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + // Data buffer pointer and length + iov.iov_base = *buf; + iov.iov_len = *buflen; + + bytes = netops::recvmsg(fd_, &msg, 0); + + if (bytes > 0) { + readAncillaryDataCallback_->ancillaryData(msg); + } + } + if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { // No more data to read right now. diff --git a/folly/io/async/AsyncSocket.h b/folly/io/async/AsyncSocket.h index 89343ab003f..a3c6c2997d5 100644 --- a/folly/io/async/AsyncSocket.h +++ b/folly/io/async/AsyncSocket.h @@ -169,6 +169,28 @@ class AsyncSocket : public AsyncTransport { virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0; }; + class ReadAncillaryDataCallback { + public: + virtual ~ReadAncillaryDataCallback() = default; + + /** + * ancillaryData() will be invoked when we read a buffer + * from the socket together with the ancillary data. + * + * @param msgh Reference to msghdr structure describing + * a message read together with the data buffer associated + * with the socket. + */ + virtual void ancillaryData(struct msghdr& msgh) noexcept = 0; + + /** + * getAncillaryDataCtrlBuffer() will be invoked in order to fill the + * ancillary data buffer when it is received. + * getAncillaryDataCtrlBuffer will never return nullptr. + */ + virtual folly::MutableByteRange getAncillaryDataCtrlBuffer() = 0; + }; + class SendMsgParamsCallback { public: virtual ~SendMsgParamsCallback() = default; @@ -499,6 +521,23 @@ class AsyncSocket : public AsyncTransport { */ virtual ErrMessageCallback* getErrMessageCallback() const; + /** + * Set a pointer to ReadAncillaryDataCallback implementation which will + * be invoked with the ancillary data when we read a buffer from the + * associated socket. + * ReadAncillaryDataCallback is implemented only for platforms with + * kernel timestamp support. + * + */ + virtual void setReadAncillaryDataCB(ReadAncillaryDataCallback* callback); + + /** + * Get a pointer to ReadAncillaryDataCallback implementation currently + * registered with this socket. + * + */ + virtual ReadAncillaryDataCallback* getReadAncillaryDataCallback() const; + /** * Set a pointer to SendMsgParamsCallback implementation which * will be used to form ::sendmsg() system call parameters @@ -1364,6 +1403,8 @@ class AsyncSocket : public AsyncTransport { ConnectCallback* connectCallback_; ///< ConnectCallback ErrMessageCallback* errMessageCallback_; ///< TimestampCallback + ReadAncillaryDataCallback* + readAncillaryDataCallback_; ///< AncillaryDataCallback SendMsgParamsCallback* ///< Callback for retrieving sendMsgParamCallback_; ///< ::sendmsg() parameters ReadCallback* readCallback_; ///< ReadCallback diff --git a/folly/io/async/test/AsyncSocketTest2.cpp b/folly/io/async/test/AsyncSocketTest2.cpp index 51e3a38d7e8..a8fddf21a21 100644 --- a/folly/io/async/test/AsyncSocketTest2.cpp +++ b/folly/io/async/test/AsyncSocketTest2.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -3036,6 +3037,9 @@ TEST(AsyncSocketTest, TestEvbDetachThenClose) { /* copied from include/uapi/linux/net_tstamp.h */ /* SO_TIMESTAMPING gets an integer bit field comprised of these values */ enum SOF_TIMESTAMPING { + SOF_TIMESTAMPING_TX_SOFTWARE = (1 << 1), + SOF_TIMESTAMPING_RX_HARDWARE = (1 << 2), + SOF_TIMESTAMPING_RX_SOFTWARE = (1 << 3), SOF_TIMESTAMPING_SOFTWARE = (1 << 4), SOF_TIMESTAMPING_OPT_ID = (1 << 7), SOF_TIMESTAMPING_TX_SCHED = (1 << 8), @@ -4254,3 +4258,105 @@ TEST(AsyncSocketTest, QueueTimeout) { // Since the second message is expired, it should NOT be dequeued EXPECT_EQ(connectionEventCb.getConnectionDequeuedByAcceptCallback(), 1); } + +class TestRXTimestampsCallback + : public folly::AsyncSocket::ReadAncillaryDataCallback { + public: + TestRXTimestampsCallback() {} + void ancillaryData(struct msghdr& msgh) noexcept override { + struct cmsghdr* cmsg; + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != nullptr; + cmsg = CMSG_NXTHDR(&msgh, cmsg)) { + if (cmsg->cmsg_level != SOL_SOCKET || + cmsg->cmsg_type != SO_TIMESTAMPING) { + continue; + } + callCount_++; + timespec* ts = (struct timespec*)CMSG_DATA(cmsg); + actualRxTimestampSec_ = ts[0].tv_sec; + } + } + folly::MutableByteRange getAncillaryDataCtrlBuffer() override { + return folly::MutableByteRange(ancillaryDataCtrlBuffer_); + } + + uint32_t callCount_{0}; + long actualRxTimestampSec_{0}; + + private: + std::array ancillaryDataCtrlBuffer_; +}; + +/** + * Test read ancillary data callback + */ +TEST(AsyncSocketTest, readAncillaryData) { + TestServer server; + + // connect() + EventBase evb; + std::shared_ptr socket = AsyncSocket::newSocket(&evb); + + ConnCallback ccb; + socket->connect(&ccb, server.getAddress(), 1); + LOG(INFO) << "Client socket fd=" << socket->getNetworkSocket(); + + // Enable rx timestamp notifications + ASSERT_NE(socket->getNetworkSocket(), NetworkSocket()); + int flags = SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_RX_SOFTWARE | + SOF_TIMESTAMPING_RX_HARDWARE; + SocketOptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING}; + EXPECT_EQ(tstampingOpt.apply(socket->getNetworkSocket(), flags), 0); + + // Accept the connection. + std::shared_ptr acceptedSocket = server.accept(); + LOG(INFO) << "Server socket fd=" << acceptedSocket->getNetworkSocket(); + + // Wait for connection + evb.loop(); + ASSERT_EQ(ccb.state, STATE_SUCCEEDED); + + TestRXTimestampsCallback rxcb; + + // Set read callback + ReadCallback rcb(100); + socket->setReadCB(&rcb); + + // Get the timestamp when the message was write + struct timespec currentTime; + clock_gettime(CLOCK_REALTIME, ¤tTime); + long writeTimestampSec = currentTime.tv_sec; + + // write bytes from server (acceptedSocket) to client (socket). + std::vector wbuf(128, 'a'); + acceptedSocket->write(wbuf.data(), wbuf.size()); + + // Wait for reading to complete. + evb.loopOnce(); + ASSERT_NE(rcb.buffers.size(), 0); + + // Verify that if the callback is not set, it will not be called + ASSERT_EQ(rxcb.callCount_, 0); + + // Set up rx timestamp callbacks + socket->setReadAncillaryDataCB(&rxcb); + acceptedSocket->write(wbuf.data(), wbuf.size()); + + // Wait for reading to complete. + evb.loopOnce(); + ASSERT_NE(rcb.buffers.size(), 0); + + // Verify that after setting callback, the callback was called + ASSERT_NE(rxcb.callCount_, 0); + // Compare the received timestamp is within an expected range + clock_gettime(CLOCK_REALTIME, ¤tTime); + ASSERT_TRUE(rxcb.actualRxTimestampSec_ <= currentTime.tv_sec); + ASSERT_TRUE(rxcb.actualRxTimestampSec_ >= writeTimestampSec); + + // Close both sockets + acceptedSocket->close(); + socket->close(); + + ASSERT_TRUE(socket->isClosedBySelf()); + ASSERT_FALSE(socket->isClosedByPeer()); +}