Skip to content

Commit

Permalink
internal/poll, os: cancel pending I/O when closing pipes on Windows
Browse files Browse the repository at this point in the history
When closing a pipe, use CancelIoEx to cancel pending I/O.
This makes concurrent Read and Write calls to return os.ErrClosed.

This change also enables some pipe tests on Windows.

Fixes golang#28477, golang#25835

Change-Id: If52bb7d80895763488a61632e4682a78336e8420
  • Loading branch information
crvv committed Mar 4, 2019
1 parent d188767 commit fce1ed0
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 223 deletions.
4 changes: 4 additions & 0 deletions src/internal/poll/fd_mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ func (mu *fdMutex) decref() bool {
}
}

func (mu *fdMutex) closed() bool {
return atomic.LoadUint64(&mu.state)&mutexClosed != 0
}

// lock adds a reference to mu and locks mu.
// It reports whether mu is available for reading or writing.
func (mu *fdMutex) rwlock(read bool) bool {
Expand Down
13 changes: 13 additions & 0 deletions src/internal/poll/fd_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,11 @@ func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}
if fd.isFile {
if t, err := syscall.GetFileType(fd.Sysfd); err == nil && t == syscall.FILE_TYPE_PIPE {
syscall.CancelIoEx(fd.Sysfd, nil)
}
}
// unblock pending reader and writer
fd.pd.evict()
err := fd.decref()
Expand Down Expand Up @@ -492,6 +497,10 @@ func (fd *FD) Read(buf []byte) (int, error) {
n, err = fd.readConsole(buf)
} else {
n, err = syscall.Read(fd.Sysfd, buf)
if err == syscall.ERROR_OPERATION_ABORTED && fd.fdmu.closed() {
// The Read is cancelled in Close
err = ErrFileClosing
}
}
if err != nil {
n = 0
Expand Down Expand Up @@ -676,6 +685,10 @@ func (fd *FD) Write(buf []byte) (int, error) {
n, err = fd.writeConsole(b)
} else {
n, err = syscall.Write(fd.Sysfd, b)
if err == syscall.ERROR_OPERATION_ABORTED && fd.fdmu.closed() {
// the Write is cancelled in Close
err = ErrFileClosing
}
}
if err != nil {
n = 0
Expand Down
232 changes: 9 additions & 223 deletions src/os/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,125 +2,25 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Test broken pipes on Unix systems.
// +build !windows,!plan9,!nacl,!js
// +build !plan9,!nacl,!js

package os_test

import (
"bufio"
"bytes"
"fmt"
"internal/testenv"
"io"
"io/ioutil"
"os"
osexec "os/exec"
"os/signal"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"testing"
"time"
)

func TestEPIPE(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
if err := r.Close(); err != nil {
t.Fatal(err)
}

// Every time we write to the pipe we should get an EPIPE.
for i := 0; i < 20; i++ {
_, err = w.Write([]byte("hi"))
if err == nil {
t.Fatal("unexpected success of Write to broken pipe")
}
if pe, ok := err.(*os.PathError); ok {
err = pe.Err
}
if se, ok := err.(*os.SyscallError); ok {
err = se.Err
}
if err != syscall.EPIPE {
t.Errorf("iteration %d: got %v, expected EPIPE", i, err)
}
}
}

func TestStdPipe(t *testing.T) {
testenv.MustHaveExec(t)
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
if err := r.Close(); err != nil {
t.Fatal(err)
}
// Invoke the test program to run the test and write to a closed pipe.
// If sig is false:
// writing to stdout or stderr should cause an immediate SIGPIPE;
// writing to descriptor 3 should fail with EPIPE and then exit 0.
// If sig is true:
// all writes should fail with EPIPE and then exit 0.
for _, sig := range []bool{false, true} {
for dest := 1; dest < 4; dest++ {
cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
cmd.Stdout = w
cmd.Stderr = w
cmd.ExtraFiles = []*os.File{w}
cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
if sig {
cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
}
if err := cmd.Run(); err == nil {
if !sig && dest < 3 {
t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
}
} else if ee, ok := err.(*osexec.ExitError); !ok {
t.Errorf("unexpected exec error type %T: %v", err, err)
} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
} else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
if sig || dest > 2 {
t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
}
} else {
t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
}
}
}
}

