Skip to content

Commit

Permalink
Improved reliability on a couple of flaky tests (segmentio#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steve van Loben Sels committed Jun 21, 2019
1 parent 822ed5b commit 8174c4c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
12 changes: 11 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ func testConnWriteReadConcurrently(t *testing.T, conn *Conn) {
const N = 1000
var msgs = make([]string, N)
var done = make(chan struct{})
var written = make(chan struct{}, N/10)

for i := 0; i != N; i++ {
msgs[i] = strconv.Itoa(i)
Expand All @@ -832,12 +833,21 @@ func testConnWriteReadConcurrently(t *testing.T, conn *Conn) {
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error(err)
}
written <- struct{}{}
}
}()

b := make([]byte, 128)

for i := 0; i != N; i++ {
// wait until at least one message has been written. the reason for
// this synchronization is that we aren't using deadlines. as such, if
// the read happens before a message is available, it will cause a
// deadlock because the read request will never hit the one byte minimum
// in order to return and release the lock on the conn. by ensuring
// that there's at least one message produced, we don't hit that
// condition.
<-written
n, err := conn.Read(b)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -880,7 +890,7 @@ func testConnReadEmptyWithDeadline(t *testing.T, conn *Conn) {
b := make([]byte, 100)

start := time.Now()
deadline := start.Add(250 * time.Millisecond)
deadline := start.Add(time.Second)

conn.SetReadDeadline(deadline)
n, err := conn.Read(b)
Expand Down
8 changes: 7 additions & 1 deletion reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func TestReaderConsumerGroup(t *testing.T) {
GroupID: groupID,
HeartbeatInterval: 2 * time.Second,
CommitInterval: test.commitInterval,
RebalanceTimeout: 8 * time.Second,
RebalanceTimeout: 2 * time.Second,
RetentionTime: time.Hour,
MinBytes: 1,
MaxBytes: 1e6,
Expand Down Expand Up @@ -1323,17 +1323,23 @@ func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing
// of a minute and that seems too long for unit tests. Also, setting this
// to a larger number seems to make the Kafka broker unresponsive.
// TODO research if there's a way to reduce rebalance time across many partitions
// svls: the described behavior is due to the thundering herd of readers
// hitting the rebalance timeout. introducing the 100ms sleep in the
// loop below in order to give time for the sync group to finish has
// greatly helped, though we still hit the timeout from time to time.
const N = 8

var readers []*Reader

for i := 0; i < N-1; i++ {
reader := NewReader(r.config)
readers = append(readers, reader)
time.Sleep(100 * time.Millisecond)
}
defer func() {
for _, r := range readers {
r.Close()
time.Sleep(100 * time.Millisecond)
}
}()

Expand Down

0 comments on commit 8174c4c

Please sign in to comment.