
Commit 0f4b1c7

Zach Brown authored and Andy Grover committed
rds: fix rds_send_xmit() serialization
rds_send_xmit() was changed to hold an interrupt masking spinlock instead of a mutex so that it could be called from the IB receive tasklet path. This broke the TCP transport because its xmit method can block and masks and unmasks interrupts.

This patch serializes callers to rds_send_xmit() with a simple bit instead of the current spinlock or previous mutex. This enables rds_send_xmit() to be called from any context and to call functions which block. Getting rid of the c_send_lock exposes the bare c_lock acquisitions which are changed to block interrupts.

A waitqueue is added so that rds_conn_shutdown() can wait for callers to leave rds_send_xmit() before tearing down partial send state. This lets us get rid of c_senders.

rds_send_xmit() is changed to check the conn state after acquiring the RDS_IN_XMIT bit to resolve races with the shutdown path. Previously both worked with the conn state and then the lock in the same order, allowing them to race and execute the paths concurrently.

rds_send_reset() isn't racing with rds_send_xmit() now that rds_conn_shutdown() properly ensures that rds_send_xmit() can't start once the conn state has been changed. We can remove its previous use of the spinlock.

Finally, c_send_generation is redundant. Callers can race to test the c_flags bit by simply retrying instead of racing to test the c_send_generation atomic.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
1 parent 501dccc · commit 0f4b1c7
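The serialization the commit message describes boils down to a small pattern. The following is only a condensed sketch, pieced together from the send.c and connection.c hunks below; the real functions also handle statistics, the to_be_dropped list, and the transport callbacks.

/* Sketch only: condensed from the hunks below, bookkeeping omitted. */

static int acquire_in_xmit(struct rds_connection *conn)
{
	/* one bit in c_flags serializes all callers of rds_send_xmit() */
	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
}

static void release_in_xmit(struct rds_connection *conn)
{
	clear_bit(RDS_IN_XMIT, &conn->c_flags);
	smp_mb__after_clear_bit();
	/* only walk the waitqueue when shutdown is actually waiting */
	if (waitqueue_active(&conn->c_waitq))
		wake_up_all(&conn->c_waitq);
}

int rds_send_xmit(struct rds_connection *conn)
{
	int ret = 0;

restart:
	if (!acquire_in_xmit(conn))
		return -ENOMEM;		/* another sender owns the send path */

	/* test the conn state only after taking the bit; shutdown uses the
	 * opposite order, so one side always sees the other */
	if (!rds_conn_up(conn)) {
		release_in_xmit(conn);
		return 0;
	}

	/* ... drain c_send_queue through the transport ... */

	release_in_xmit(conn);

	/* a sender may have queued a message while we held the bit and
	 * backed off; recheck so it doesn't sit on the queue forever */
	smp_mb();
	if (!list_empty(&conn->c_send_queue))
		goto restart;
	return ret;
}

void rds_conn_shutdown(struct rds_connection *conn)
{
	/* conn state is already !up here, so new senders bail out above;
	 * wait for any current sender to drop RDS_IN_XMIT */
	wait_event(conn->c_waitq, !test_bit(RDS_IN_XMIT, &conn->c_flags));

	/* ... now safe to tear down partial send state ... */
}

The ordering is the point: rds_send_xmit() takes the bit and then tests the connection state, while shutdown changes the state and then waits on the bit, so the two paths can never run concurrently.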

5 files changed: +52 −58 lines

net/rds/connection.c

Lines changed: 5 additions & 14 deletions
@@ -148,9 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
 	spin_lock_init(&conn->c_lock);
 	conn->c_next_tx_seq = 1;

-	spin_lock_init(&conn->c_send_lock);
-	atomic_set(&conn->c_send_generation, 1);
-	atomic_set(&conn->c_senders, 0);
+	init_waitqueue_head(&conn->c_waitq);
 	INIT_LIST_HEAD(&conn->c_send_queue);
 	INIT_LIST_HEAD(&conn->c_retrans);

@@ -275,15 +273,8 @@ void rds_conn_shutdown(struct rds_connection *conn)
 	}
 	mutex_unlock(&conn->c_cm_lock);

-	/* verify everybody's out of rds_send_xmit() */
-	spin_lock_irq(&conn->c_send_lock);
-	spin_unlock_irq(&conn->c_send_lock);
-
-	while(atomic_read(&conn->c_senders)) {
-		schedule_timeout(1);
-		spin_lock_irq(&conn->c_send_lock);
-		spin_unlock_irq(&conn->c_send_lock);
-	}
+	wait_event(conn->c_waitq,
+		   !test_bit(RDS_IN_XMIT, &conn->c_flags));

 	conn->c_trans->conn_shutdown(conn);
 	rds_conn_reset(conn);
@@ -477,8 +468,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
 			  sizeof(cinfo->transport));
 	cinfo->flags = 0;

-	rds_conn_info_set(cinfo->flags,
-			  spin_is_locked(&conn->c_send_lock), SENDING);
+	rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+			  SENDING);
 	/* XXX Future: return the state rather than these funky bits */
 	rds_conn_info_set(cinfo->flags,
 			  atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,

net/rds/ib_send.c

Lines changed: 1 addition & 1 deletion
@@ -321,7 +321,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
  * credits (see rds_ib_send_add_credits below).
  *
  * The RDS send code is essentially single-threaded; rds_send_xmit
- * grabs c_send_lock to ensure exclusive access to the send ring.
+ * sets RDS_IN_XMIT to ensure exclusive access to the send ring.
  * However, the ACK sending code is independent and can race with
  * message SENDs.
  *

net/rds/rds.h

Lines changed: 2 additions & 3 deletions
@@ -80,6 +80,7 @@ enum {
 /* Bits for c_flags */
 #define RDS_LL_SEND_FULL	0
 #define RDS_RECONNECT_PENDING	1
+#define RDS_IN_XMIT		2

 struct rds_connection {
 	struct hlist_node	c_hash_node;
@@ -91,9 +92,6 @@ struct rds_connection {
 	struct rds_cong_map	*c_lcong;
 	struct rds_cong_map	*c_fcong;

-	spinlock_t		c_send_lock;	/* protect send ring */
-	atomic_t		c_send_generation;
-	atomic_t		c_senders;
 	struct rds_message	*c_xmit_rm;
 	unsigned long		c_xmit_sg;
 	unsigned int		c_xmit_hdr_off;
@@ -120,6 +118,7 @@ struct rds_connection {
 	struct delayed_work	c_conn_w;
 	struct work_struct	c_down_w;
 	struct mutex		c_cm_lock;	/* protect conn state & cm */
+	wait_queue_head_t	c_waitq;

 	struct list_head	c_map_item;
 	unsigned long		c_map_queued;

net/rds/send.c

Lines changed: 43 additions & 39 deletions
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
 	struct rds_message *rm, *tmp;
 	unsigned long flags;

-	spin_lock_irqsave(&conn->c_send_lock, flags);
 	if (conn->c_xmit_rm) {
 		rm = conn->c_xmit_rm;
 		conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
 		rds_message_unmapped(rm);
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
 		rds_message_put(rm);
-	} else {
-		spin_unlock_irqrestore(&conn->c_send_lock, flags);
 	}

 	conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }

+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare.  We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
 	unsigned int tmp;
 	struct scatterlist *sg;
 	int ret = 0;
-	int gen = 0;
 	LIST_HEAD(to_be_dropped);

 restart:
-	if (!rds_conn_up(conn))
-		goto out;

 	/*
 	 * sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ int rds_send_xmit(struct rds_connection *conn)
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+	if (!acquire_in_xmit(conn)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
-	atomic_inc(&conn->c_senders);
+
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}

 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);

-	gen = atomic_inc_return(&conn->c_send_generation);
-
 	/*
 	 * spin trying to push headers and data down the connection until
 	 * the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		if (!rm) {
 			unsigned int len;

-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);

 			if (!list_empty(&conn->c_send_queue)) {
 				rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ int rds_send_xmit(struct rds_connection *conn)
 				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
 			}

-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);

 			if (!rm)
 				break;
@@ -207,10 +226,10 @@ int rds_send_xmit(struct rds_connection *conn)
 		 */
 		if (rm->rdma.op_active &&
 		    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-			spin_lock(&conn->c_lock);
+			spin_lock_irqsave(&conn->c_lock, flags);
 			if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 				list_move(&rm->m_conn_item, &to_be_dropped);
-			spin_unlock(&conn->c_lock);
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
 		}

@@ -336,19 +355,7 @@ int rds_send_xmit(struct rds_connection *conn)
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);

-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock.  If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem.  If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty.  It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	spin_unlock_irqrestore(&conn->c_send_lock, flags);
+	release_in_xmit(conn);

 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ int rds_send_xmit(struct rds_connection *conn)
 		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}

-	atomic_dec(&conn->c_senders);
-
 	/*
-	 * Other senders will see we have c_send_lock and exit. We
-	 * need to recheck the send queue and race again for c_send_lock
-	 * to make sure messages don't just sit on the send queue, if
-	 * somebody hasn't already beat us into the loop.
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
+	 * not try and send their newly queued message.  We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
 	 *
 	 * If the transport cannot continue (i.e ret != 0), then it must
 	 * call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
 			rds_stats_inc(s_send_lock_queue_raced);
-			if (gen == atomic_read(&conn->c_send_generation)) {
-				goto restart;
-			}
+			goto restart;
 		}
 	}
 out:

net/rds/threads.c

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@
  *
  * Transition to state DISCONNECTING/DOWN:
  *	- Inside the shutdown worker; synchronizes with xmit path
- *	  through c_send_lock, and with connection management callbacks
+ *	  through RDS_IN_XMIT, and with connection management callbacks
  *	  via c_cm_lock.
  *
  * For receive callbacks, we rely on the underlying transport