// This is a helper for TestStdPipe. It's not a test in itself.
func TestStdPipeHelper(t *testing.T) {
if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
}
switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
case "1":
os.Stdout.Write([]byte("stdout"))
case "2":
os.Stderr.Write([]byte("stderr"))
case "3":
if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
os.Exit(3)
}
default:
t.Skip("skipping test helper")
}
// For stdout/stderr, we should have crashed with a broken pipe error.
// The caller will be looking for that exit status,
// so just exit normally here to cause a failure in the caller.
// For descriptor 3, a normal exit is expected.
os.Exit(0)
}

func testClosedPipeRace(t *testing.T, read bool) {
switch runtime.GOOS {
case "freebsd":
Expand Down Expand Up @@ -191,60 +91,6 @@ func TestClosedPipeRaceWrite(t *testing.T) {
testClosedPipeRace(t, false)
}

// Issue 20915: Reading on nonblocking fd should not return "waiting
// for unsupported file type." Currently it returns EAGAIN; it is
// possible that in the future it will simply wait for data.
func TestReadNonblockingFd(t *testing.T) {
if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
fd := int(os.Stdin.Fd())
syscall.SetNonblock(fd, true)
defer syscall.SetNonblock(fd, false)
_, err := os.Stdin.Read(make([]byte, 1))
if err != nil {
if perr, ok := err.(*os.PathError); !ok || perr.Err != syscall.EAGAIN {
t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
}
}
os.Exit(0)
}

testenv.MustHaveExec(t)
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()
cmd := osexec.Command(os.Args[0], "-test.run="+t.Name())
cmd.Env = append(os.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
cmd.Stdin = r
output, err := cmd.CombinedOutput()
t.Logf("%s", output)
if err != nil {
t.Errorf("child process failed: %v", err)
}
}

func TestCloseWithBlockingReadByNewFile(t *testing.T) {
var p [2]int
err := syscall.Pipe(p[:])
if err != nil {
t.Fatal(err)
}
// os.NewFile returns a blocking mode file.
testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
}

func TestCloseWithBlockingReadByFd(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
// Calling Fd will put the file into blocking mode.
_ = r.Fd()
testCloseWithBlockingRead(t, r, w)
}

// Test that we don't let a blocking read prevent a close.
func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
defer r.Close()
Expand Down Expand Up @@ -276,9 +122,6 @@ func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
if err == nil {
t.Error("I/O on closed pipe unexpectedly succeeded")
}
if err != io.EOF {
t.Errorf("got %v, expected io.EOF", err)
}
}(c2)

for c1 != nil || c2 != nil {
Expand Down Expand Up @@ -308,6 +151,14 @@ func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
wg.Wait()
}

func TestCloseWithBlockingRead(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
testCloseWithBlockingRead(t, r, w)
}

// Issue 24164, for pipes.
func TestPipeEOF(t *testing.T) {
r, w, err := os.Pipe()
Expand Down Expand Up @@ -372,68 +223,3 @@ func TestPipeEOF(t *testing.T) {
r.Close()
}
}

// Issue 24481.
func TestFdRace(t *testing.T) {
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()

var wg sync.WaitGroup
call := func() {
defer wg.Done()
w.Fd()
}

const tries = 100
for i := 0; i < tries; i++ {
wg.Add(1)
go call()
}
wg.Wait()
}

func TestFdReadRace(t *testing.T) {
t.Parallel()

r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer r.Close()
defer w.Close()

c := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var buf [10]byte
r.SetReadDeadline(time.Now().Add(time.Second))
c <- true
if _, err := r.Read(buf[:]); os.IsTimeout(err) {
t.Error("read timed out")
}
}()

wg.Add(1)
go func() {
defer wg.Done()
<-c
// Give the other goroutine a chance to enter the Read.
// It doesn't matter if this occasionally fails, the test
// will still pass, it just won't test anything.
time.Sleep(10 * time.Millisecond)
r.Fd()

// The bug was that Fd would hang until Read timed out.
// If the bug is fixed, then closing r here will cause
// the Read to exit before the timeout expires.
r.Close()
}()

wg.Wait()
}
Loading

0 comments on commit fce1ed0

Please sign in to comment.