
Commit 80127a3

Peter Zijlstra authored and Ingo Molnar committed
locking/percpu-rwsem: Optimize readers and reduce global impact
Currently the percpu-rwsem switches to (global) atomic ops while a writer is waiting; which could be quite a while and slows down releasing the readers.

This patch cures this problem by ordering the reader-state vs reader-count (see the comments in __percpu_down_read() and percpu_down_write()). This changes a global atomic op into a full memory barrier, which doesn't have the global cacheline contention.

This also enables using the percpu-rwsem with rcu_sync disabled in order to bias the implementation differently, reducing the writer latency by adding some cost to readers.

Signed-off-by: Peter Zijlstra (Intel) <peterz@infradead.org>
Reviewed-by: Oleg Nesterov <oleg@redhat.com>
Cc: Andrew Morton <akpm@linux-foundation.org>
Cc: Linus Torvalds <torvalds@linux-foundation.org>
Cc: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: Paul McKenney <paulmck@linux.vnet.ibm.com>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: linux-kernel@vger.kernel.org
[ Fixed modular build. ]
Signed-off-by: Ingo Molnar <mingo@kernel.org>
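The heart of the patch is the ordering between a reader's per-CPU counter increment and the writer's readers_block flag: the reader publishes its increment before checking the flag, and the writer publishes the flag before summing the counters, so at least one side is guaranteed to observe the other. Below is a rough userspace model of just that ordering, using C11 atomics and a single shared counter in place of the kernel's per-CPU read_count and smp_mb(). It is an illustrative sketch with made-up function names, not the kernel implementation.

#include <stdatomic.h>
#include <stdbool.h>

static _Atomic int read_count;     /* stands in for the per-CPU sem->read_count */
static _Atomic int readers_block;  /* stands in for sem->readers_block */

/* Reader fast path: publish the count, then check for a pending writer. */
static bool reader_try_fast_path(void)
{
        atomic_fetch_add_explicit(&read_count, 1, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);      /* "A matches D" */
        if (!atomic_load_explicit(&readers_block, memory_order_acquire))
                return true;    /* read-side critical section starts here */
        /* Writer pending: undo the increment and fall back to the slow path. */
        atomic_fetch_sub_explicit(&read_count, 1, memory_order_relaxed);
        return false;
}

/* Writer side: publish the block flag, then look for active readers. */
static bool writer_sees_no_readers(void)
{
        atomic_store_explicit(&readers_block, 1, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);      /* "D matches A" */
        return atomic_load_explicit(&read_count, memory_order_relaxed) == 0;
}

With the two full fences in place, either the writer sees the reader's increment (and waits), or the reader sees readers_block (and backs out); that is what lets the patch replace the global atomic op with a memory barrier on the reader side.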
1 parent 08be8f6 commit 80127a3

3 files changed: +208 −106 lines changed

include/linux/percpu-rwsem.h

Lines changed: 75 additions & 9 deletions
@@ -10,30 +10,96 @@
 
 struct percpu_rw_semaphore {
        struct rcu_sync         rss;
-       unsigned int __percpu   *fast_read_ctr;
+       unsigned int __percpu   *read_count;
        struct rw_semaphore     rw_sem;
-       atomic_t                slow_read_ctr;
-       wait_queue_head_t       write_waitq;
+       wait_queue_head_t       writer;
+       int                     readers_block;
 };
 
-extern void percpu_down_read(struct percpu_rw_semaphore *);
-extern int percpu_down_read_trylock(struct percpu_rw_semaphore *);
-extern void percpu_up_read(struct percpu_rw_semaphore *);
+extern int __percpu_down_read(struct percpu_rw_semaphore *, int);
+extern void __percpu_up_read(struct percpu_rw_semaphore *);
+
+static inline void percpu_down_read(struct percpu_rw_semaphore *sem)
+{
+       might_sleep();
+
+       rwsem_acquire_read(&sem->rw_sem.dep_map, 0, 0, _RET_IP_);
+
+       preempt_disable();
+       /*
+        * We are in an RCU-sched read-side critical section, so the writer
+        * cannot both change sem->state from readers_fast and start checking
+        * counters while we are here. So if we see !sem->state, we know that
+        * the writer won't be checking until we're past the preempt_enable()
+        * and that once the synchronize_sched() is done, the writer will see
+        * anything we did within this RCU-sched read-side critical section.
+        */
+       __this_cpu_inc(*sem->read_count);
+       if (unlikely(!rcu_sync_is_idle(&sem->rss)))
+               __percpu_down_read(sem, false); /* Unconditional memory barrier */
+       preempt_enable();
+       /*
+        * The barrier() from preempt_enable() prevents the compiler from
+        * bleeding the critical section out.
+        */
+}
+
+static inline int percpu_down_read_trylock(struct percpu_rw_semaphore *sem)
+{
+       int ret = 1;
+
+       preempt_disable();
+       /*
+        * Same as in percpu_down_read().
+        */
+       __this_cpu_inc(*sem->read_count);
+       if (unlikely(!rcu_sync_is_idle(&sem->rss)))
+               ret = __percpu_down_read(sem, true); /* Unconditional memory barrier */
+       preempt_enable();
+       /*
+        * The barrier() from preempt_enable() prevents the compiler from
+        * bleeding the critical section out.
+        */
+
+       if (ret)
+               rwsem_acquire_read(&sem->rw_sem.dep_map, 0, 1, _RET_IP_);
+
+       return ret;
+}
+
+static inline void percpu_up_read(struct percpu_rw_semaphore *sem)
+{
+       /*
+        * The barrier() in preempt_disable() prevents the compiler from
+        * bleeding the critical section out.
+        */
+       preempt_disable();
+       /*
+        * Same as in percpu_down_read().
+        */
+       if (likely(rcu_sync_is_idle(&sem->rss)))
+               __this_cpu_dec(*sem->read_count);
+       else
+               __percpu_up_read(sem); /* Unconditional memory barrier */
+       preempt_enable();
+
+       rwsem_release(&sem->rw_sem.dep_map, 1, _RET_IP_);
+}
 
 extern void percpu_down_write(struct percpu_rw_semaphore *);
 extern void percpu_up_write(struct percpu_rw_semaphore *);
 
 extern int __percpu_init_rwsem(struct percpu_rw_semaphore *,
                                const char *, struct lock_class_key *);
+
 extern void percpu_free_rwsem(struct percpu_rw_semaphore *);
 
-#define percpu_init_rwsem(brw)                                 \
+#define percpu_init_rwsem(sem)                                 \
 ({                                                             \
        static struct lock_class_key rwsem_key;                 \
-       __percpu_init_rwsem(brw, #brw, &rwsem_key);             \
+       __percpu_init_rwsem(sem, #sem, &rwsem_key);             \
 })
 
-
 #define percpu_rwsem_is_held(sem) lockdep_is_held(&(sem)->rw_sem)
 
 static inline void percpu_rwsem_release(struct percpu_rw_semaphore *sem,
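
For orientation, here is a minimal usage sketch of the API declared above; the foo_* names are hypothetical and not part of this patch. The read side normally costs only a per-CPU increment/decrement, while the write side blocks new readers and waits for the active ones to drain.

#include <linux/percpu-rwsem.h>

static struct percpu_rw_semaphore foo_rwsem;    /* hypothetical example lock */

static int foo_init(void)
{
        return percpu_init_rwsem(&foo_rwsem);
}

static void foo_read_path(void)
{
        percpu_down_read(&foo_rwsem);   /* usually just a per-CPU increment */
        /* ... read-side critical section ... */
        percpu_up_read(&foo_rwsem);
}

static void foo_write_path(void)
{
        percpu_down_write(&foo_rwsem);  /* blocks new readers, waits for active ones */
        /* ... write-side critical section ... */
        percpu_up_write(&foo_rwsem);
}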

kernel/locking/percpu-rwsem.c

Lines changed: 131 additions & 97 deletions
@@ -8,152 +8,186 @@
 #include <linux/sched.h>
 #include <linux/errno.h>
 
-int __percpu_init_rwsem(struct percpu_rw_semaphore *brw,
+int __percpu_init_rwsem(struct percpu_rw_semaphore *sem,
                        const char *name, struct lock_class_key *rwsem_key)
 {
-       brw->fast_read_ctr = alloc_percpu(int);
-       if (unlikely(!brw->fast_read_ctr))
+       sem->read_count = alloc_percpu(int);
+       if (unlikely(!sem->read_count))
                return -ENOMEM;
 
        /* ->rw_sem represents the whole percpu_rw_semaphore for lockdep */
-       __init_rwsem(&brw->rw_sem, name, rwsem_key);
-       rcu_sync_init(&brw->rss, RCU_SCHED_SYNC);
-       atomic_set(&brw->slow_read_ctr, 0);
-       init_waitqueue_head(&brw->write_waitq);
+       rcu_sync_init(&sem->rss, RCU_SCHED_SYNC);
+       __init_rwsem(&sem->rw_sem, name, rwsem_key);
+       init_waitqueue_head(&sem->writer);
+       sem->readers_block = 0;
        return 0;
 }
 EXPORT_SYMBOL_GPL(__percpu_init_rwsem);
 
-void percpu_free_rwsem(struct percpu_rw_semaphore *brw)
+void percpu_free_rwsem(struct percpu_rw_semaphore *sem)
 {
        /*
         * XXX: temporary kludge. The error path in alloc_super()
         * assumes that percpu_free_rwsem() is safe after kzalloc().
         */
-       if (!brw->fast_read_ctr)
+       if (!sem->read_count)
                return;
 
-       rcu_sync_dtor(&brw->rss);
-       free_percpu(brw->fast_read_ctr);
-       brw->fast_read_ctr = NULL; /* catch use after free bugs */
+       rcu_sync_dtor(&sem->rss);
+       free_percpu(sem->read_count);
+       sem->read_count = NULL; /* catch use after free bugs */
 }
 EXPORT_SYMBOL_GPL(percpu_free_rwsem);
 
-/*
- * This is the fast-path for down_read/up_read. If it succeeds we rely
- * on the barriers provided by rcu_sync_enter/exit; see the comments in
- * percpu_down_write() and percpu_up_write().
- *
- * If this helper fails the callers rely on the normal rw_semaphore and
- * atomic_dec_and_test(), so in this case we have the necessary barriers.
- */
-static bool update_fast_ctr(struct percpu_rw_semaphore *brw, unsigned int val)
+int __percpu_down_read(struct percpu_rw_semaphore *sem, int try)
 {
-       bool success;
+       /*
+        * Due to having preemption disabled the decrement happens on
+        * the same CPU as the increment, avoiding the
+        * increment-on-one-CPU-and-decrement-on-another problem.
+        *
+        * If the reader misses the writer's assignment of readers_block, then
+        * the writer is guaranteed to see the reader's increment.
+        *
+        * Conversely, any readers that increment their sem->read_count after
+        * the writer looks are guaranteed to see the readers_block value,
+        * which in turn means that they are guaranteed to immediately
+        * decrement their sem->read_count, so that it doesn't matter that the
+        * writer missed them.
+        */
 
-       preempt_disable();
-       success = rcu_sync_is_idle(&brw->rss);
-       if (likely(success))
-               __this_cpu_add(*brw->fast_read_ctr, val);
-       preempt_enable();
+       smp_mb(); /* A matches D */
 
-       return success;
-}
+       /*
+        * If !readers_block the critical section starts here, matched by the
+        * release in percpu_up_write().
+        */
+       if (likely(!smp_load_acquire(&sem->readers_block)))
+               return 1;
 
-/*
- * Like the normal down_read() this is not recursive, the writer can
- * come after the first percpu_down_read() and create the deadlock.
- *
- * Note: returns with lock_is_held(brw->rw_sem) == T for lockdep,
- * percpu_up_read() does rwsem_release(). This pairs with the usage
- * of ->rw_sem in percpu_down/up_write().
- */
-void percpu_down_read(struct percpu_rw_semaphore *brw)
-{
-       might_sleep();
-       rwsem_acquire_read(&brw->rw_sem.dep_map, 0, 0, _RET_IP_);
+       /*
+        * Per the above comment; we still have preemption disabled and
+        * will thus decrement on the same CPU as we incremented.
+        */
+       __percpu_up_read(sem);
 
-       if (likely(update_fast_ctr(brw, +1)))
-               return;
+       if (try)
+               return 0;
 
-       /* Avoid rwsem_acquire_read() and rwsem_release() */
-       __down_read(&brw->rw_sem);
-       atomic_inc(&brw->slow_read_ctr);
-       __up_read(&brw->rw_sem);
-}
-EXPORT_SYMBOL_GPL(percpu_down_read);
+       /*
+        * We either call schedule() in the wait, or we'll fall through
+        * and reschedule on the preempt_enable() in percpu_down_read().
+        */
+       preempt_enable_no_resched();
 
-int percpu_down_read_trylock(struct percpu_rw_semaphore *brw)
-{
-       if (unlikely(!update_fast_ctr(brw, +1))) {
-               if (!__down_read_trylock(&brw->rw_sem))
-                       return 0;
-               atomic_inc(&brw->slow_read_ctr);
-               __up_read(&brw->rw_sem);
-       }
-
-       rwsem_acquire_read(&brw->rw_sem.dep_map, 0, 1, _RET_IP_);
+       /*
+        * Avoid lockdep for the down/up_read(); we already have them.
+        */
+       __down_read(&sem->rw_sem);
+       this_cpu_inc(*sem->read_count);
+       __up_read(&sem->rw_sem);
+
+       preempt_disable();
        return 1;
 }
+EXPORT_SYMBOL_GPL(__percpu_down_read);
 
-void percpu_up_read(struct percpu_rw_semaphore *brw)
+void __percpu_up_read(struct percpu_rw_semaphore *sem)
 {
-       rwsem_release(&brw->rw_sem.dep_map, 1, _RET_IP_);
-
-       if (likely(update_fast_ctr(brw, -1)))
-               return;
+       smp_mb(); /* B matches C */
+       /*
+        * In other words, if they see our decrement (presumably to aggregate
+        * zero, as that is the only time it matters) they will also see our
+        * critical section.
+        */
+       __this_cpu_dec(*sem->read_count);
 
-       /* false-positive is possible but harmless */
-       if (atomic_dec_and_test(&brw->slow_read_ctr))
-               wake_up_all(&brw->write_waitq);
+       /* Prod writer to recheck readers_active */
+       wake_up(&sem->writer);
 }
-EXPORT_SYMBOL_GPL(percpu_up_read);
+EXPORT_SYMBOL_GPL(__percpu_up_read);
+
+#define per_cpu_sum(var)                                               \
+({                                                                     \
+       typeof(var) __sum = 0;                                          \
+       int cpu;                                                        \
+       compiletime_assert_atomic_type(__sum);                          \
+       for_each_possible_cpu(cpu)                                      \
+               __sum += per_cpu(var, cpu);                             \
+       __sum;                                                          \
+})
 
-static int clear_fast_ctr(struct percpu_rw_semaphore *brw)
+/*
+ * Return true if the modular sum of the sem->read_count per-CPU variable is
+ * zero. If this sum is zero, then it is stable due to the fact that if any
+ * newly arriving readers increment a given counter, they will immediately
+ * decrement that same counter.
+ */
+static bool readers_active_check(struct percpu_rw_semaphore *sem)
 {
-       unsigned int sum = 0;
-       int cpu;
+       if (per_cpu_sum(*sem->read_count) != 0)
+               return false;
+
+       /*
+        * If we observed the decrement; ensure we see the entire critical
+        * section.
+        */
 
-       for_each_possible_cpu(cpu) {
-               sum += per_cpu(*brw->fast_read_ctr, cpu);
-               per_cpu(*brw->fast_read_ctr, cpu) = 0;
-       }
+       smp_mb(); /* C matches B */
 
-       return sum;
+       return true;
 }
 
-void percpu_down_write(struct percpu_rw_semaphore *brw)
+void percpu_down_write(struct percpu_rw_semaphore *sem)
 {
+       /* Notify readers to take the slow path. */
+       rcu_sync_enter(&sem->rss);
+
+       down_write(&sem->rw_sem);
+
        /*
-        * Make rcu_sync_is_idle() == F and thus disable the fast-path in
-        * percpu_down_read() and percpu_up_read(), and wait for gp pass.
-        *
-        * The latter synchronises us with the preceding readers which used
-        * the fast-past, so we can not miss the result of __this_cpu_add()
-        * or anything else inside their criticial sections.
+        * Notify new readers to block; up until now, and thus throughout the
+        * longish rcu_sync_enter() above, new readers could still come in.
         */
-       rcu_sync_enter(&brw->rss);
+       WRITE_ONCE(sem->readers_block, 1);
 
-       /* exclude other writers, and block the new readers completely */
-       down_write(&brw->rw_sem);
+       smp_mb(); /* D matches A */
 
-       /* nobody can use fast_read_ctr, move its sum into slow_read_ctr */
-       atomic_add(clear_fast_ctr(brw), &brw->slow_read_ctr);
+       /*
+        * If they don't see our write of readers_block, then we are
+        * guaranteed to see their sem->read_count increment, and therefore
+        * will wait for them.
+        */
 
-       /* wait for all readers to complete their percpu_up_read() */
-       wait_event(brw->write_waitq, !atomic_read(&brw->slow_read_ctr));
+       /* Wait for all now active readers to complete. */
+       wait_event(sem->writer, readers_active_check(sem));
 }
 EXPORT_SYMBOL_GPL(percpu_down_write);
 
-void percpu_up_write(struct percpu_rw_semaphore *brw)
+void percpu_up_write(struct percpu_rw_semaphore *sem)
 {
-       /* release the lock, but the readers can't use the fast-path */
-       up_write(&brw->rw_sem);
        /*
-        * Enable the fast-path in percpu_down_read() and percpu_up_read()
-        * but only after another gp pass; this adds the necessary barrier
-        * to ensure the reader can't miss the changes done by us.
+        * Signal the writer is done, no fast path yet.
+        *
+        * One reason that we cannot just immediately flip to readers_fast is
+        * that new readers might fail to see the results of this writer's
+        * critical section.
+        *
+        * Therefore we force it through the slow path which guarantees an
+        * acquire and thereby guarantees the critical section's consistency.
+        */
+       smp_store_release(&sem->readers_block, 0);
+
+       /*
+        * Release the write lock, this will allow readers back in the game.
+        */
+       up_write(&sem->rw_sem);
+
+       /*
+        * Once this completes (at least one RCU-sched grace period hence) the
+        * reader fast path will be available again. Safe to use outside the
+        * exclusive write lock because it's counting.
         */
-       rcu_sync_exit(&brw->rss);
+       rcu_sync_exit(&sem->rss);
 }
 EXPORT_SYMBOL_GPL(percpu_up_write);

kernel/rcu/sync.c

Lines changed: 2 additions & 0 deletions
@@ -68,6 +68,8 @@ void rcu_sync_lockdep_assert(struct rcu_sync *rsp)
        RCU_LOCKDEP_WARN(!gp_ops[rsp->gp_type].held(),
                         "suspicious rcu_sync_is_idle() usage");
 }
+
+EXPORT_SYMBOL_GPL(rcu_sync_lockdep_assert);
 #endif
 
 /**
