Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acceptance tests: prevent deadlocks in tests for async writes #27

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ func (d ConfigurableAcceptanceTestDriver) writeAsync(ctx context.Context, dest D
return err
}

// TODO create timeout for wait to prevent deadlock for badly written connectors
waitForAck.Wait()
// wait for each of the records, for at most the specified write timeout
waitTimeout(&waitForAck, time.Duration(len(records))*d.WriteTimeout())
if ackErr != nil {
return ackErr
}
Expand Down Expand Up @@ -938,8 +938,8 @@ func (a acceptanceTest) TestDestination_WriteAsync_Success(t *testing.T) {
is.NoErr(err)

// wait for acks to get called
// TODO timeout if it takes too long
ackWg.Wait()
// wait for each of the records, for at most the specified write timeout
waitTimeout(&ackWg, time.Duration(20)*a.driver.WriteTimeout())

got := a.driver.ReadFromDestination(t, want)
a.isEqualRecords(is, want, got)
Expand Down Expand Up @@ -1129,3 +1129,17 @@ func (a acceptanceTest) isEqualData(is *is.I, want, got Data) {
is.Equal(want.Bytes(), got.Bytes()) // data did not match (want != got)
}
}

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking if it would be worth it to add this function as a method to acceptanceTest and to ConfigurableAcceptanceTestDriver 🤔 it would mean it's duplicated, but on the other hand we prevent it from being used anywhere else and keep the package clean.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about that too, but I didn't like the duplication at all. On the other hand, maybe it's not a bad thing at all that it's accessible to the rest of the package. I'd maybe move it to a utils package or import it from a library.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lovro and I talked about this "offline". Similar functionality is needed in other places, so we'll extract this function into a separate file.

c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}