diff --git a/crossbeam-channel/tests/golang.rs b/crossbeam-channel/tests/golang.rs index 05d67f683..27f3be0a5 100644 --- a/crossbeam-channel/tests/golang.rs +++ b/crossbeam-channel/tests/golang.rs @@ -15,12 +15,12 @@ use std::alloc::{GlobalAlloc, Layout, System}; use std::any::Any; use std::cell::Cell; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering::SeqCst}; +use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; -use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender}; +use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender}; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) @@ -32,7 +32,13 @@ struct Chan { struct ChanInner { s: Option>, - r: Receiver, + r: Option>, + // Receiver to use when r is None (Go blocks on receiving from nil) + nil_r: Receiver, + // Sender to use when s is None (Go blocks on sending to nil) + nil_s: Sender, + // Hold this receiver to prevent nil sender channel from disconnection + _nil_sr: Receiver, } impl Clone for Chan { @@ -57,35 +63,53 @@ impl Chan { } fn try_recv(&self) -> Option { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.try_recv().ok() } fn recv(&self) -> Option { - let r = self.inner.lock().unwrap().r.clone(); + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); r.recv().ok() } - fn close(&self) { + fn close_s(&self) { self.inner .lock() .unwrap() .s .take() - .expect("channel already closed"); + .expect("channel sender already closed"); + } + + fn close_r(&self) { + self.inner + .lock() + .unwrap() + .r + .take() + .expect("channel receiver already closed"); + } + + fn has_rx(&self) -> bool { + self.inner.lock().unwrap().r.is_some() + } + + fn has_tx(&self) -> bool { + self.inner.lock().unwrap().s.is_some() } fn rx(&self) -> Receiver { - self.inner.lock().unwrap().r.clone() + let inner = self.inner.lock().unwrap(); + match inner.r.as_ref() { + None => inner.nil_r.clone(), + Some(r) => r.clone(), + } } fn tx(&self) -> Sender { - match self.inner.lock().unwrap().s.as_ref() { - None => { - let (s, r) = bounded(0); - std::mem::forget(r); - s - } + let inner = self.inner.lock().unwrap(); + match inner.s.as_ref() { + None => inner.nil_s.clone(), Some(s) => s.clone(), } } @@ -110,17 +134,32 @@ impl<'a, T> IntoIterator for &'a Chan { fn make(cap: usize) -> Chan { let (s, r) = bounded(cap); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } fn make_unbounded() -> Chan { let (s, r) = unbounded(); + let (nil_s, _nil_sr) = bounded(0); Chan { - inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })), + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), } } + #[derive(Clone)] struct WaitGroup(Arc); @@ -240,10 +279,10 @@ mod doubleselect { const ITERATIONS: i32 = 10_000; fn sender(n: i32, c1: Chan, c2: Chan, c3: Chan, c4: Chan) { - defer! { c1.close() } - defer! { c2.close() } - defer! { c3.close() } - defer! { c4.close() } + defer! { c1.close_s() } + defer! { c2.close_s() } + defer! { c3.close_s() } + defer! { c4.close_s() } for i in 0..n { select! { @@ -292,7 +331,7 @@ mod doubleselect { done.recv(); done.recv(); done.recv(); - cmux.close(); + cmux.close_s(); }); recver(cmux); } @@ -999,7 +1038,7 @@ mod chan_test { for i in 0..cap { c.send(i); } - c.close(); + c.close_s(); for i in 0..cap { let v = c.recv(); @@ -1027,7 +1066,7 @@ mod chan_test { }); thread::sleep(ms(1)); - c.close(); + c.close_s(); if !done.recv().unwrap() { panic!(); @@ -1082,7 +1121,7 @@ mod chan_test { } }); - c.close(); + c.close_s(); c.recv(); t.join().unwrap(); } @@ -1149,7 +1188,7 @@ mod chan_test { done.send(true); }); - c2.close(); + c2.close_s(); select! { recv(c1.rx()) -> _ => {} default => {} @@ -1378,7 +1417,7 @@ mod chan_test { ); } - done.close(); + done.close_s(); wg.wait(); } @@ -1481,9 +1520,9 @@ mod chan_test { *expect.lock().unwrap() += v; q.send(v); } - q.close(); + q.close_s(); wg.wait(); - r.close(); + r.close_s(); }); let mut n = 0; @@ -1542,7 +1581,505 @@ mod race_chan_test { // https://github.com/golang/go/blob/master/test/ken/chan.go mod chan { - // TODO + + use super::*; + + #[cfg(not(miri))] + const MESSAGES_PER_CHANEL: u32 = 76; + #[cfg(miri)] + const MESSAGES_PER_CHANEL: u32 = 2; // Miri is too slow on these tests + const MESSAGES_RANGE_LEN: u32 = 100; + const END: i32 = 10000; + + struct ChanWithVals { + chan: Chan, + /// Next value to send + sv: Arc, + /// Next value to receive + rv: Arc, + } + + struct Totals { + /// Total sent messages + tots: u32, + /// Total received messages + totr: u32, + } + + struct Context { + nproc: Arc>, + cval: Arc>, + tot: Arc>, + nc: ChanWithVals, + randx: Arc>, + } + + impl ChanWithVals { + fn with_capacity(capacity: usize) -> Self { + ChanWithVals { + chan: make(capacity), + sv: Arc::new(AtomicI32::new(0)), + rv: Arc::new(AtomicI32::new(0)), + } + } + + fn closed() -> Self { + let ch = ChanWithVals::with_capacity(0); + ch.chan.close_r(); + ch.chan.close_s(); + ch + } + + fn rv(&self) -> i32 { + self.rv.load(SeqCst) + } + + fn sv(&self) -> i32 { + self.sv.load(SeqCst) + } + + fn send(&mut self, tot: &Mutex) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.tots += 1 + } + let esv = expect(self.sv(), self.sv()); + self.sv.store(esv, SeqCst); + if self.sv() == END { + self.chan.close_s(); + return true; + } + false + } + + fn recv(&mut self, v: i32, tot: &Mutex) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.totr += 1 + } + let erv = expect(self.rv(), v); + self.rv.store(erv, SeqCst); + if self.rv() == END { + self.chan.close_r(); + return true; + } + false + } + } + + impl Clone for ChanWithVals { + fn clone(&self) -> Self { + ChanWithVals { + chan: self.chan.clone(), + sv: self.sv.clone(), + rv: self.rv.clone(), + } + } + } + + impl Context { + fn nproc(&self) -> &Mutex { + self.nproc.as_ref() + } + + fn cval(&self) -> &Mutex { + self.cval.as_ref() + } + + fn tot(&self) -> &Mutex { + self.tot.as_ref() + } + + fn randx(&self) -> &Mutex { + self.randx.as_ref() + } + } + + impl Clone for Context { + fn clone(&self) -> Self { + Context { + nproc: self.nproc.clone(), + cval: self.cval.clone(), + tot: self.tot.clone(), + nc: self.nc.clone(), + randx: self.randx.clone(), + } + } + } + + fn nrand(n: i32, randx: &Mutex) -> i32 { + let mut randx = randx.lock().unwrap(); + *randx += 10007; + if *randx >= 1000000 { + *randx -= 1000000 + } + *randx % n + } + + fn change_nproc(adjust: i32, nproc: &Mutex) -> i32 { + let mut nproc = nproc.lock().unwrap(); + *nproc += adjust; + *nproc + } + + fn mkchan(c: usize, n: usize, cval: &Mutex) -> Vec { + let mut ca = Vec::::with_capacity(n); + let mut cval = cval.lock().unwrap(); + for _ in 0..n { + *cval += MESSAGES_RANGE_LEN as i32; + let chl = ChanWithVals::with_capacity(c); + chl.sv.store(*cval, SeqCst); + chl.rv.store(*cval, SeqCst); + ca.push(chl); + } + ca + } + + fn expect(v: i32, v0: i32) -> i32 { + if v == v0 { + return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 { + END + } else { + v + 1 + }; + } + panic!("got {}, expected {}", v, v0 + 1); + } + + fn send(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in 0..=nrand(10, ctx.randx()) { + thread::yield_now(); + } + c.chan.tx().send(c.sv()).unwrap(); + if c.send(ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn recv(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in (0..nrand(10, ctx.randx())).rev() { + thread::yield_now(); + } + let v = c.chan.rx().recv().unwrap(); + if c.recv(v, ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + #[allow(clippy::too_many_arguments)] + fn sel( + mut r0: ChanWithVals, + mut r1: ChanWithVals, + mut r2: ChanWithVals, + mut r3: ChanWithVals, + mut s0: ChanWithVals, + mut s1: ChanWithVals, + mut s2: ChanWithVals, + mut s3: ChanWithVals, + ctx: Context, + ) { + let mut a = 0; // local chans running + + if r0.chan.has_rx() { + a += 1; + } + if r1.chan.has_rx() { + a += 1; + } + if r2.chan.has_rx() { + a += 1; + } + if r3.chan.has_rx() { + a += 1; + } + if s0.chan.has_tx() { + a += 1; + } + if s1.chan.has_tx() { + a += 1; + } + if s2.chan.has_tx() { + a += 1; + } + if s3.chan.has_tx() { + a += 1; + } + + loop { + for _ in 0..=nrand(5, ctx.randx()) { + thread::yield_now(); + } + select! { + recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 }, + send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 }, + send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 }, + send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 }, + } + if a == 0 { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals { + vec.get(idx).unwrap().clone() + } + + /// Direct send to direct recv + fn test1(c: ChanWithVals, ctx: &mut Context) { + change_nproc(2, ctx.nproc()); + go!(c, ctx, send(c, ctx)); + go!(c, ctx, recv(c, ctx)); + } + + /// Direct send to select recv + fn test2(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 0), ctx)); + go!(ca, ctx, send(get(&ca, 1), ctx)); + go!(ca, ctx, send(get(&ca, 2), ctx)); + go!(ca, ctx, send(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to direct recv + fn test3(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 0), ctx)); + go!(ca, ctx, recv(get(&ca, 1), ctx)); + go!(ca, ctx, recv(get(&ca, 2), ctx)); + go!(ca, ctx, recv(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + } + + /// Select send to select recv, 4 channels + fn test4(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to select recv, 8 channels + fn test5(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 8, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + ctx, + ) + ); + } + + // Direct and select send to direct and select recv + fn test6(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 12, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 4), ctx)); + go!(ca, ctx, send(get(&ca, 5), ctx)); + go!(ca, ctx, send(get(&ca, 6), ctx)); + go!(ca, ctx, send(get(&ca, 7), ctx)); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 8), ctx)); + go!(ca, ctx, recv(get(&ca, 9), ctx)); + go!(ca, ctx, recv(get(&ca, 10), ctx)); + go!(ca, ctx, recv(get(&ca, 11), ctx)); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 8), + get(&ca, 9), + get(&ca, 10), + get(&ca, 11), + ctx, + ) + ); + } + + fn wait(ctx: &mut Context) { + thread::yield_now(); + while change_nproc(0, ctx.nproc()) != 0 { + thread::yield_now(); + } + } + + fn tests(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + test1(get(&ca, 0), ctx); + test1(get(&ca, 1), ctx); + test1(get(&ca, 2), ctx); + test1(get(&ca, 3), ctx); + wait(ctx); + + test2(c, ctx); + wait(ctx); + + test3(c, ctx); + wait(ctx); + + test4(c, ctx); + wait(ctx); + + test5(c, ctx); + wait(ctx); + + test6(c, ctx); + wait(ctx); + } + + #[test] + fn main() { + let mut ctx = Context { + nproc: Arc::new(Mutex::new(0)), + cval: Arc::new(Mutex::new(0)), + tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })), + nc: ChanWithVals::closed(), + randx: Arc::new(Mutex::new(0)), + }; + + tests(0, &mut ctx); + tests(1, &mut ctx); + tests(10, &mut ctx); + tests(100, &mut ctx); + + #[rustfmt::skip] + let t = 4 * // buffer sizes + (4*4 + // tests 1,2,3,4 channels + 8 + // test 5 channels + 12) * // test 6 channels + MESSAGES_PER_CHANEL; // sends/recvs on a channel + + let tot = ctx.tot.lock().unwrap(); + if tot.tots != t || tot.totr != t { + panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t); + } + } } // https://github.com/golang/go/blob/master/test/ken/chan1.go