Skip to content

Commit 00e9071

Browse files
committed
rxrpc: Preallocate peers, conns and calls for incoming service requests
Make it possible for the data_ready handler called from the UDP transport socket to completely instantiate an rxrpc_call structure and make it immediately live by preallocating all the memory it might need. The idea is to cut out the background thread usage as much as possible. [Note that the preallocated structs are not actually used in this patch - that will be done in a future patch.] If insufficient resources are available in the preallocation buffers, it will be possible to discard the DATA packet in the data_ready handler or schedule a BUSY packet without the need to schedule an attempt at allocation in a background thread. To this end: (1) Preallocate rxrpc_peer, rxrpc_connection and rxrpc_call structs to a maximum number each of the listen backlog size. The backlog size is limited to a maxmimum of 32. Only this many of each can be in the preallocation buffer. (2) For userspace sockets, the preallocation is charged initially by listen() and will be recharged by accepting or rejecting pending new incoming calls. (3) For kernel services {,re,dis}charging of the preallocation buffers is handled manually. Two notifier callbacks have to be provided before kernel_listen() is invoked: (a) An indication that a new call has been instantiated. This can be used to trigger background recharging. (b) An indication that a call is being discarded. This is used when the socket is being released. A function, rxrpc_kernel_charge_accept() is called by the kernel service to preallocate a single call. It should be passed the user ID to be used for that call and a callback to associate the rxrpc call with the kernel service's side of the ID. (4) Discard the preallocation when the socket is closed. (5) Temporarily bump the refcount on the call allocated in rxrpc_incoming_call() so that rxrpc_release_call() can ditch the preallocation ref on service calls unconditionally. This will no longer be necessary once the preallocation is used. Note that this does not yet control the number of active service calls on a client - that will come in a later patch. A future development would be to provide a setsockopt() call that allows a userspace server to manually charge the preallocation buffer. This would allow user call IDs to be provided in advance and the awkward manual accept stage to be bypassed. Signed-off-by: David Howells <dhowells@redhat.com>
1 parent 49e19ec commit 00e9071

File tree

10 files changed

+391
-15
lines changed

10 files changed

+391
-15
lines changed

fs/afs/rxrpc.c

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
struct socket *afs_socket; /* my RxRPC socket */
2020
static struct workqueue_struct *afs_async_calls;
21+
static struct afs_call *afs_spare_incoming_call;
2122
static atomic_t afs_outstanding_calls;
2223

2324
static void afs_free_call(struct afs_call *);
@@ -26,7 +27,8 @@ static int afs_wait_for_call_to_complete(struct afs_call *);
2627
static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
2728
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
2829
static void afs_process_async_call(struct work_struct *);
29-
static void afs_rx_new_call(struct sock *);
30+
static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
31+
static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
3032
static int afs_deliver_cm_op_id(struct afs_call *);
3133

3234
/* synchronous call management */
@@ -54,8 +56,10 @@ static const struct afs_call_type afs_RXCMxxxx = {
5456
};
5557

5658
static void afs_collect_incoming_call(struct work_struct *);
59+
static void afs_charge_preallocation(struct work_struct *);
5760

5861
static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
62+
static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);
5963

6064
static int afs_wait_atomic_t(atomic_t *p)
6165
{
@@ -100,13 +104,15 @@ int afs_open_socket(void)
100104
if (ret < 0)
101105
goto error_2;
102106

103-
rxrpc_kernel_new_call_notification(socket, afs_rx_new_call);
107+
rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
108+
afs_rx_discard_new_call);
104109

105110
ret = kernel_listen(socket, INT_MAX);
106111
if (ret < 0)
107112
goto error_2;
108113

109114
afs_socket = socket;
115+
afs_charge_preallocation(NULL);
110116
_leave(" = 0");
111117
return 0;
112118

@@ -126,6 +132,12 @@ void afs_close_socket(void)
126132
{
127133
_enter("");
128134

135+
if (afs_spare_incoming_call) {
136+
atomic_inc(&afs_outstanding_calls);
137+
afs_free_call(afs_spare_incoming_call);
138+
afs_spare_incoming_call = NULL;
139+
}
140+
129141
_debug("outstanding %u", atomic_read(&afs_outstanding_calls));
130142
wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
131143
TASK_UNINTERRUPTIBLE);
@@ -635,12 +647,65 @@ static void afs_collect_incoming_call(struct work_struct *work)
635647
afs_free_call(call);
636648
}
637649

