Skip to content

Commit a8248fc

Browse files
author
Paolo Abeni
committed
Merge tag 'rxrpc-next-20230131' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs
David Howells says: ==================== Here's the fifth part of patches in the process of moving rxrpc from doing a lot of its stuff in softirq context to doing it in an I/O thread in process context and thereby making it easier to support a larger SACK table. The full description is in the description for the first part[1] which is now upstream. The second and third parts are also upstream[2]. A subset of the original fourth part[3] got applied as a fix for a race[4]. The fifth part includes some cleanups: (1) Miscellaneous trace header cleanups: fix a trace string, display the security index in rx_packet rather than displaying the type twice, remove some whitespace to make checkpatch happier and remove some excess tabulation. (2) Convert ->recvmsg_lock to a spinlock as it's only ever locked exclusively. (3) Make ->ackr_window and ->ackr_nr_unacked non-atomic as they're only used in the I/O thread. (4) Don't use call->tx_lock to access ->tx_buffer as that is only accessed inside the I/O thread. sendmsg() loads onto ->tx_sendmsg and the I/O thread decants from that to the buffer. (5) Remove local->defrag_sem as DATA packets are transmitted serially by the I/O thread. (6) Remove the service connection bundle as it was only used for its channel_lock - which has now gone. And some more significant changes: (7) Add a debugging option to allow a delay to be injected into packet reception to help investigate the behaviour over longer links than just a few cm. (8) Generate occasional PING ACKs to probe for RTT information during a receive-heavy call. (9) Simplify the SACK table maintenance and ACK generation. Now that both parts are done in the same thread, there's no possibility of a race and no need to try and be cunning to avoid taking a BH spinlock whilst copying the SACK table (which in the future will be up to 2K) and no need to rotate the copy to fit the ACK packet table. (10) Use SKB_CONSUMED when freeing received DATA packets (stop dropwatch complaining).
* tag 'rxrpc-next-20230131' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs: rxrpc: Kill service bundle rxrpc: Change rx_packet tracepoint to display securityIndex not type twice rxrpc: Show consumed and freed packets as non-dropped in dropwatch rxrpc: Remove local->defrag_sem rxrpc: Don't lock call->tx_lock to access call->tx_buffer rxrpc: Simplify ACK handling rxrpc: De-atomic call->ackr_window and call->ackr_nr_unacked rxrpc: Generate extra pings for RTT during heavy-receive call rxrpc: Allow a delay to be injected into packet reception rxrpc: Convert call->recvmsg_lock to a spinlock rxrpc: Shrink the tabulation in the rxrpc trace header a bit rxrpc: Remove whitespace before ')' in trace header rxrpc: Fix trace string ==================== Link: https://lore.kernel.org/all/20230131171227.3912130-1-dhowells@redhat.com/ Signed-off-by: Paolo Abeni <pabeni@redhat.com>
2 parents 609aa68 + 550130a commit a8248fc

File tree

18 files changed

+438
-345
lines changed

18 files changed

+438
-345
lines changed

include/trace/events/rxrpc.h

Lines changed: 259 additions & 221 deletions
Large diffs are not rendered by default.

net/rxrpc/Kconfig

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ config AF_RXRPC_INJECT_LOSS
3636
Say Y here to inject packet loss by discarding some received and some
3737
transmitted packets.
3838

39+
config AF_RXRPC_INJECT_RX_DELAY
40+
bool "Inject delay into packet reception"
41+
depends on SYSCTL
42+
help
43+
Say Y here to inject a delay into packet reception, allowing an
44+
extended RTT time to be modelled. The delay can be configured using
45+
/proc/sys/net/rxrpc/rxrpc_inject_rx_delay, setting a number of
46+
milliseconds up to 0.5s (note that the granularity is actually in
47+
jiffies).
3948

4049
config AF_RXRPC_DEBUG
4150
bool "RxRPC dynamic debugging"

net/rxrpc/af_rxrpc.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,7 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
786786
INIT_LIST_HEAD(&rx->sock_calls);
787787
INIT_LIST_HEAD(&rx->to_be_accepted);
788788
INIT_LIST_HEAD(&rx->recvmsg_q);
789-
rwlock_init(&rx->recvmsg_lock);
789+
spin_lock_init(&rx->recvmsg_lock);
790790
rwlock_init(&rx->call_lock);
791791
memset(&rx->srx, 0, sizeof(rx->srx));
792792

