@@ -87,7 +87,6 @@ struct io_wqe {
 	struct {
 		raw_spinlock_t lock;
 		struct io_wq_work_list work_list;
-		unsigned long hash_map;
 		unsigned flags;
 	} ____cacheline_aligned_in_smp;
 
@@ -97,6 +96,8 @@ struct io_wqe {
 	struct hlist_nulls_head free_list;
 	struct list_head all_list;
 
+	struct wait_queue_entry wait;
+
 	struct io_wq *wq;
 	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
 };
@@ -113,6 +114,9 @@ struct io_wq {
 
 	struct task_struct *manager;
 	struct user_struct *user;
+
+	struct io_wq_hash *hash;
+
 	refcount_t refs;
 	struct completion done;
 
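The hunks above reference struct io_wq_hash without showing its definition, which lives outside this diff (presumably in io-wq.h, allocated by the caller and handed in via io_wq_data). A minimal sketch consistent with how the diff uses it — a shared bucket bitmap plus a waitqueue, refcounted so several io_wq instances can hold the same one:

/* Sketch only: shape inferred from this diff, not shown in it. */
struct io_wq_hash {
	refcount_t refs;		/* one ref per io_wq sharing it */
	unsigned long map;		/* one bit per hash bucket, atomic ops */
	struct wait_queue_head wait;	/* wqes park here when a bucket is busy */
};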
@@ -328,14 +332,31 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
 	return work->flags >> IO_WQ_HASH_SHIFT;
 }
 
+static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
+{
+	struct io_wq *wq = wqe->wq;
+
+	spin_lock(&wq->hash->wait.lock);
+	if (list_empty(&wqe->wait.entry)) {
+		__add_wait_queue(&wq->hash->wait, &wqe->wait);
+		if (!test_bit(hash, &wq->hash->map)) {
+			__set_current_state(TASK_RUNNING);
+			list_del_init(&wqe->wait.entry);
+		}
+	}
+	spin_unlock(&wq->hash->wait.lock);
+}
+
 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 	__must_hold(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work, *tail;
-	unsigned int hash;
+	unsigned int stall_hash = -1U;
 
 	wq_list_for_each(node, prev, &wqe->work_list) {
+		unsigned int hash;
+
 		work = container_of(node, struct io_wq_work, list);
 
 		/* not hashed, can run anytime */
@@ -344,16 +365,26 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 			return work;
 		}
 
-		/* hashed, can run if not already running */
 		hash = io_get_work_hash(work);
-		if (!(wqe->hash_map & BIT(hash))) {
-			wqe->hash_map |= BIT(hash);
-			/* all items with this hash lie in [work, tail] */
-			tail = wqe->hash_tail[hash];
+		/* all items with this hash lie in [work, tail] */
+		tail = wqe->hash_tail[hash];
+
+		/* hashed, can run if not already running */
+		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
 			wqe->hash_tail[hash] = NULL;
 			wq_list_cut(&wqe->work_list, &tail->list, prev);
 			return work;
 		}
+		if (stall_hash == -1U)
+			stall_hash = hash;
+		/* fast forward to a next hash, for-each will fix up @prev */
+		node = &tail->list;
+	}
+
+	if (stall_hash != -1U) {
+		raw_spin_unlock(&wqe->lock);
+		io_wait_on_hash(wqe, stall_hash);
+		raw_spin_lock(&wqe->lock);
 	}
 
 	return NULL;
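Why the bit ops changed: the old hash_map was private to one wqe and only ever touched under wqe->lock, so a plain read-modify-write was safe; wq->hash->map is shared across io_wq instances that never take that lock, so claiming a bucket must be one atomic step. And when every hashed bucket a worker meets is already claimed, it now records the first busy hash, drops wqe->lock, and parks on the shared waitqueue in io_wait_on_hash() instead of rescanning. A side-by-side sketch, identifiers as in the hunks above:

/* Old scheme: map private to this wqe; wqe->lock serializes access,
 * so a non-atomic read-modify-write was sufficient. */
if (!(wqe->hash_map & BIT(hash))) {
	wqe->hash_map |= BIT(hash);
	/* claim bucket, splice out [work, tail], return work */
}

/* New scheme: the map lives in wq->hash and is shared with other
 * io_wq instances, so the claim must be a single atomic test-and-set. */
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
	/* this worker now owns bucket @hash */
}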
@@ -421,6 +452,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		if (!work)
 			break;
 		io_assign_current_work(worker, work);
+		__set_current_state(TASK_RUNNING);
 
 		/* handle a whole dependent link */
 		do {
@@ -444,8 +476,10 @@ static void io_worker_handle_work(struct io_worker *worker)
 				io_wqe_enqueue(wqe, linked);
 
 			if (hash != -1U && !next_hashed) {
+				clear_bit(hash, &wq->hash->map);
+				if (wq_has_sleeper(&wq->hash->wait))
+					wake_up(&wq->hash->wait);
 				raw_spin_lock_irq(&wqe->lock);
-				wqe->hash_map &= ~BIT_ULL(hash);
 				wqe->flags &= ~IO_WQE_FLAG_STALLED;
 				/* skip unnecessary unlock-lock wqe->lock */
 				if (!work)
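On the waker side, the bucket bit is cleared before checking for sleepers. wq_has_sleeper() is the lockless fast path (it issues a full barrier before waitqueue_active(), pairing with the waiter publishing itself in io_wait_on_hash()), and the wait.lock taken inside both wake_up() and io_wait_on_hash() closes the remaining window. Roughly:

/*
 * Waker (above)                        Waiter (io_wait_on_hash())
 * -------------                        --------------------------
 * clear_bit(hash, &map);               spin_lock(&wait.lock);
 * smp_mb();   [in wq_has_sleeper()]    __add_wait_queue(...);
 * if (waitqueue_active(&wait))         if (!test_bit(hash, &map))
 *         wake_up(&wait);                      bail out: bit already clear
 *                                      spin_unlock(&wait.lock);
 */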
@@ -471,7 +505,6 @@ static int io_wqe_worker(void *data)
 loop:
 		raw_spin_lock_irq(&wqe->lock);
 		if (io_wqe_run_queue(wqe)) {
-			__set_current_state(TASK_RUNNING);
 			io_worker_handle_work(worker);
 			goto loop;
 		}
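The __set_current_state(TASK_RUNNING) moves because io_get_next_work() can now block: a worker that finds only busy buckets sleeps inside io_wait_on_hash() while still nominally hunting for work. Marking the task runnable only once a work item is actually assigned (the io_worker_handle_work() hunk above) keeps the prepare-to-sleep protocol intact. The resulting loop shape, abridged as a sketch rather than the full function:

while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
	set_current_state(TASK_INTERRUPTIBLE);
loop:
	raw_spin_lock_irq(&wqe->lock);
	if (io_wqe_run_queue(wqe)) {
		io_worker_handle_work(worker);	/* sets TASK_RUNNING once
						 * a work item is assigned */
		goto loop;
	}
	/* otherwise go idle, drop the lock, and schedule() */
}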
@@ -928,6 +961,24 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 	return IO_WQ_CANCEL_NOTFOUND;
 }
 
+static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
+			    int sync, void *key)
+{
+	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+	int ret;
+
+	list_del_init(&wait->entry);
+
+	rcu_read_lock();
+	ret = io_wqe_activate_free_worker(wqe);
+	rcu_read_unlock();
+
+	if (!ret)
+		wake_up_process(wqe->wq->manager);
+
+	return 1;
+}
+
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
 	int ret = -ENOMEM, node;
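io_wqe_hash_wake() is a custom wait-queue callback: wake_up() invokes each queued entry's ->func under the waitqueue lock instead of the default try-to-wake-up, and here the entry unhooks itself and kicks a free worker on its node, falling back to the manager thread when none is idle. Since __add_wait_queue() adds entries non-exclusive, one wake_up() reaches every stalled wqe. A simplified sketch of the core walk in kernel/sched/wait.c (hypothetical name, condensed from __wake_up_common()) showing the contract the callback implements:

static int wake_sketch(struct wait_queue_head *wq_head, unsigned int mode,
		       int wake_flags, void *key, int nr_exclusive)
{
	struct wait_queue_entry *curr, *next;

	list_for_each_entry_safe(curr, next, &wq_head->head, entry) {
		unsigned int flags = curr->flags;

		if (curr->func(curr, mode, wake_flags, key) &&
		    (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
			break;	/* enough exclusive waiters woken */
	}
	return nr_exclusive;
}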
@@ -948,6 +999,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	if (ret)
 		goto err_wqes;
 
+	refcount_inc(&data->hash->refs);
+	wq->hash = data->hash;
 	wq->free_work = data->free_work;
 	wq->do_work = data->do_work;
 
@@ -968,6 +1021,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
 				task_rlimit(current, RLIMIT_NPROC);
 		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
+		wqe->wait.func = io_wqe_hash_wake;
+		INIT_LIST_HEAD(&wqe->wait.entry);
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
 		INIT_WQ_LIST(&wqe->work_list);
@@ -989,6 +1044,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 
 	if (refcount_dec_and_test(&wq->refs))
 		complete(&wq->done);
+	io_wq_put_hash(data->hash);
err:
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	for_each_node(node)
@@ -1017,8 +1073,15 @@ void io_wq_destroy(struct io_wq *wq)
 
 	wait_for_completion(&wq->done);
 
-	for_each_node(node)
-		kfree(wq->wqes[node]);
+	spin_lock_irq(&wq->hash->wait.lock);
+	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
+
+		list_del_init(&wqe->wait.entry);
+		kfree(wqe);
+	}
+	spin_unlock_irq(&wq->hash->wait.lock);
+	io_wq_put_hash(wq->hash);
 	kfree(wq->wqes);
 	kfree(wq);
 }
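Teardown mirrors creation: each wqe is unhooked while hash->wait.lock is held, so a late wake_up() from another ring sharing the hash cannot walk into a just-freed entry, and the shared hash is released only after this io_wq's entries are gone. io_wq_put_hash() itself is not shown in the diff; presumably a refcounted free along these lines:

/* Assumed helper (defined with struct io_wq_hash, outside this diff). */
static inline void io_wq_put_hash(struct io_wq_hash *hash)
{
	if (refcount_dec_and_test(&hash->refs))
		kfree(hash);
}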