@@ -313,3 +313,221 @@ bool netfs_release_folio(struct folio *folio, gfp_t gfp)
 	return true;
 }
 EXPORT_SYMBOL(netfs_release_folio);
+
+/*
+ * Wake the collection work item.
+ */
+void netfs_wake_collector(struct netfs_io_request *rreq)
+{
+	if (test_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &rreq->flags) &&
+	    !test_bit(NETFS_RREQ_RETRYING, &rreq->flags)) {
+		queue_work(system_unbound_wq, &rreq->work);
+	} else {
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wake_queue);
+		wake_up(&rreq->waitq);
+	}
+}
+
+/*
+ * Mark a subrequest as no longer being in progress and, if need be, wake the
+ * collector.
+ */
+void netfs_subreq_clear_in_progress(struct netfs_io_subrequest *subreq)
+{
+	struct netfs_io_request *rreq = subreq->rreq;
+	struct netfs_io_stream *stream = &rreq->io_streams[subreq->stream_nr];
+
+	clear_bit_unlock(NETFS_SREQ_IN_PROGRESS, &subreq->flags);
+	smp_mb__after_atomic(); /* Clear IN_PROGRESS before task state */
+
+	/* If we are at the head of the queue, wake up the collector. */
+	if (list_is_first(&subreq->rreq_link, &stream->subrequests) ||
+	    test_bit(NETFS_RREQ_RETRYING, &rreq->flags))
+		netfs_wake_collector(rreq);
+}
+
+/*
+ * Wait for all outstanding I/O in a stream to quiesce.
+ */
+void netfs_wait_for_in_progress_stream(struct netfs_io_request *rreq,
+				       struct netfs_io_stream *stream)
+{
+	struct netfs_io_subrequest *subreq;
+	DEFINE_WAIT(myself);
+
+	list_for_each_entry(subreq, &stream->subrequests, rreq_link) {
+		if (!test_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags))
+			continue;
+
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wait_queue);
+		for (;;) {
+			prepare_to_wait(&rreq->waitq, &myself, TASK_UNINTERRUPTIBLE);
+
+			if (!test_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags))
+				break;
+
+			trace_netfs_sreq(subreq, netfs_sreq_trace_wait_for);
+			schedule();
+			trace_netfs_rreq(rreq, netfs_rreq_trace_woke_queue);
+		}
+	}
+
+	finish_wait(&rreq->waitq, &myself);
+}
+
+/*
+ * Perform collection in app thread if not offloaded to workqueue.
+ */
+static int netfs_collect_in_app(struct netfs_io_request *rreq,
+				bool (*collector)(struct netfs_io_request *rreq))
+{
+	bool need_collect = false, inactive = true;
+
+	for (int i = 0; i < NR_IO_STREAMS; i++) {
+		struct netfs_io_subrequest *subreq;
+		struct netfs_io_stream *stream = &rreq->io_streams[i];
+
+		if (!stream->active)
+			continue;
+		inactive = false;
+		trace_netfs_collect_stream(rreq, stream);
+		subreq = list_first_entry_or_null(&stream->subrequests,
+						  struct netfs_io_subrequest,
+						  rreq_link);
+		if (subreq &&
+		    (!test_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags) ||
+		     test_bit(NETFS_SREQ_MADE_PROGRESS, &subreq->flags))) {
+			need_collect = true;
+			break;
+		}
+	}
+
+	if (!need_collect && !inactive)
+		return 0; /* Sleep */
+
+	__set_current_state(TASK_RUNNING);
+	if (collector(rreq)) {
+		/* Drop the ref from the NETFS_RREQ_IN_PROGRESS flag. */
+		netfs_put_request(rreq, netfs_rreq_trace_put_work_ip);
+		return 1; /* Done */
+	}
+
+	if (inactive) {
+		WARN(true, "Failed to collect inactive req R=%08x\n",
+		     rreq->debug_id);
+		cond_resched();
+	}
+	return 2; /* Again */
+}
+
+/*
+ * Wait for a request to complete, successfully or otherwise.
+ */
+static ssize_t netfs_wait_for_request(struct netfs_io_request *rreq,
+				      bool (*collector)(struct netfs_io_request *rreq))
+{
+	DEFINE_WAIT(myself);
+	ssize_t ret;
+
+	for (;;) {
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wait_queue);
+		prepare_to_wait(&rreq->waitq, &myself, TASK_UNINTERRUPTIBLE);
+
+		if (!test_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &rreq->flags)) {
+			switch (netfs_collect_in_app(rreq, collector)) {
+			case 0:
+				break;
+			case 1:
+				goto all_collected;
+			case 2:
+				continue;
+			}
+		}
+
+		if (!test_bit(NETFS_RREQ_IN_PROGRESS, &rreq->flags))
+			break;
+
+		schedule();
+		trace_netfs_rreq(rreq, netfs_rreq_trace_woke_queue);
+	}
+
+all_collected:
+	finish_wait(&rreq->waitq, &myself);
+
+	ret = rreq->error;
+	if (ret == 0) {
+		ret = rreq->transferred;
+		switch (rreq->origin) {
+		case NETFS_DIO_READ:
+		case NETFS_DIO_WRITE:
+		case NETFS_READ_SINGLE:
+		case NETFS_UNBUFFERED_WRITE:
+			break;
+		default:
+			if (rreq->submitted < rreq->len) {
+				trace_netfs_failure(rreq, NULL, ret, netfs_fail_short_read);
+				ret = -EIO;
+			}
+			break;
+		}
+	}
+
+	return ret;
+}
+
+ssize_t netfs_wait_for_read(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_request(rreq, netfs_read_collection);
+}
+
+ssize_t netfs_wait_for_write(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_request(rreq, netfs_write_collection);
+}
+
+/*
+ * Wait for a paused operation to unpause or complete in some manner.
+ */
+static void netfs_wait_for_pause(struct netfs_io_request *rreq,
+				 bool (*collector)(struct netfs_io_request *rreq))
+{
+	DEFINE_WAIT(myself);
+
+	trace_netfs_rreq(rreq, netfs_rreq_trace_wait_pause);
+
+	for (;;) {
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wait_queue);
+		prepare_to_wait(&rreq->waitq, &myself, TASK_UNINTERRUPTIBLE);
+
+		if (!test_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &rreq->flags)) {
+			switch (netfs_collect_in_app(rreq, collector)) {
+			case 0:
+				break;
+			case 1:
+				goto all_collected;
+			case 2:
+				continue;
+			}
+		}
+
+		if (!test_bit(NETFS_RREQ_IN_PROGRESS, &rreq->flags) ||
+		    !test_bit(NETFS_RREQ_PAUSE, &rreq->flags))
+			break;
+
+		schedule();
+		trace_netfs_rreq(rreq, netfs_rreq_trace_woke_queue);
+	}
+
+all_collected:
+	finish_wait(&rreq->waitq, &myself);
+}
+
+void netfs_wait_for_paused_read(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_pause(rreq, netfs_read_collection);
+}
+
+void netfs_wait_for_paused_write(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_pause(rreq, netfs_write_collection);
+}
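
Similarly, a sketch of how the pause waiters might be used on the issuing side, assuming a hypothetical my_fs_issue_more() helper; the flag tests and netfs_wait_for_paused_write() are from this commit, the rest is illustrative.

	static int my_fs_issue_more(struct netfs_io_request *wreq)
	{
		/* A request may be paused (NETFS_RREQ_PAUSE) to throttle what
		 * is in flight; wait until the pause lifts or the request
		 * terminates. */
		if (test_bit(NETFS_RREQ_PAUSE, &wreq->flags))
			netfs_wait_for_paused_write(wreq);

		if (!test_bit(NETFS_RREQ_IN_PROGRESS, &wreq->flags))
			return 0; /* Request already completed or failed. */

		/* ... issue the next batch of subrequests ... */
		return 1;
	}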