Skip to content

Commit

Permalink
Save ENOBUFS errno correctly in tcp_posix for subsequent handling (#2…
Browse files Browse the repository at this point in the history
…9923)

* Save ENOBUFS errno correctly in tcp_posix for subsequent handling

* remove log statement
  • Loading branch information
Vignesh2208 committed Jun 29, 2022
1 parent e8ca82b commit 85059e9
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions src/core/lib/iomgr/tcp_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -945,14 +945,15 @@ static void tcp_read(grpc_endpoint* ep, grpc_slice_buffer* incoming_buffer,

/* A wrapper around sendmsg. It sends \a msg over \a fd and returns the number
* of bytes sent. */
ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) {
ssize_t tcp_send(int fd, const struct msghdr* msg, int* saved_errno,
int additional_flags = 0) {
GPR_TIMER_SCOPE("sendmsg", 1);
ssize_t sent_length;
do {
/* TODO(klempner): Cork if this is a partial write */
GRPC_STATS_INC_SYSCALL_WRITE();
sent_length = sendmsg(fd, msg, SENDMSG_FLAGS | additional_flags);
} while (sent_length < 0 && errno == EINTR);
} while (sent_length < 0 && (*saved_errno = errno) == EINTR);
return sent_length;
}

Expand All @@ -964,7 +965,7 @@ ssize_t tcp_send(int fd, const struct msghdr* msg, int additional_flags = 0) {
*/
static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags = 0);

/** The callback function to be invoked when we get an error on the socket. */
Expand Down Expand Up @@ -1008,7 +1009,7 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* tcp) {

static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
size_t sending_length,
ssize_t* sent_length,
ssize_t* sent_length, int* saved_errno,
int additional_flags) {
if (!tcp->socket_ts_enabled) {
uint32_t opt = grpc_core::kTimestampingSocketOptions;
Expand Down Expand Up @@ -1037,7 +1038,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
msg->msg_controllen = CMSG_SPACE(sizeof(uint32_t));

/* If there was an error on sendmsg the logic in tcp_flush will handle it. */
ssize_t length = tcp_send(tcp->fd, msg, additional_flags);
ssize_t length = tcp_send(tcp->fd, msg, saved_errno, additional_flags);
*sent_length = length;
/* Only save timestamps if all the bytes were taken by sendmsg. */
if (sending_length == static_cast<size_t>(length)) {
Expand Down Expand Up @@ -1262,6 +1263,7 @@ static void ZerocopyDisableAndWaitForRemaining(grpc_tcp* /*tcp*/) {}
static bool tcp_write_with_timestamps(grpc_tcp* /*tcp*/, struct msghdr* /*msg*/,
size_t /*sending_length*/,
ssize_t* /*sent_length*/,
int* /* saved_errno */,
int /*additional_flags*/) {
gpr_log(GPR_ERROR, "Write with timestamps not supported for this platform");
GPR_ASSERT(0);
Expand Down Expand Up @@ -1342,6 +1344,7 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
size_t unwind_slice_idx;
size_t unwind_byte_idx;
bool tried_sending_message;
int saved_errno;
msghdr msg;
// iov consumes a large space. Keep it as the last item on the stack to
// improve locality. After all, we expect only the first elements of it being
Expand All @@ -1360,10 +1363,11 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
// Before calling sendmsg (with or without timestamps): we
// take a single ref on the zerocopy send record.
tcp->tcp_zerocopy_send_ctx.NoteSend(record);
saved_errno = 0;
if (tcp->outgoing_buffer_arg != nullptr) {
if (!tcp->ts_capable ||
!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
MSG_ZEROCOPY)) {
&saved_errno, MSG_ZEROCOPY)) {
/* We could not set socket options to collect Fathom timestamps.
* Fallback on writing without timestamps. */
tcp->ts_capable = false;
Expand All @@ -1377,20 +1381,20 @@ static bool do_tcp_flush_zerocopy(grpc_tcp* tcp, TcpZerocopySendRecord* record,
msg.msg_controllen = 0;
GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);
sent_length = tcp_send(tcp->fd, &msg, MSG_ZEROCOPY);
sent_length = tcp_send(tcp->fd, &msg, &saved_errno, MSG_ZEROCOPY);
}
if (sent_length < 0) {
// If this particular send failed, drop ref taken earlier in this method.
tcp->tcp_zerocopy_send_ctx.UndoSend();
if (errno == EAGAIN) {
if (saved_errno == EAGAIN) {
record->UnwindIfThrottled(unwind_slice_idx, unwind_byte_idx);
return false;
} else if (errno == EPIPE) {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
} else if (saved_errno == EPIPE) {
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
tcp_shutdown_buffer_list(tcp);
return true;
} else {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
tcp_shutdown_buffer_list(tcp);
return true;
}
Expand Down Expand Up @@ -1434,6 +1438,7 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
size_t trailing;
size_t unwind_slice_idx;
size_t unwind_byte_idx;
int saved_errno;

// We always start at zero, because we eagerly unref and trim the slice
// buffer as we write
Expand Down Expand Up @@ -1465,9 +1470,11 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
msg.msg_iovlen = iov_size;
msg.msg_flags = 0;
bool tried_sending_message = false;
saved_errno = 0;
if (tcp->outgoing_buffer_arg != nullptr) {
if (!tcp->ts_capable ||
!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length)) {
!tcp_write_with_timestamps(tcp, &msg, sending_length, &sent_length,
&saved_errno)) {
/* We could not set socket options to collect Fathom timestamps.
* Fallback on writing without timestamps. */
tcp->ts_capable = false;
Expand All @@ -1483,25 +1490,25 @@ static bool tcp_flush(grpc_tcp* tcp, grpc_error_handle* error) {
GRPC_STATS_INC_TCP_WRITE_SIZE(sending_length);
GRPC_STATS_INC_TCP_WRITE_IOV_SIZE(iov_size);

sent_length = tcp_send(tcp->fd, &msg);
sent_length = tcp_send(tcp->fd, &msg, &saved_errno);
}

if (sent_length < 0) {
if (errno == EAGAIN) {
if (saved_errno == EAGAIN) {
tcp->outgoing_byte_idx = unwind_byte_idx;
// unref all and forget about all slices that have been written to this
// point
for (size_t idx = 0; idx < unwind_slice_idx; ++idx) {
grpc_slice_buffer_remove_first(tcp->outgoing_buffer);
}
return false;
} else if (errno == EPIPE) {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
} else if (saved_errno == EPIPE) {
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
tcp_shutdown_buffer_list(tcp);
return true;
} else {
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "sendmsg"), tcp);
*error = tcp_annotate_error(GRPC_OS_ERROR(saved_errno, "sendmsg"), tcp);
grpc_slice_buffer_reset_and_unref_internal(tcp->outgoing_buffer);
tcp_shutdown_buffer_list(tcp);
return true;
Expand Down

0 comments on commit 85059e9

Please sign in to comment.