Skip to content

Commit

Permalink
cf-socket: add socket recv buffering for most tcp cases
Browse files Browse the repository at this point in the history
- use bufq as recv buffer, also for Windows pre-receive handling
- catch small reads followed by larger ones in a single socket
  call. A common pattern on TLS connections.

Closes #10787
  • Loading branch information
icing authored and bagder committed Apr 13, 2023
1 parent 4cfa5bc commit 24726a4
Showing 1 changed file with 113 additions and 134 deletions.
247 changes: 113 additions & 134 deletions lib/cf-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#endif

#include "urldata.h"
#include "bufq.h"
#include "sendf.h"
#include "if2ip.h"
#include "strerror.h"
Expand Down Expand Up @@ -729,29 +730,20 @@ CURLcode Curl_socket_connect_result(struct Curl_easy *data,
}
}

#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
struct io_buffer {
char *bufr;
size_t allc; /* size of the current allocation */
size_t head; /* bufr index for next read */
size_t tail; /* bufr index for next write */
};

static void io_buffer_reset(struct io_buffer *iob)
{
if(iob->bufr)
free(iob->bufr);
memset(iob, 0, sizeof(*iob));
}
#endif /* USE_RECV_BEFORE_SEND_WORKAROUND */
/* We have a recv buffer to enhance reads with len < NW_SMALL_READS.
* This happens often on TLS connections where the TLS implemenation
* tries to read the head of a TLS record, determine the length of the
* full record and then make a subsequent read for that.
* On large reads, we will not fill the buffer to avoid the double copy. */
#define NW_RECV_CHUNK_SIZE (64 * 1024)
#define NW_RECV_CHUNKS 1
#define NW_SMALL_READS (1024)

struct cf_socket_ctx {
int transport;
struct Curl_sockaddr_ex addr; /* address to connect to */
curl_socket_t sock; /* current attempt socket */
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
struct io_buffer recv_buffer;
#endif
struct bufq recvbuf; /* used when `buffer_recv` is set */
char r_ip[MAX_IPADR_LEN]; /* remote IP as string */
int r_port; /* remote port number */
char l_ip[MAX_IPADR_LEN]; /* local IP as string */
Expand All @@ -763,6 +755,7 @@ struct cf_socket_ctx {
BIT(got_first_byte); /* if first byte was received */
BIT(accepted); /* socket was accepted, not connected */
BIT(active);
BIT(buffer_recv);
};

static void cf_socket_ctx_init(struct cf_socket_ctx *ctx,
Expand All @@ -773,6 +766,56 @@ static void cf_socket_ctx_init(struct cf_socket_ctx *ctx,
ctx->sock = CURL_SOCKET_BAD;
ctx->transport = transport;
Curl_sock_assign_addr(&ctx->addr, ai, transport);
Curl_bufq_init(&ctx->recvbuf, NW_RECV_CHUNK_SIZE, NW_RECV_CHUNKS);
}

struct reader_ctx {
struct Curl_cfilter *cf;
struct Curl_easy *data;
};

static ssize_t nw_in_read(void *reader_ctx,
unsigned char *buf, size_t len,
CURLcode *err)
{
struct reader_ctx *rctx = reader_ctx;
struct cf_socket_ctx *ctx = rctx->cf->ctx;
ssize_t nread;

*err = CURLE_OK;
nread = sread(ctx->sock, buf, len);

if(-1 == nread) {
int sockerr = SOCKERRNO;

if(
#ifdef WSAEWOULDBLOCK
/* This is how Windows does it */
(WSAEWOULDBLOCK == sockerr)
#else
/* errno may be EWOULDBLOCK or on some systems EAGAIN when it returned
due to its inability to send off data without blocking. We therefore
treat both error codes the same here */
(EWOULDBLOCK == sockerr) || (EAGAIN == sockerr) || (EINTR == sockerr)
#endif
) {
/* this is just a case of EWOULDBLOCK */
*err = CURLE_AGAIN;
nread = -1;
}
else {
char buffer[STRERROR_LEN];

failf(rctx->data, "Recv failure: %s",
Curl_strerror(sockerr, buffer, sizeof(buffer)));
rctx->data->state.os_errno = sockerr;
*err = CURLE_RECV_ERROR;
nread = -1;
}
}
DEBUGF(LOG_CF(rctx->data, rctx->cf, "nw_in_read(len=%zu) -> %d, err=%d",
len, (int)nread, *err));
return nread;
}

static void cf_socket_close(struct Curl_cfilter *cf, struct Curl_easy *data)
Expand Down Expand Up @@ -808,10 +851,9 @@ static void cf_socket_close(struct Curl_cfilter *cf, struct Curl_easy *data)
sclose(ctx->sock);
ctx->sock = CURL_SOCKET_BAD;
}
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
io_buffer_reset(&ctx->recv_buffer);
#endif
Curl_bufq_reset(&ctx->recvbuf);
ctx->active = FALSE;
ctx->buffer_recv = FALSE;
memset(&ctx->started_at, 0, sizeof(ctx->started_at));
memset(&ctx->connected_at, 0, sizeof(ctx->connected_at));
}
Expand All @@ -825,6 +867,7 @@ static void cf_socket_destroy(struct Curl_cfilter *cf, struct Curl_easy *data)