650+
static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
651+
{
652+
struct afs_call *call = (struct afs_call *)user_call_ID;
653+
654+
call->rxcall = rxcall;
655+
}
656+
657+
/*
658+
* Charge the incoming call preallocation.
659+
*/
660+
static void afs_charge_preallocation(struct work_struct *work)
661+
{
662+
struct afs_call *call = afs_spare_incoming_call;
663+
664+
for (;;) {
665+
if (!call) {
666+
call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
667+
if (!call)
668+
break;
669+
670+
INIT_WORK(&call->async_work, afs_process_async_call);
671+
call->wait_mode = &afs_async_incoming_call;
672+
call->type = &afs_RXCMxxxx;
673+
init_waitqueue_head(&call->waitq);
674+
call->state = AFS_CALL_AWAIT_OP_ID;
675+
}
676+
677+
if (rxrpc_kernel_charge_accept(afs_socket,
678+
afs_wake_up_async_call,
679+
afs_rx_attach,
680+
(unsigned long)call,
681+
GFP_KERNEL) < 0)
682+
break;
683+
call = NULL;
684+
}
685+
afs_spare_incoming_call = call;
686+
}
687+
688+
/*
689+
* Discard a preallocated call when a socket is shut down.
690+
*/
691+
static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
692+
unsigned long user_call_ID)
693+
{
694+
struct afs_call *call = (struct afs_call *)user_call_ID;
695+
696+
atomic_inc(&afs_outstanding_calls);
697+
call->rxcall = NULL;
698+
afs_free_call(call);
699+
}
700+
638701
/*
639702
* Notification of an incoming call.
640703
*/
641-
static void afs_rx_new_call(struct sock *sk)
704+
static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
705+
unsigned long user_call_ID)
642706
{
643707
queue_work(afs_wq, &afs_collect_incoming_call_work);
708+
queue_work(afs_wq, &afs_charge_preallocation_work);
644709
}
645710

646711
/*

include/net/af_rxrpc.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@ struct rxrpc_call;
2121

2222
typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
2323
unsigned long);
24-
typedef void (*rxrpc_notify_new_call_t)(struct sock *);
24+
typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
25+
unsigned long);
26+
typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
27+
typedef void (*rxrpc_user_attach_call_t)(struct rxrpc_call *, unsigned long);
2528

2629
void rxrpc_kernel_new_call_notification(struct socket *,
27-
rxrpc_notify_new_call_t);
30+
rxrpc_notify_new_call_t,
31+
rxrpc_discard_new_call_t);
2832
struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
2933
struct sockaddr_rxrpc *,
3034
struct key *,
@@ -43,5 +47,7 @@ struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *, unsigned long,
4347
int rxrpc_kernel_reject_call(struct socket *);
4448
void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
4549
struct sockaddr_rxrpc *);
50+
int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
51+
rxrpc_user_attach_call_t, unsigned long, gfp_t);
4652

4753
#endif /* _NET_RXRPC_H */

net/rxrpc/af_rxrpc.c

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ static int rxrpc_listen(struct socket *sock, int backlog)
193193
{
194194
struct sock *sk = sock->sk;
195195
struct rxrpc_sock *rx = rxrpc_sk(sk);
196-
unsigned int max;
196+
unsigned int max, old;
197197
int ret;
198198

199199
_enter("%p,%d", rx, backlog);
@@ -212,9 +212,13 @@ static int rxrpc_listen(struct socket *sock, int backlog)
212212
backlog = max;
213213
else if (backlog < 0 || backlog > max)
214214
break;
215+
old = sk->sk_max_ack_backlog;
215216
sk->sk_max_ack_backlog = backlog;
216-
rx->sk.sk_state = RXRPC_SERVER_LISTENING;
217-
ret = 0;
217+
ret = rxrpc_service_prealloc(rx, GFP_KERNEL);
218+
if (ret == 0)
219+
rx->sk.sk_state = RXRPC_SERVER_LISTENING;
220+
else
221+
sk->sk_max_ack_backlog = old;
218222
break;
219223
default:
220224
ret = -EBUSY;
@@ -303,16 +307,19 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
303307
* rxrpc_kernel_new_call_notification - Get notifications of new calls
304308
* @sock: The socket to intercept received messages on
305309
* @notify_new_call: Function to be called when new calls appear
310+
* @discard_new_call: Function to discard preallocated calls
306311
*
307312
* Allow a kernel service to be given notifications about new calls.
308313
*/
309314
void rxrpc_kernel_new_call_notification(
310315
struct socket *sock,
311-
rxrpc_notify_new_call_t notify_new_call)
316+
rxrpc_notify_new_call_t notify_new_call,
317+
rxrpc_discard_new_call_t discard_new_call)
312318
{
313319
struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
314320

315321
rx->notify_new_call = notify_new_call;
322+
rx->discard_new_call = discard_new_call;
316323
}
317324
EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);
318325

@@ -622,6 +629,7 @@ static int rxrpc_release_sock(struct sock *sk)
622629
}
623630

624631
/* try to flush out this socket */
632+
rxrpc_discard_prealloc(rx);
625633
rxrpc_release_calls_on_socket(rx);
626634
flush_workqueue(rxrpc_workqueue);
627635
rxrpc_purge_queue(&sk->sk_receive_queue);

net/rxrpc/ar-internal.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,43 @@ enum {
6363
RXRPC_CLOSE, /* socket is being closed */
6464
};
6565

