1
- use std:: sync:: atomic:: { AtomicBool , Ordering } ;
1
+ use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
2
2
3
3
use crate :: parallel:: { num_threads, Reduce } ;
4
4
@@ -89,23 +89,25 @@ where
89
89
/// for file-io as it won't make use of sorted inputs well.
90
90
/// Note that `periodic` is not guaranteed to be called in case other threads come up first and finish too fast.
91
91
// TODO: better docs
92
- pub fn in_parallel_with_slice < I , S , E > (
93
- input : & [ I ] ,
92
+ pub fn in_parallel_with_slice < I , S , R , E > (
93
+ input : & mut [ I ] ,
94
94
thread_limit : Option < usize > ,
95
95
new_thread_state : impl FnMut ( usize ) -> S + Send + Clone ,
96
- consume : impl FnMut ( & I , & mut S ) -> Result < ( ) , E > + Send + Clone ,
96
+ consume : impl FnMut ( & mut I , & mut S ) -> Result < ( ) , E > + Send + Clone ,
97
97
mut periodic : impl FnMut ( ) -> Option < std:: time:: Duration > + Send ,
98
- ) -> Result < Vec < S > , E >
98
+ state_to_rval : impl FnOnce ( S ) -> R + Send + Clone ,
99
+ ) -> Result < Vec < R > , E >
99
100
where
100
- I : Send + Sync ,
101
- E : Send + Sync ,
102
- S : Send ,
101
+ I : Send ,
102
+ E : Send ,
103
+ R : Send ,
103
104
{
104
105
let num_threads = num_threads ( thread_limit) ;
105
- let num_items = input. len ( ) ;
106
106
let mut results = Vec :: with_capacity ( num_threads) ;
107
107
let stop_everything = & AtomicBool :: default ( ) ;
108
+ let index = & AtomicUsize :: default ( ) ;
108
109
110
+ // TODO: use std::thread::scope() once Rust 1.63 is available.
109
111
crossbeam_utils:: thread:: scope ( {
110
112
move |s| {
111
113
s. spawn ( {
@@ -124,24 +126,44 @@ where
124
126
}
125
127
} ) ;
126
128
129
+ let input_len = input. len ( ) ;
130
+ struct Input < I > ( * mut [ I ] )
131
+ where
132
+ I : Send ;
133
+
134
+ // SAFETY: I is Send + Sync, so is a *mut [I]
135
+ #[ allow( unsafe_code) ]
136
+ unsafe impl < I > Send for Input < I > where I : Send { }
137
+
127
138
let threads: Vec < _ > = ( 0 ..num_threads)
128
139
. map ( |thread_id| {
129
140
s. spawn ( {
130
141
let mut new_thread_state = new_thread_state. clone ( ) ;
142
+ let state_to_rval = state_to_rval. clone ( ) ;
131
143
let mut consume = consume. clone ( ) ;
144
+ let input = Input ( input as * mut [ I ] ) ;
132
145
move |_| {
133
146
let mut state = new_thread_state ( thread_id) ;
134
- for input_index in ( thread_id..num_items) . step_by ( num_threads) {
147
+ while let Ok ( input_index) = index
148
+ . fetch_update ( Ordering :: SeqCst , Ordering :: SeqCst , |x| ( x < input_len) . then ( || x + 1 ) )
149
+ {
135
150
if stop_everything. load ( Ordering :: Relaxed ) {
136
151
break ;
137
152
}
138
- let item = & input[ input_index] ;
153
+ // SAFETY: our atomic counter for `input_index` is only ever incremented, yielding
154
+ // each item exactly once.
155
+ let item = {
156
+ #[ allow( unsafe_code) ]
157
+ unsafe {
158
+ & mut ( & mut * input. 0 ) [ input_index]
159
+ }
160
+ } ;
139
161
if let Err ( err) = consume ( item, & mut state) {
140
162
stop_everything. store ( true , Ordering :: Relaxed ) ;
141
163
return Err ( err) ;
142
164
}
143
165
}
144
- Ok ( state)
166
+ Ok ( state_to_rval ( state) )
145
167
}
146
168
} )
147
169
} )
0 commit comments