SUNRPC: Generalize the RPC buffer allocation API
xprtrdma needs to allocate the Call and Reply buffers separately.
In truth, the reliance on a single buffer for the pair of XDR
buffers is transport implementation-specific.

Transports that want to allocate separate Call and Reply buffers
will ignore the "size" argument anyway.  Don't bother passing it.

The buf_alloc method can't return two pointers. Instead, make the
method's return value an error code, and set the rq_buffer pointer
in the method itself.
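
For illustration only (this helper is not part of the patch): under
the new contract, a transport that wants separate Call and Reply
buffers might implement ->buf_alloc along these lines. The
rq_callsize, rq_rcvsize, and rq_buffer fields are real struct
rpc_rqst members; alloc_call_buf, alloc_reply_buf, and free_call_buf
are hypothetical stand-ins for transport-specific allocators.

    /* Sketch: the method returns an errno, sets rq_buffer itself, and
     * derives the sizes from the rqst, so no "size" argument is needed.
     */
    static int
    example_buf_alloc(struct rpc_task *task)
    {
            struct rpc_rqst *rqst = task->tk_rqstp;
            void *call_buf, *rcv_buf;

            call_buf = alloc_call_buf(rqst->rq_callsize);
            if (!call_buf)
                    return -ENOMEM;        /* transient: caller may retry */

            rcv_buf = alloc_reply_buf(rqst->rq_rcvsize);
            if (!rcv_buf) {
                    free_call_buf(call_buf);
                    return -ENOMEM;
            }

            rqst->rq_buffer = call_buf;
            /* a real transport would stash rcv_buf in its private state */
            return 0;
    }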

This gives call_allocate an opportunity to terminate an RPC instead
of looping forever when a permanent problem occurs. If a request is
just bogus, or the transport is in a state where it can't allocate
resources for any request, there needs to be a way to kill the RPC
right there and not loop.
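
In sketch form (mirroring the net/sunrpc/clnt.c change below), the
caller's policy becomes:

    status = xprt->ops->buf_alloc(task);
    if (status == 0)
            return;                 /* buffer ready in rq_buffer */
    if (status != -ENOMEM) {
            rpc_exit(task, status); /* permanent error: kill the RPC */
            return;
    }
    /* -ENOMEM is treated as transient: delay, then retry */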

This immediately fixes a rare problem in the backchannel send path,
which loops if the server happens to send a CB request whose
call+reply size is larger than a page (which it shouldn't do yet).

One more issue: it looks like xprt_inject_disconnect was incorrectly
placed in the failure path in call_allocate. It needs to be in the
success path, as it is at other call sites.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
chucklever authored and amschuma-ntap committed Sep 19, 2016
1 parent b9c5bc0 commit 5fe6eaa
Showing 7 changed files with 63 additions and 37 deletions.
2 changes: 1 addition & 1 deletion include/linux/sunrpc/sched.h
@@ -239,7 +239,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
 					void *);
 void		rpc_wake_up_status(struct rpc_wait_queue *, int);
 void		rpc_delay(struct rpc_task *, unsigned long);
-void *		rpc_malloc(struct rpc_task *, size_t);
+int		rpc_malloc(struct rpc_task *);
 void		rpc_free(void *);
 int		rpciod_up(void);
 void		rpciod_down(void);
2 changes: 1 addition & 1 deletion include/linux/sunrpc/xprt.h
@@ -127,7 +127,7 @@ struct rpc_xprt_ops {
 	void		(*rpcbind)(struct rpc_task *task);
 	void		(*set_port)(struct rpc_xprt *xprt, unsigned short port);
 	void		(*connect)(struct rpc_xprt *xprt, struct rpc_task *task);
-	void *		(*buf_alloc)(struct rpc_task *task, size_t size);
+	int		(*buf_alloc)(struct rpc_task *task);
 	void		(*buf_free)(void *buffer);
 	int		(*send_request)(struct rpc_task *task);
 	void		(*set_retrans_timeout)(struct rpc_task *task);
12 changes: 8 additions & 4 deletions net/sunrpc/clnt.c
@@ -1691,6 +1691,7 @@ call_allocate(struct rpc_task *task)
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+	int status;

 	dprint_status(task);

@@ -1716,11 +1717,14 @@ call_allocate(struct rpc_task *task)
 	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
 	req->rq_rcvsize <<= 2;

-	req->rq_buffer = xprt->ops->buf_alloc(task,
-					req->rq_callsize + req->rq_rcvsize);
-	if (req->rq_buffer != NULL)
-		return;
+	status = xprt->ops->buf_alloc(task);
 	xprt_inject_disconnect(xprt);
+	if (status == 0)
+		return;
+	if (status != -ENOMEM) {
+		rpc_exit(task, status);
+		return;
+	}

 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
24 changes: 15 additions & 9 deletions net/sunrpc/sched.c
@@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
 }

 /**
- * rpc_malloc - allocate an RPC buffer
- * @task: RPC task that will use this buffer
- * @size: requested byte size
+ * rpc_malloc - allocate RPC buffer resources
+ * @task: RPC task
+ *
+ * A single memory region is allocated, which is split between the
+ * RPC call and RPC reply that this task is being used for. When
+ * this RPC is retired, the memory is released by calling rpc_free.
  *
  * To prevent rpciod from hanging, this allocator never sleeps,
- * returning NULL and suppressing warning if the request cannot be serviced
- * immediately.
- * The caller can arrange to sleep in a way that is safe for rpciod.
+ * returning -ENOMEM and suppressing warning if the request cannot
+ * be serviced immediately. The caller can arrange to sleep in a
+ * way that is safe for rpciod.
  *
  * Most requests are 'small' (under 2KiB) and can be serviced from a
  * mempool, ensuring that NFS reads and writes can always proceed,
@@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we avoid using GFP_KERNEL.
  */
-void *rpc_malloc(struct rpc_task *task, size_t size)
+int rpc_malloc(struct rpc_task *task)
 {
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
 	struct rpc_buffer *buf;
 	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;

@@ -880,12 +885,13 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
 	buf = kmalloc(size, gfp);

 	if (!buf)
-		return NULL;
+		return -ENOMEM;

 	buf->len = size;
 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
 			task->tk_pid, size, buf);
-	return &buf->data;
+	rqst->rq_buffer = buf->data;
+	return 0;
 }
 EXPORT_SYMBOL_GPL(rpc_malloc);
17 changes: 9 additions & 8 deletions net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -159,29 +159,30 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 /* Server-side transport endpoint wants a whole page for its send
  * buffer. The client RPC code constructs the RPC header in this
  * buffer before it invokes ->send_request.
- *
- * Returns NULL if there was a temporary allocation failure.
  */
-static void *
-xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_bc_allocate(struct rpc_task *task)
 {
 	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+	size_t size = rqst->rq_callsize;
 	struct svcxprt_rdma *rdma;
 	struct page *page;

 	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);

 	/* Prevent an infinite loop: try to make this case work */
-	if (size > PAGE_SIZE)
+	if (size > PAGE_SIZE) {
 		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
 			  size);
+		return -EINVAL;
+	}

 	page = alloc_page(RPCRDMA_DEF_GFP);
 	if (!page)
-		return NULL;
+		return -ENOMEM;

-	return page_address(page);
+	rqst->rq_buffer = page_address(page);
+	return 0;
 }

 static void
26 changes: 18 additions & 8 deletions net/sunrpc/xprtrdma/transport.c
@@ -477,7 +477,15 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 	}
 }

-/*
+/**
+ * xprt_rdma_allocate - allocate transport resources for an RPC
+ * @task: RPC task
+ *
+ * Return values:
+ *        0:	Success; rq_buffer points to RPC buffer to use
+ *   ENOMEM:	Out of memory, call again later
+ *      EIO:	A permanent error occurred, do not retry
+ *
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
  * sequence.
@@ -486,19 +494,20 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
  * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
  * We may register rq_rcv_buf when using reply chunks.
  */
-static void *
-xprt_rdma_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_allocate(struct rpc_task *task)
 {
-	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_regbuf *rb;
 	struct rpcrdma_req *req;
 	size_t min_size;
 	gfp_t flags;

 	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 	if (req == NULL)
-		return NULL;
+		return -ENOMEM;

 	flags = RPCRDMA_DEF_GFP;
 	if (RPC_IS_SWAPPER(task))
@@ -515,7 +524,8 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 	dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
 	req->rl_task = task;
-	return req->rl_sendbuf->rg_base;
+	rqst->rq_buffer = req->rl_sendbuf->rg_base;
+	return 0;

 out_rdmabuf:
 	min_size = r_xprt->rx_data.inline_wsize;
@@ -558,7 +568,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)

 out_fail:
 	rpcrdma_buffer_put(req);
-	return NULL;
+	return -ENOMEM;
 }

 /*
17 changes: 11 additions & 6 deletions net/sunrpc/xprtsock.c
@@ -2533,23 +2533,28 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
  * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
  * to use the server side send routines.
  */
-static void *bc_malloc(struct rpc_task *task, size_t size)
+static int bc_malloc(struct rpc_task *task)
 {
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize;
 	struct page *page;
 	struct rpc_buffer *buf;

-	WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
-	if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
-		return NULL;
+	if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
+		WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
+			  size);
+		return -EINVAL;
+	}

 	page = alloc_page(GFP_KERNEL);
 	if (!page)
-		return NULL;
+		return -ENOMEM;

 	buf = page_address(page);
 	buf->len = PAGE_SIZE;

-	return buf->data;
+	rqst->rq_buffer = buf->data;
+	return 0;
 }

 /*
