Skip to content
Permalink
Browse files

Fix toxic add/remove races

Change io_chan EOF behaviour to fix tests
  • Loading branch information
xthexder committed Nov 15, 2015
1 parent 5608127 commit 9591d036309767756543ea88ffd5db2f0873be6c
Showing with 26 additions and 17 deletions.
  1. +16 −9 link.go
  2. +4 −1 stream/io_chan.go
  3. +5 −6 stream/io_chan_test.go
  4. +1 −1 toxics/toxic.go
25 link.go
@@ -87,16 +87,21 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)

// Add a toxic to the end of the chain.
func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
i := toxic.Index
i := len(link.stubs)

newin := make(chan *stream.StreamChunk, toxic.BufferSize)
link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output))

// Interrupt the last toxic so that we don't have a race when moving channels
if link.stubs[i-1].InterruptToxic() {
newin := make(chan *stream.StreamChunk, toxic.BufferSize)
link.stubs = append(link.stubs, toxics.NewToxicStub(newin, link.stubs[i-1].Output))
link.stubs[i-1].Output = newin

go link.stubs[i].Run(toxic)
go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
} else {
// This link is already closed, make sure the new toxic matches
link.stubs[i].Output = newin // The real output is already closed, close this instead
link.stubs[i].Close()
}
}

@@ -119,36 +124,38 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
}()

// Unblock the previous toxic if it is trying to flush
// If the previous toxic is closed, continue flusing until we reach the end.
interrupted := false
stopped := false
for !interrupted {
select {
case interrupted = <-stop:
if !interrupted {
return
}
stopped = true
case tmp := <-link.stubs[i].Input:
link.stubs[i].Output <- tmp
if tmp == nil {
link.stubs[i].Close()
if !stopped {
<-stop
}
return
}
link.stubs[i].Output <- tmp
}
}

// Empty the toxic's buffer if necessary
for len(link.stubs[i].Input) > 0 {
tmp := <-link.stubs[i].Input
link.stubs[i].Output <- tmp
if tmp == nil {
link.stubs[i].Close()
return
}
link.stubs[i].Output <- tmp
}

link.stubs[i-1].Output = link.stubs[i].Output
link.stubs = append(link.stubs[:i], link.stubs[i+1:]...)

go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
}

}
@@ -79,7 +79,10 @@ func (c *ChanReader) Read(out []byte) (int, error) {
case p := <-c.input:
if p == nil { // Stream was closed
c.buffer = nil
return n, io.EOF
if n > 0 {
return n, nil
}
return 0, io.EOF
}
n2 := copy(out[n:], p.Data)
c.buffer = p.Data[n2:]
@@ -83,8 +83,8 @@ func TestReadLessThanWrite(t *testing.T) {
if n != len(send)-len(buf) {
t.Fatalf("Read wrong number of bytes: %d expected %d", n, len(send)-len(buf))
}
if err != io.EOF {
t.Fatal("Read returned wrong error after close:", err)
if err != nil {
t.Fatal("Couldn't read from stream", err)
}
if !bytes.Equal(buf[:n], send[len(buf):]) {
t.Fatal("Got wrong message from stream", string(buf[:n]))
@@ -110,11 +110,10 @@ func TestMultiReadWrite(t *testing.T) {
writer.Close()
}()
buf := make([]byte, 10)
read := 0
for i := 0; i < len(send)/10; i++ {
for read := 0; read < len(send); {
n, err := reader.Read(buf)
if err != nil {
t.Fatal("Couldn't read from stream", err)
t.Fatal("Couldn't read from stream", err, n)
}
if !bytes.Equal(buf[:n], send[read:read+n]) {
t.Fatal("Got wrong message from stream", string(buf))
@@ -123,7 +122,7 @@ func TestMultiReadWrite(t *testing.T) {
}
n, err := reader.Read(buf)
if err != io.EOF {
t.Fatal("Read returned wrong error after close:", err)
t.Fatal("Read returned wrong error after close:", err, string(buf[:n]))
}
if !bytes.Equal(buf[:n], send[len(send)-n:]) {
t.Fatal("Got wrong message from stream", string(buf[:n]))
@@ -92,7 +92,7 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) {
}

// Interrupt the flow of data so that the toxic controlling the stub can be replaced.
// Returns true if the stream was successfully interrupted.
// Returns true if the stream was successfully interrupted, or false if the stream is closed.
func (s *ToxicStub) InterruptToxic() bool {
select {
case <-s.closed:

0 comments on commit 9591d03

Please sign in to comment.
You can’t perform that action at this time.