66+
/*
67+
* Service backlog preallocation.
68+
*
69+
* This contains circular buffers of preallocated peers, connections and calls
70+
* for incoming service calls and their head and tail pointers. This allows
71+
* calls to be set up in the data_ready handler, thereby avoiding the need to
72+
* shuffle packets around so much.
73+
*/
74+
struct rxrpc_backlog {
75+
unsigned short peer_backlog_head;
76+
unsigned short peer_backlog_tail;
77+
unsigned short conn_backlog_head;
78+
unsigned short conn_backlog_tail;
79+
unsigned short call_backlog_head;
80+
unsigned short call_backlog_tail;
81+
#define RXRPC_BACKLOG_MAX 32
82+
struct rxrpc_peer *peer_backlog[RXRPC_BACKLOG_MAX];
83+
struct rxrpc_connection *conn_backlog[RXRPC_BACKLOG_MAX];
84+
struct rxrpc_call *call_backlog[RXRPC_BACKLOG_MAX];
85+
};
86+
6687
/*
6788
* RxRPC socket definition
6889
*/
6990
struct rxrpc_sock {
7091
/* WARNING: sk has to be the first member */
7192
struct sock sk;
7293
rxrpc_notify_new_call_t notify_new_call; /* Func to notify of new call */
94+
rxrpc_discard_new_call_t discard_new_call; /* Func to discard a new call */
7395
struct rxrpc_local *local; /* local endpoint */
7496
struct hlist_node listen_link; /* link in the local endpoint's listen list */
7597
struct list_head secureq; /* calls awaiting connection security clearance */
7698
struct list_head acceptq; /* calls awaiting acceptance */
99+
struct rxrpc_backlog *backlog; /* Preallocation for services */
77100
struct key *key; /* security for this socket */
78101
struct key *securities; /* list of server security descriptors */
79-
struct rb_root calls; /* outstanding calls on this socket */
102+
struct rb_root calls; /* User ID -> call mapping */
80103
unsigned long flags;
81104
#define RXRPC_SOCK_CONNECTED 0 /* connect_srx is set */
82105
rwlock_t call_lock; /* lock for calls */
@@ -290,6 +313,7 @@ enum rxrpc_conn_cache_state {
290313
enum rxrpc_conn_proto_state {
291314
RXRPC_CONN_UNUSED, /* Connection not yet attempted */
292315
RXRPC_CONN_CLIENT, /* Client connection */
316+
RXRPC_CONN_SERVICE_PREALLOC, /* Service connection preallocation */
293317
RXRPC_CONN_SERVICE_UNSECURED, /* Service unsecured connection */
294318
RXRPC_CONN_SERVICE_CHALLENGING, /* Service challenging for security */
295319
RXRPC_CONN_SERVICE, /* Service secured connection */
@@ -408,6 +432,7 @@ enum rxrpc_call_state {
408432
RXRPC_CALL_CLIENT_AWAIT_REPLY, /* - client awaiting reply */
409433
RXRPC_CALL_CLIENT_RECV_REPLY, /* - client receiving reply phase */
410434
RXRPC_CALL_CLIENT_FINAL_ACK, /* - client sending final ACK phase */
435+
RXRPC_CALL_SERVER_PREALLOC, /* - service preallocation */
411436
RXRPC_CALL_SERVER_SECURING, /* - server securing request connection */
412437
RXRPC_CALL_SERVER_ACCEPTING, /* - server accepting request */
413438
RXRPC_CALL_SERVER_RECV_REQUEST, /* - server receiving request */
@@ -534,6 +559,8 @@ extern struct workqueue_struct *rxrpc_workqueue;
534559
/*
535560
* call_accept.c
536561
*/
562+
int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
563+
void rxrpc_discard_prealloc(struct rxrpc_sock *);
537564
void rxrpc_accept_incoming_calls(struct rxrpc_local *);
538565
struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
539566
rxrpc_notify_rx_t);
@@ -557,6 +584,7 @@ extern struct list_head rxrpc_calls;
557584
extern rwlock_t rxrpc_call_lock;
558585

559586
struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
587+
struct rxrpc_call *rxrpc_alloc_call(gfp_t);
560588
struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
561589
struct rxrpc_conn_parameters *,
562590
struct sockaddr_rxrpc *,
@@ -573,6 +601,7 @@ void rxrpc_get_call(struct rxrpc_call *, enum rxrpc_call_trace);
573601
void rxrpc_put_call(struct rxrpc_call *, enum rxrpc_call_trace);
574602
void rxrpc_get_call_for_skb(struct rxrpc_call *, struct sk_buff *);
575603
void rxrpc_put_call_for_skb(struct rxrpc_call *, struct sk_buff *);
604+
void rxrpc_cleanup_call(struct rxrpc_call *);
576605
void __exit rxrpc_destroy_all_calls(void);
577606

578607
static inline bool rxrpc_is_service_call(const struct rxrpc_call *call)
@@ -757,6 +786,7 @@ struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *,
757786
struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
758787
struct sockaddr_rxrpc *,
759788
struct sk_buff *);
789+
struct rxrpc_connection *rxrpc_prealloc_service_connection(gfp_t);
760790
void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
761791

762792
/*

0 commit comments

Comments
 (0)