Skip to content

Commit

Permalink
timeout toxic cleans up after itself
Browse files Browse the repository at this point in the history
  • Loading branch information
jpittis committed Mar 25, 2017
1 parent 6b464fc commit f7154cc
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 47 deletions.
9 changes: 9 additions & 0 deletions link.go
Expand Up @@ -123,6 +123,15 @@ func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
i := toxic.Index

if link.stubs[i].InterruptToxic() {
cleanup, ok := toxic.Toxic.(toxics.CleanupToxic)
if ok {
cleanup.Cleanup(link.stubs[i])
// Cleanup could have closed the stub.
if link.stubs[i].Closed() {
return
}
}

stop := make(chan bool)
// Interrupt the previous toxic to update its output
go func() {
Expand Down
24 changes: 17 additions & 7 deletions link_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"io"
"testing"
"time"

"github.com/Shopify/toxiproxy/stream"
"github.com/Shopify/toxiproxy/toxics"
Expand Down Expand Up @@ -191,13 +192,22 @@ func TestToxicity(t *testing.T) {
collection.chainUpdateToxic(toxic)

// Toxic should timeout after 100ms
n, err = link.input.Write([]byte{42})
if n != 1 || err != nil {
t.Fatalf("Write failed: %d %v", n, err)
}
n, err = link.output.Read(buf)
if n != 0 || err != io.EOF {
t.Fatalf("Read did not get EOF: %d %v", n, err)
done := make(chan struct{})
go func() {
n, err = link.input.Write([]byte{42})
if n != 1 || err != nil {
t.Fatalf("Write failed: %d %v", n, err)
}
n, err = link.output.Read(buf)
if n != 0 || err != io.EOF {
t.Fatalf("Read did not get EOF: %d %v", n, err)
}
close(done)
}()
select {
case <-done:
case <-time.After(150 * time.Millisecond):
t.Fatalf("Expected timeout after 100ms")
}
}

Expand Down
14 changes: 7 additions & 7 deletions toxic_collection.go
Expand Up @@ -19,10 +19,10 @@ import (
type ToxicCollection struct {
sync.Mutex

noop *toxics.ToxicWrapper
proxy *Proxy
chain [][]*toxics.ToxicWrapper
links map[string]*ToxicLink
noop *toxics.ToxicWrapper
proxy *Proxy
chain [][]*toxics.ToxicWrapper
links map[string]*ToxicLink
}

func NewToxicCollection(proxy *Proxy) *ToxicCollection {
Expand All @@ -31,9 +31,9 @@ func NewToxicCollection(proxy *Proxy) *ToxicCollection {
Toxic: new(toxics.NoopToxic),
Type: "noop",
},
proxy: proxy,
chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
links: make(map[string]*ToxicLink),
proxy: proxy,
chain: make([][]*toxics.ToxicWrapper, stream.NumDirections),
links: make(map[string]*ToxicLink),
}
for dir := range collection.chain {
collection.chain[dir] = make([]*toxics.ToxicWrapper, 1, toxics.Count()+1)
Expand Down
24 changes: 18 additions & 6 deletions toxics/timeout.go
Expand Up @@ -12,19 +12,31 @@ type TimeoutToxic struct {
func (t *TimeoutToxic) Pipe(stub *ToxicStub) {
timeout := time.Duration(t.Timeout) * time.Millisecond
if timeout > 0 {
for {
select {
case <-time.After(timeout):
stub.Close()
return
case <-stub.Interrupt:
return
case <-stub.Input:
// Drop the data on the ground.
}
}
} else {
select {
case <-time.After(timeout):
stub.Close()
return
case <-stub.Interrupt:
return
case <-stub.Input:
// Drop the data on the ground.
}
} else {
<-stub.Interrupt
return
}
}

func (t *TimeoutToxic) Cleanup(stub *ToxicStub) {
stub.Close()
}

func init() {
Register("timeout", new(TimeoutToxic))
}
106 changes: 81 additions & 25 deletions toxics/timeout_test.go
@@ -1,9 +1,10 @@
package toxics_test

import (
"testing"
"net"
"time"
"io"
"net"
"testing"
"time"

"github.com/Shopify/toxiproxy/toxics"
)
Expand All @@ -19,8 +20,8 @@ func TestTimeoutToxicDoesNotCauseHang(t *testing.T) {
proxy.Start()
defer proxy.Stop()

proxy.Toxics.AddToxicJson(ToxicToJson(t, "might_block", "latency", "upstream", &toxics.LatencyToxic{Latency: 10}))
proxy.Toxics.AddToxicJson(ToxicToJson(t, "to_delete", "timeout", "upstream", &toxics.TimeoutToxic{Timeout: 0}))
proxy.Toxics.AddToxicJson(ToxicToJson(t, "might_block", "latency", "upstream", &toxics.LatencyToxic{Latency: 10}))
proxy.Toxics.AddToxicJson(ToxicToJson(t, "to_delete", "timeout", "upstream", &toxics.TimeoutToxic{Timeout: 0}))

serverConnRecv := make(chan net.Conn)
go func() {
Expand All @@ -31,31 +32,86 @@ func TestTimeoutToxicDoesNotCauseHang(t *testing.T) {
serverConnRecv <- conn
}()

conn, err := net.Dial("tcp", proxy.Listen)
if err != nil {
t.Fatal("Unable to dial TCP server", err)
}
defer conn.Close()
conn, err := net.Dial("tcp", proxy.Listen)
if err != nil {
t.Fatal("Unable to dial TCP server", err)
}
defer conn.Close()

_ = <-serverConnRecv

_, err = conn.Write([]byte("hello"))
if err != nil {
t.Fatal("Unable to write to proxy", err)
}
_, err = conn.Write([]byte("hello"))
if err != nil {
t.Fatal("Unable to write to proxy", err)
}

time.Sleep(1 * time.Second) // Shitty sync waiting for latency toxi to get data.
time.Sleep(1 * time.Second) // Shitty sync waiting for latency toxi to get data.

done := make(chan struct{})
go func() {
proxy.Toxics.RemoveToxic("might_block")
close(done)
}()
done := make(chan struct{})
go func() {
proxy.Toxics.RemoveToxic("might_block")
close(done)
}()

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("timeout toxic is causing latency toxic to block")
}
select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("timeout toxic is causing latency toxic to block")
}
}

func TestTimeoutToxicClosesConnectionOnRemove(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal("Failed to create TCP server", err)
}
defer ln.Close()

proxy := NewTestProxy("test", ln.Addr().String())
proxy.Start()
defer proxy.Stop()

serverConnRecv := make(chan net.Conn)
go func() {
conn, err := ln.Accept()
if err != nil {
t.Error("Unable to accept TCP connection", err)
}
serverConnRecv <- conn
}()

conn, err := net.Dial("tcp", proxy.Listen)
if err != nil {
t.Fatal("Unable to dial TCP server", err)
}
defer conn.Close()

serverConn := <-serverConnRecv
defer serverConn.Close()

// Send data on connection to confirm link is established.
conn.Write([]byte("foobar"))
buf := make([]byte, 6)
serverConn.Read(buf)

proxy.Toxics.AddToxicJson(ToxicToJson(t, "to_delete", "timeout", "upstream", &toxics.TimeoutToxic{Timeout: 0}))

proxy.Toxics.RemoveToxic("to_delete")

closed := make(chan error)

go func() {
buf = make([]byte, 1)
_, err = conn.Read(buf)
closed <- err
}()

select {
case err := <-closed:
if err != io.EOF {
t.Fatal("expected EOF from closed connetion")
}
case <-time.After(1 * time.Second):
t.Fatal("connection was not closed in time")
}
}
20 changes: 18 additions & 2 deletions toxics/toxic.go
Expand Up @@ -26,6 +26,11 @@ type Toxic interface {
Pipe(*ToxicStub)
}

type CleanupToxic interface {
// Cleanup is called before a toxic is removed.
Cleanup(*ToxicStub)
}

type BufferedToxic interface {
// Defines the size of buffer this toxic should use
GetBufferSize() int
Expand Down Expand Up @@ -92,9 +97,20 @@ func (s *ToxicStub) InterruptToxic() bool {
}
}

func (s *ToxicStub) Closed() bool {
select {
case <-s.closed:
return true
default:
return false
}
}

func (s *ToxicStub) Close() {
close(s.closed)
close(s.Output)
if !s.Closed() {
close(s.closed)
close(s.Output)
}
}

var ToxicRegistry map[string]Toxic
Expand Down

0 comments on commit f7154cc

Please sign in to comment.