Skip to content

Commit

Permalink
AsyncSocket facility to read ancillary data
Browse files Browse the repository at this point in the history
Summary: Defined new callback - ReadAncillaryDataCallback, which enables getting ancillary data when receive a message using recvmsg API.

Reviewed By: cgrushko

Differential Revision: D24246707

fbshipit-source-id: 3363b5fafe8d370cf5560afe00476fc8ea723e7a
  • Loading branch information
Efrat Lewis authored and facebook-github-bot committed Dec 27, 2020
1 parent a200097 commit 7a2b83b
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 1 deletion.
50 changes: 49 additions & 1 deletion folly/io/async/AsyncSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ void AsyncSocket::init() {
maxReadsPerEvent_ = 16;
connectCallback_ = nullptr;
errMessageCallback_ = nullptr;
readAncillaryDataCallback_ = nullptr;
readCallback_ = nullptr;
writeReqHead_ = nullptr;
writeReqTail_ = nullptr;
Expand Down Expand Up @@ -640,6 +641,7 @@ void AsyncSocket::connect(
// yet, so we don't have to register for any events at the moment.
VLOG(8) << "AsyncSocket::connect succeeded immediately; this=" << this;
assert(errMessageCallback_ == nullptr);
assert(readAncillaryDataCallback_ == nullptr);
assert(readCallback_ == nullptr);
assert(writeReqHead_ == nullptr);
if (state_ != StateEnum::FAST_OPEN) {
Expand Down Expand Up @@ -808,6 +810,19 @@ AsyncSocket::ErrMessageCallback* AsyncSocket::getErrMessageCallback() const {
return errMessageCallback_;
}

void AsyncSocket::setReadAncillaryDataCB(ReadAncillaryDataCallback* callback) {
VLOG(6) << "AsyncSocket::setReadAncillaryDataCB() this=" << this
<< ", fd=" << fd_ << ", callback=" << callback
<< ", state=" << state_;

readAncillaryDataCallback_ = callback;
}

AsyncSocket::ReadAncillaryDataCallback*
AsyncSocket::getReadAncillaryDataCallback() const {
return readAncillaryDataCallback_;
}

void AsyncSocket::setSendMsgParamCB(SendMsgParamsCallback* callback) {
sendMsgParamCallback_ = callback;
}
Expand Down Expand Up @@ -1951,7 +1966,40 @@ AsyncSocket::performRead(void** buf, size_t* buflen, size_t* /* offset */) {
return ReadResult(len);
}

ssize_t bytes = netops::recv(fd_, *buf, *buflen, MSG_DONTWAIT);
ssize_t bytes = 0;

// No callback to read ancillary data was set
if (readAncillaryDataCallback_ == nullptr) {
bytes = netops::recv(fd_, *buf, *buflen, MSG_DONTWAIT);
} else {
struct msghdr msg;
struct iovec iov;

// Ancillary data buffer and length
msg.msg_control =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().data();
msg.msg_controllen =
readAncillaryDataCallback_->getAncillaryDataCtrlBuffer().size();

// Dest address info
msg.msg_name = nullptr;
msg.msg_namelen = 0;

// Array of data buffers (scatter/gather)
msg.msg_iov = &iov;
msg.msg_iovlen = 1;

// Data buffer pointer and length
iov.iov_base = *buf;
iov.iov_len = *buflen;

bytes = netops::recvmsg(fd_, &msg, 0);

if (bytes > 0) {
readAncillaryDataCallback_->ancillaryData(msg);
}
}

if (bytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No more data to read right now.
Expand Down
41 changes: 41 additions & 0 deletions folly/io/async/AsyncSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,28 @@ class AsyncSocket : public AsyncTransport {
virtual void errMessageError(const AsyncSocketException& ex) noexcept = 0;
};

class ReadAncillaryDataCallback {
public:
virtual ~ReadAncillaryDataCallback() = default;

/**
* ancillaryData() will be invoked when we read a buffer
* from the socket together with the ancillary data.
*
* @param msgh Reference to msghdr structure describing
* a message read together with the data buffer associated
* with the socket.
*/
virtual void ancillaryData(struct msghdr& msgh) noexcept = 0;

/**
* getAncillaryDataCtrlBuffer() will be invoked in order to fill the
* ancillary data buffer when it is received.
* getAncillaryDataCtrlBuffer will never return nullptr.
*/
virtual folly::MutableByteRange getAncillaryDataCtrlBuffer() = 0;
};

class SendMsgParamsCallback {
public:
virtual ~SendMsgParamsCallback() = default;
Expand Down Expand Up @@ -499,6 +521,23 @@ class AsyncSocket : public AsyncTransport {
*/
virtual ErrMessageCallback* getErrMessageCallback() const;

/**
* Set a pointer to ReadAncillaryDataCallback implementation which will
* be invoked with the ancillary data when we read a buffer from the
* associated socket.
* ReadAncillaryDataCallback is implemented only for platforms with
* kernel timestamp support.
*
*/
virtual void setReadAncillaryDataCB(ReadAncillaryDataCallback* callback);

/**
* Get a pointer to ReadAncillaryDataCallback implementation currently
* registered with this socket.
*
*/
virtual ReadAncillaryDataCallback* getReadAncillaryDataCallback() const;

/**
* Set a pointer to SendMsgParamsCallback implementation which
* will be used to form ::sendmsg() system call parameters
Expand Down Expand Up @@ -1364,6 +1403,8 @@ class AsyncSocket : public AsyncTransport {

ConnectCallback* connectCallback_; ///< ConnectCallback
ErrMessageCallback* errMessageCallback_; ///< TimestampCallback
ReadAncillaryDataCallback*
readAncillaryDataCallback_; ///< AncillaryDataCallback
SendMsgParamsCallback* ///< Callback for retrieving
sendMsgParamCallback_; ///< ::sendmsg() parameters
ReadCallback* readCallback_; ///< ReadCallback
Expand Down
106 changes: 106 additions & 0 deletions folly/io/async/test/AsyncSocketTest2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <fcntl.h>
#include <sys/types.h>

#include <time.h>
#include <iostream>
#include <memory>
#include <thread>
Expand Down Expand Up @@ -3036,6 +3037,9 @@ TEST(AsyncSocketTest, TestEvbDetachThenClose) {
/* copied from include/uapi/linux/net_tstamp.h */
/* SO_TIMESTAMPING gets an integer bit field comprised of these values */
enum SOF_TIMESTAMPING {
SOF_TIMESTAMPING_TX_SOFTWARE = (1 << 1),
SOF_TIMESTAMPING_RX_HARDWARE = (1 << 2),
SOF_TIMESTAMPING_RX_SOFTWARE = (1 << 3),
SOF_TIMESTAMPING_SOFTWARE = (1 << 4),
SOF_TIMESTAMPING_OPT_ID = (1 << 7),
SOF_TIMESTAMPING_TX_SCHED = (1 << 8),
Expand Down Expand Up @@ -4254,3 +4258,105 @@ TEST(AsyncSocketTest, QueueTimeout) {
// Since the second message is expired, it should NOT be dequeued
EXPECT_EQ(connectionEventCb.getConnectionDequeuedByAcceptCallback(), 1);
}

class TestRXTimestampsCallback
: public folly::AsyncSocket::ReadAncillaryDataCallback {
public:
TestRXTimestampsCallback() {}
void ancillaryData(struct msghdr& msgh) noexcept override {
struct cmsghdr* cmsg;
for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != nullptr;
cmsg = CMSG_NXTHDR(&msgh, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET ||
cmsg->cmsg_type != SO_TIMESTAMPING) {
continue;
}
callCount_++;
timespec* ts = (struct timespec*)CMSG_DATA(cmsg);
actualRxTimestampSec_ = ts[0].tv_sec;
}
}
folly::MutableByteRange getAncillaryDataCtrlBuffer() override {
return folly::MutableByteRange(ancillaryDataCtrlBuffer_);
}

uint32_t callCount_{0};
long actualRxTimestampSec_{0};

private:
std::array<uint8_t, 1024> ancillaryDataCtrlBuffer_;
};

/**
* Test read ancillary data callback
*/
TEST(AsyncSocketTest, readAncillaryData) {
TestServer server;

// connect()
EventBase evb;
std::shared_ptr<AsyncSocket> socket = AsyncSocket::newSocket(&evb);

ConnCallback ccb;
socket->connect(&ccb, server.getAddress(), 1);
LOG(INFO) << "Client socket fd=" << socket->getNetworkSocket();

// Enable rx timestamp notifications
ASSERT_NE(socket->getNetworkSocket(), NetworkSocket());
int flags = SOF_TIMESTAMPING_SOFTWARE | SOF_TIMESTAMPING_RX_SOFTWARE |
SOF_TIMESTAMPING_RX_HARDWARE;
SocketOptionKey tstampingOpt = {SOL_SOCKET, SO_TIMESTAMPING};
EXPECT_EQ(tstampingOpt.apply(socket->getNetworkSocket(), flags), 0);

// Accept the connection.
std::shared_ptr<BlockingSocket> acceptedSocket = server.accept();
LOG(INFO) << "Server socket fd=" << acceptedSocket->getNetworkSocket();

// Wait for connection
evb.loop();
ASSERT_EQ(ccb.state, STATE_SUCCEEDED);

TestRXTimestampsCallback rxcb;

// Set read callback
ReadCallback rcb(100);
socket->setReadCB(&rcb);

// Get the timestamp when the message was write
struct timespec currentTime;
clock_gettime(CLOCK_REALTIME, &currentTime);
long writeTimestampSec = currentTime.tv_sec;

// write bytes from server (acceptedSocket) to client (socket).
std::vector<uint8_t> wbuf(128, 'a');
acceptedSocket->write(wbuf.data(), wbuf.size());

// Wait for reading to complete.
evb.loopOnce();
ASSERT_NE(rcb.buffers.size(), 0);

// Verify that if the callback is not set, it will not be called
ASSERT_EQ(rxcb.callCount_, 0);

// Set up rx timestamp callbacks
socket->setReadAncillaryDataCB(&rxcb);
acceptedSocket->write(wbuf.data(), wbuf.size());

// Wait for reading to complete.
evb.loopOnce();
ASSERT_NE(rcb.buffers.size(), 0);

// Verify that after setting callback, the callback was called
ASSERT_NE(rxcb.callCount_, 0);
// Compare the received timestamp is within an expected range
clock_gettime(CLOCK_REALTIME, &currentTime);
ASSERT_TRUE(rxcb.actualRxTimestampSec_ <= currentTime.tv_sec);
ASSERT_TRUE(rxcb.actualRxTimestampSec_ >= writeTimestampSec);

// Close both sockets
acceptedSocket->close();
socket->close();

ASSERT_TRUE(socket->isClosedBySelf());
ASSERT_FALSE(socket->isClosedByPeer());
}

0 comments on commit 7a2b83b

Please sign in to comment.