cf_socket_close(cf, data);
DEBUGF(LOG_CF(data, cf, "destroy"));
Curl_bufq_free(&ctx->recvbuf);
free(ctx);
cf->ctx = NULL;
}
Expand Down Expand Up @@ -1153,89 +1196,16 @@ static int cf_socket_get_select_socks(struct Curl_cfilter *cf,
return rc;
}

#ifdef USE_RECV_BEFORE_SEND_WORKAROUND

static CURLcode pre_receive_plain(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_socket_ctx *ctx = cf->ctx;
struct io_buffer * const iob = &ctx->recv_buffer;

/* WinSock will destroy unread received data if send() is
failed.
To avoid lossage of received data, recv() must be
performed before every send() if any incoming data is
available. However, skip this, if buffer is already full. */
if((cf->conn->handler->protocol&PROTO_FAMILY_HTTP) != 0 &&
cf->conn->recv[cf->sockindex] == Curl_conn_recv &&
(!iob->bufr || (iob->allc > iob->tail))) {
const int readymask = Curl_socket_check(ctx->sock, CURL_SOCKET_BAD,
CURL_SOCKET_BAD, 0);
if(readymask != -1 && (readymask & CURL_CSELECT_IN) != 0) {
size_t bytestorecv = iob->allc - iob->tail;
ssize_t nread;
/* Have some incoming data */
if(!iob->bufr) {
/* Use buffer double default size for intermediate buffer */
iob->allc = 2 * data->set.buffer_size;
iob->bufr = malloc(iob->allc);
if(!iob->bufr)
return CURLE_OUT_OF_MEMORY;
iob->tail = 0;
iob->head = 0;
bytestorecv = iob->allc;
}

nread = sread(ctx->sock, iob->bufr + iob->tail, bytestorecv);
if(nread > 0)
iob->tail += (size_t)nread;
}
}
return CURLE_OK;
}

static ssize_t get_pre_recved(struct Curl_cfilter *cf, char *buf, size_t len)
{
struct cf_socket_ctx *ctx = cf->ctx;
struct io_buffer * const iob = &ctx->recv_buffer;
size_t copysize;
if(!iob->bufr)
return 0;

DEBUGASSERT(iob->allc > 0);
DEBUGASSERT(iob->tail <= iob->allc);
DEBUGASSERT(iob->head <= iob->tail);
/* Check and process data that already received and storied in internal
intermediate buffer */
if(iob->tail > iob->head) {
copysize = CURLMIN(len, iob->tail - iob->head);
memcpy(buf, iob->bufr + iob->head, copysize);
iob->head += copysize;
}
else
copysize = 0; /* buffer was allocated, but nothing was received */

/* Free intermediate buffer if it has no unprocessed data */
if(iob->head == iob->tail)
io_buffer_reset(iob);

return (ssize_t)copysize;
}
#endif /* USE_RECV_BEFORE_SEND_WORKAROUND */

static bool cf_socket_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
struct cf_socket_ctx *ctx = cf->ctx;
int readable;

#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
if(ctx->recv_buffer.bufr && ctx->recv_buffer.allc &&
ctx->recv_buffer.tail > ctx->recv_buffer.head)
(void)data;
if(!Curl_bufq_is_empty(&ctx->recvbuf))
return TRUE;
#endif

