diff --git a/changes.go b/changes.go index a0cfb75..c8ff6db 100644 --- a/changes.go +++ b/changes.go @@ -63,6 +63,9 @@ func i64defopt(opts map[string]interface{}, k string, def int64) int64 { return rv } +var changesDialer = net.Dial +var changesFailDelay = time.Second + // Feed the changes. // // The handler receives the body of the stream and is expected to consume @@ -104,7 +107,7 @@ func (p Database) Changes(handler ChangeHandler, Proxy: http.ProxyFromEnvironment, Dial: func(n, addr string) (net.Conn, error) { var err error - conn, err = net.Dial(n, addr) + conn, err = changesDialer(n, addr) return conn, err }, }} @@ -120,7 +123,7 @@ func (p Database) Changes(handler ChangeHandler, }() } else { log.Printf("Error in stream: %v", err) - time.Sleep(time.Second * 1) + time.Sleep(changesFailDelay) } } return nil diff --git a/changes_test.go b/changes_test.go index d413f61..b8b2f8b 100644 --- a/changes_test.go +++ b/changes_test.go @@ -2,6 +2,7 @@ package couch import ( "io" + "net" "testing" "time" ) @@ -78,3 +79,109 @@ func TestI64Opt(t *testing.T) { } } } + +type mockConn struct { + stuff []byte + respEnabled bool + fail bool +} + +func (m *mockConn) Read(b []byte) (int, error) { + if m.fail { + m.fail = false + return 0, io.EOF + } + if !m.respEnabled { + return 0, nil + } + if len(m.stuff) == 0 { + return 0, io.EOF + } + n := copy(b, m.stuff) + m.stuff = m.stuff[n:] + return n, nil +} + +func (m *mockConn) Write(b []byte) (n int, err error) { + m.respEnabled = true + return len(b), err +} + +func (m *mockConn) Close() error { + return nil +} + +func (m mockConn) LocalAddr() net.Addr { + return nil +} + +func (m mockConn) RemoteAddr() net.Addr { + return nil +} + +func (m mockConn) SetDeadline(t time.Time) error { + return nil +} + +func (m mockConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (m mockConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func mockDialer(m *mockConn) func(string, string) (net.Conn, error) { + return func(string, string) (net.Conn, error) { + return m, nil + } +} + +func setChangesDialer(to func(string, string) (net.Conn, error)) func(string, string) (net.Conn, error) { + prev := changesDialer + changesDialer = to + return prev +} + +func TestChangesTwice(t *testing.T) { + mock := &mockConn{[]byte(`HTTP/1.0 200 OK + +`), false, true} + changesFailDelay = 5 + defer setChangesDialer(setChangesDialer(mockDialer(mock))) + d := Database{} + err := d.Changes(func(io.Reader) int64 { return -1 }, map[string]interface{}{}) + t.Logf("Error: %v", err) +} + +func TestChangesWithOptions(t *testing.T) { + mock := &mockConn{[]byte(`HTTP/1.0 200 OK + +`), false, false} + changesFailDelay = 5 + defer setChangesDialer(setChangesDialer(mockDialer(mock))) + d := Database{} + err := d.Changes(func(io.Reader) int64 { return -1 }, + map[string]interface{}{ + "since": 858245, + "start_key": "x", + "heartbeat": 3999, + }) + t.Logf("Error: %v", err) +} + +func TestChangesWithNegativeHB(t *testing.T) { + mock := &mockConn{[]byte(`HTTP/1.0 200 OK + +`), false, false} + changesFailDelay = 5 + defer setChangesDialer(setChangesDialer(mockDialer(mock))) + d := Database{} + err := d.Changes(func(io.Reader) int64 { return -1 }, + map[string]interface{}{ + "since": 858245, + "start_key": "x", + "heartbeat": -3999, + }) + t.Logf("Error: %v", err) +}