net/rxrpc/ar-internal.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ struct rxrpc_sock {
149149
struct list_head sock_calls; /* List of calls owned by this socket */
150150
struct list_head to_be_accepted; /* calls awaiting acceptance */
151151
struct list_head recvmsg_q; /* Calls awaiting recvmsg's attention */
152-
rwlock_t recvmsg_lock; /* Lock for recvmsg_q */
152+
spinlock_t recvmsg_lock; /* Lock for recvmsg_q */
153153
struct key *key; /* security for this socket */
154154
struct key *securities; /* list of server security descriptors */
155155
struct rb_root calls; /* User ID -> call mapping */
@@ -284,7 +284,9 @@ struct rxrpc_local {
284284
struct task_struct *io_thread;
285285
struct completion io_thread_ready; /* Indication that the I/O thread started */
286286
struct rxrpc_sock *service; /* Service(s) listening on this endpoint */
287-
struct rw_semaphore defrag_sem; /* control re-enablement of IP DF bit */
287+
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
288+
struct sk_buff_head rx_delay_queue; /* Delay injection queue */
289+
#endif
288290
struct sk_buff_head rx_queue; /* Received packets */
289291
struct list_head conn_attend_q; /* Conns requiring immediate attention */
290292
struct list_head call_attend_q; /* Calls requiring immediate attention */
@@ -688,9 +690,11 @@ struct rxrpc_call {
688690

689691
/* Receive-phase ACK management (ACKs we send). */
690692
u8 ackr_reason; /* reason to ACK */
693+
u16 ackr_sack_base; /* Starting slot in SACK table ring */
691694
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
692-
atomic64_t ackr_window; /* Base (in LSW) and top (in MSW) of SACK window */
693-
atomic_t ackr_nr_unacked; /* Number of unacked packets */
695+
rxrpc_seq_t ackr_window; /* Base of SACK window */
696+
rxrpc_seq_t ackr_wtop; /* Top of SACK window */
697+
unsigned int ackr_nr_unacked; /* Number of unacked packets */
694698
atomic_t ackr_nr_consumed; /* Number of packets needing hard ACK */
695699
struct {
696700
#define RXRPC_SACK_SIZE 256
@@ -1109,6 +1113,9 @@ extern unsigned long rxrpc_idle_ack_delay;
11091113
extern unsigned int rxrpc_rx_window_size;
11101114
extern unsigned int rxrpc_rx_mtu;
11111115
extern unsigned int rxrpc_rx_jumbo_max;
1116+
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
1117+
extern unsigned long rxrpc_inject_rx_delay;
1118+
#endif
11121119

11131120
/*
11141121
* net_ns.c

net/rxrpc/call_accept.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
195195
tail = b->peer_backlog_tail;
196196
while (CIRC_CNT(head, tail, size) > 0) {
197197
struct rxrpc_peer *peer = b->peer_backlog[tail];
198-
rxrpc_put_local(peer->local, rxrpc_local_put_prealloc_conn);
198+
rxrpc_put_local(peer->local, rxrpc_local_put_prealloc_peer);
199199
kfree(peer);
200200
tail = (tail + 1) & (size - 1);
201201
}

net/rxrpc/call_event.c

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -498,9 +498,18 @@ bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
498498
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
499499
rxrpc_propose_ack_rx_idle);
500500

501-
if (atomic_read(&call->ackr_nr_unacked) > 2)
502-
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
503-
rxrpc_propose_ack_input_data);
501+
if (call->ackr_nr_unacked > 2) {
502+
if (call->peer->rtt_count < 3)
503+
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
504+
rxrpc_propose_ack_ping_for_rtt);
505+
else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000),
506+
ktime_get_real()))
507+
rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
508+
rxrpc_propose_ack_ping_for_old_rtt);
509+
else
510+
rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0,
511+
rxrpc_propose_ack_input_data);
512+
}
504513

505514
/* Make sure the timer is restarted */
506515
if (!__rxrpc_call_is_complete(call)) {

net/rxrpc/call_object.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
167167
call->tx_total_len = -1;
168168
call->next_rx_timo = 20 * HZ;
169169
call->next_req_timo = 1 * HZ;
170-
atomic64_set(&call->ackr_window, 0x100000001ULL);
170+
call->ackr_window = 1;
171+
call->ackr_wtop = 1;
171172

172173
memset(&call->sock_node, 0xed, sizeof(call->sock_node));
173174

@@ -560,7 +561,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
560561
rxrpc_put_call_slot(call);
561562

562563
/* Make sure we don't get any more notifications */
563-
write_lock(&rx->recvmsg_lock);
564+
spin_lock(&rx->recvmsg_lock);
564565

565566
if (!list_empty(&call->recvmsg_link)) {
566567
_debug("unlinking once-pending call %p { e=%lx f=%lx }",
@@ -573,7 +574,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
573574
call->recvmsg_link.next = NULL;
574575
call->recvmsg_link.prev = NULL;
575576

576-
write_unlock(&rx->recvmsg_lock);
577+
spin_unlock(&rx->recvmsg_lock);
577578
if (put)
578579
rxrpc_put_call(call, rxrpc_call_put_unnotify);
579580

net/rxrpc/conn_service.c

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,6 @@
88
#include <linux/slab.h>
99
#include "ar-internal.h"
1010

11-
static struct rxrpc_bundle rxrpc_service_dummy_bundle = {
12-
.ref = REFCOUNT_INIT(1),
13-
.debug_id = UINT_MAX,
14-
};
15-
1611
/*
1712
* Find a service connection under RCU conditions.
1813
*
@@ -132,8 +127,6 @@ struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *rxn
132127
*/
133128
conn->state = RXRPC_CONN_SERVICE_PREALLOC;
134129
refcount_set(&conn->ref, 2);
135-
conn->bundle = rxrpc_get_bundle(&rxrpc_service_dummy_bundle,
136-
rxrpc_bundle_get_service_conn);
137130

138131
atomic_inc(&rxnet->nr_conns);
139132
write_lock(&rxnet->conn_lock);

net/rxrpc/input.c

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,8 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
338338
static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
339339
rxrpc_seq_t window, rxrpc_seq_t wtop)
340340
{
341-
atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
341+
call->ackr_window = window;
342+
call->ackr_wtop = wtop;
342343
}
343344

344345
/*
@@ -367,9 +368,9 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
367368
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
368369
struct sk_buff *oos;
369370
rxrpc_serial_t serial = sp->hdr.serial;
370-
u64 win = atomic64_read(&call->ackr_window);
371-
rxrpc_seq_t window = lower_32_bits(win);
372-
rxrpc_seq_t wtop = upper_32_bits(win);
371+
unsigned int sack = call->ackr_sack_base;
372+
rxrpc_seq_t window = call->ackr_window;
373+
rxrpc_seq_t wtop = call->ackr_wtop;
373374
rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
374375
rxrpc_seq_t seq = sp->hdr.seq;
375376
bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
@@ -410,20 +411,23 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
410411

411412
/* Queue the packet. */
412413
if (seq == window) {
413-
rxrpc_seq_t reset_from;
414-
bool reset_sack = false;
415-
416414
if (sp->hdr.flags & RXRPC_REQUEST_ACK)
417415
ack_reason = RXRPC_ACK_REQUESTED;
418416
/* Send an immediate ACK if we fill in a hole */
419417
else if (!skb_queue_empty(&call->rx_oos_queue))
420418
ack_reason = RXRPC_ACK_DELAY;
421419
else
422-
atomic_inc_return(&call->ackr_nr_unacked);
420+
call->ackr_nr_unacked++;
423421

424422
window++;
425-
if (after(window, wtop))
423+
if (after(window, wtop)) {
424+
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
426425
wtop = window;
426+
} else {
427+
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
428+
sack = (sack + 1) % RXRPC_SACK_SIZE;
429+
}
430+
427431

428432
rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);
429433

@@ -440,43 +444,39 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
440444
__skb_unlink(oos, &call->rx_oos_queue);
441445
last = osp->hdr.flags & RXRPC_LAST_PACKET;
442446
seq = osp->hdr.seq;
443-
if (!reset_sack) {
444-
reset_from = seq;
445-
reset_sack = true;
446-
}
447+
call->ackr_sack_table[sack] = 0;
448+
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
449+
sack = (sack + 1) % RXRPC_SACK_SIZE;
447450

448451
window++;
449452
rxrpc_input_queue_data(call, oos, window, wtop,
450-
rxrpc_receive_queue_oos);
453+
rxrpc_receive_queue_oos);
451454
}
452455

453456
spin_unlock(&call->recvmsg_queue.lock);
454457

455-
if (reset_sack) {
456-
do {
457-
call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
458-
} while (reset_from++, before(reset_from, window));
459-
}
458+
call->ackr_sack_base = sack;
460459
} else {
461-
bool keep = false;
460+
unsigned int slot;
462461

463462
ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;
464463

465-
if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
466-
call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
467-
keep = 1;
464+
slot = seq - window;
465+
sack = (sack + slot) % RXRPC_SACK_SIZE;
466+
467+
if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
468+
ack_reason = RXRPC_ACK_DUPLICATE;
469+
goto send_ack;
468470
}
469471

472+
call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
473+
trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);
474+
470475
if (after(seq + 1, wtop)) {
471476
wtop = seq + 1;
472477
rxrpc_input_update_ack_window(call, window, wtop);
473478
}
474479

475-
if (!keep) {
476-
ack_reason = RXRPC_ACK_DUPLICATE;
477-
goto send_ack;
478-
}
479-
480480
skb_queue_walk(&call->rx_oos_queue, oos) {
481481
struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
482482

@@ -567,8 +567,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
567567
rxrpc_serial_t serial = sp->hdr.serial;
568568
rxrpc_seq_t seq0 = sp->hdr.seq;
569569

570-
_enter("{%llx,%x},{%u,%x}",
571-
atomic64_read(&call->ackr_window), call->rx_highest_seq,
570+
_enter("{%x,%x,%x},{%u,%x}",
571+
call->ackr_window, call->ackr_wtop, call->rx_highest_seq,
572572
skb->len, seq0);
573573

574574
if (__rxrpc_call_is_complete(call))

net/rxrpc/io_thread.c

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
2525
*/
2626
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
2727
{
28+
struct sk_buff_head *rx_queue;
2829
struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
2930

3031
if (unlikely(!local)) {
@@ -36,7 +37,16 @@ int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
3637

3738
skb->mark = RXRPC_SKB_MARK_PACKET;
3839
rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
39-
skb_queue_tail(&local->rx_queue, skb);
40+
rx_queue = &local->rx_queue;
41+
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
42+
if (rxrpc_inject_rx_delay ||
43+
!skb_queue_empty(&local->rx_delay_queue)) {
44+
skb->tstamp = ktime_add_ms(skb->tstamp, rxrpc_inject_rx_delay);
45+
rx_queue = &local->rx_delay_queue;
46+
}
47+
#endif
48+
49+
skb_queue_tail(rx_queue, skb);
4050
rxrpc_wake_up_io_thread(local);
4151
return 0;
4252
}
@@ -407,6 +417,9 @@ int rxrpc_io_thread(void *data)
407417
struct rxrpc_local *local = data;
408418
struct rxrpc_call *call;
409419
struct sk_buff *skb;
420+
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
421+
ktime_t now;
422+
#endif
410423
bool should_stop;
411424

412425
complete(&local->io_thread_ready);
@@ -481,6 +494,17 @@ int rxrpc_io_thread(void *data)
481494
continue;
482495
}
483496

497+
/* Inject a delay into packets if requested. */
498+
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
499+
now = ktime_get_real();
500+
while ((skb = skb_peek(&local->rx_delay_queue))) {
501+
if (ktime_before(now, skb->tstamp))
502+
break;
503+
skb = skb_dequeue(&local->rx_delay_queue);
504+
skb_queue_tail(&local->rx_queue, skb);
505+
}
506+
#endif
507+
484508
if (!skb_queue_empty(&local->rx_queue)) {
485509
spin_lock_irq(&local->rx_queue.lock);
486510
skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
@@ -502,6 +526,28 @@ int rxrpc_io_thread(void *data)
502526

503527
if (should_stop)
504528
break;
529+
530+
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
531+
skb = skb_peek(&local->rx_delay_queue);
532+
if (skb) {
533+
unsigned long timeout;
534+
ktime_t tstamp = skb->tstamp;
535+
ktime_t now = ktime_get_real();
536+
s64 delay_ns = ktime_to_ns(ktime_sub(tstamp, now));
537+
538+
if (delay_ns <= 0) {
539+
__set_current_state(TASK_RUNNING);
540+
continue;
541+
}
542+
543+
timeout = nsecs_to_jiffies(delay_ns);
544+
timeout = max(timeout, 1UL);
545+
schedule_timeout(timeout);
546+
__set_current_state(TASK_RUNNING);
547+
continue;
548+
}
549+
#endif
550+
505551
schedule();
506552
}
507553

0 commit comments

Comments
 (0)