(void)data;
readable = SOCKET_READABLE(ctx->sock, 0);
return (readable > 0 && (readable & CURL_CSELECT_IN));
}
Expand All @@ -1247,20 +1217,21 @@ static ssize_t cf_socket_send(struct Curl_cfilter *cf, struct Curl_easy *data,
curl_socket_t fdsave;
ssize_t nwritten;

*err = CURLE_OK;

#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
/* WinSock will destroy unread received data if send() is
failed.
To avoid lossage of received data, recv() must be
performed before every send() if any incoming data is
available. */
if(pre_receive_plain(cf, data)) {
*err = CURLE_OUT_OF_MEMORY;
return -1;
if(ctx->buffer_recv && !Curl_bufq_is_full(&ctx->recvbuf)) {
nwritten = Curl_bufq_slurp(&ctx->recvbuf, nw_in_read, &rctx, err);
if(nwritten < 0 && *err != CURLE_AGAIN) {
return -1;
}
}
#endif

*err = CURLE_OK;
fdsave = cf->conn->sock[cf->sockindex];
cf->conn->sock[cf->sockindex] = ctx->sock;

Expand Down Expand Up @@ -1317,47 +1288,50 @@ static ssize_t cf_socket_recv(struct Curl_cfilter *cf, struct Curl_easy *data,

*err = CURLE_OK;

#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
/* Check and return data that already received and storied in internal
intermediate buffer */
nread = get_pre_recved(cf, buf, len);
if(nread > 0) {
*err = CURLE_OK;
return nread;
}
#endif

fdsave = cf->conn->sock[cf->sockindex];
cf->conn->sock[cf->sockindex] = ctx->sock;

nread = sread(ctx->sock, buf, len);

if(-1 == nread) {
int sockerr = SOCKERRNO;

if(
#ifdef WSAEWOULDBLOCK
/* This is how Windows does it */
(WSAEWOULDBLOCK == sockerr)
#else
/* errno may be EWOULDBLOCK or on some systems EAGAIN when it returned
due to its inability to send off data without blocking. We therefore
treat both error codes the same here */
(EWOULDBLOCK == sockerr) || (EAGAIN == sockerr) || (EINTR == sockerr)
#endif
) {
/* this is just a case of EWOULDBLOCK */
*err = CURLE_AGAIN;
if(ctx->buffer_recv && !Curl_bufq_is_empty(&ctx->recvbuf)) {
DEBUGF(LOG_CF(data, cf, "recv from buffer"));
nread = Curl_bufq_read(&ctx->recvbuf, (unsigned char *)buf, len, err);
}
else {
struct reader_ctx rctx;

rctx.cf = cf;
rctx.data = data;

/* "small" reads may trigger filling our buffer, "large" reads
* are probably not worth the additional copy */
if(ctx->buffer_recv && len < NW_SMALL_READS) {
ssize_t nwritten;
nwritten = Curl_bufq_slurp(&ctx->recvbuf, nw_in_read, &rctx, err);
if(nwritten < 0 && !Curl_bufq_is_empty(&ctx->recvbuf)) {
/* we have a partial read with an error. need to deliver
* what we got, return the error later. */
DEBUGF(LOG_CF(data, cf, "partial read: empty buffer first"));
nread = Curl_bufq_read(&ctx->recvbuf, (unsigned char *)buf, len, err);
}
else if(nwritten < 0) {
nread = -1;
goto out;
}
else if(nwritten == 0) {
/* eof */
*err = CURLE_OK;
nread = 0;
}
else {
DEBUGF(LOG_CF(data, cf, "buffered %zd additional bytes", nwritten));
nread = Curl_bufq_read(&ctx->recvbuf, (unsigned char *)buf, len, err);
}
}
else {
char buffer[STRERROR_LEN];
failf(data, "Recv failure: %s",
Curl_strerror(sockerr, buffer, sizeof(buffer)));
data->state.os_errno = sockerr;
*err = CURLE_RECV_ERROR;
nread = nw_in_read(&rctx, (unsigned char *)buf, len, err);
}
}

out:
DEBUGF(LOG_CF(data, cf, "recv(len=%zu) -> %d, err=%d", len, (int)nread,
*err));
if(nread > 0 && !ctx->got_first_byte) {
Expand Down Expand Up @@ -1413,6 +1387,11 @@ static void cf_socket_active(struct Curl_cfilter *cf, struct Curl_easy *data)
conn_set_primary_ip(cf, data);
set_local_ip(cf, data);
Curl_persistconninfo(data, cf->conn, ctx->l_ip, ctx->l_port);
/* We buffer only for TCP transfers that do not install their own
* read function. Those may still have expectations about socket
* behaviours from the past. */
ctx->buffer_recv = (ctx->transport == TRNSPRT_TCP &&
(cf->conn->recv[cf->sockindex] == Curl_conn_recv));
}
ctx->active = TRUE;
}
Expand Down

0 comments on commit 24726a4

Please sign in to comment.