Skip to content

Commit

Permalink
Merge pull request #338 from cloudwego/release-v0.6.1
Browse files Browse the repository at this point in the history
chore: release v0.6.1
  • Loading branch information
joway committed Jun 12, 2024
2 parents 8c8b872 + ae18a62 commit 2f8043a
Show file tree
Hide file tree
Showing 21 changed files with 910 additions and 997 deletions.
10 changes: 10 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Typo check: https://github.com/crate-ci/typos

[files]
extend-exclude = ["go.sum"]

[default.extend-identifiers]
# *sigh* this just isn't worth the cost of fixing
nd = "nd"
paniced = "paniced"
write_datas = "write_datas"
17 changes: 10 additions & 7 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
// Concurrent connection access error
ErrConcurrentAccess = syscall.Errno(0x108)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -110,11 +112,12 @@ func (e *exception) Temporary() bool {

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll does not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConcurrentAccess: "concurrent connection access",
}
13 changes: 11 additions & 2 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,15 @@ func (c *connection) MallocLen() (length int) {
// If empty, it will call syscall.Write to send data directly,
// otherwise the buffer will be sent asynchronously by the epoll trigger.
func (c *connection) Flush() error {
if !c.IsActive() || !c.lock(flushing) {
if !c.IsActive() {
return Exception(ErrConnClosed, "when flush")
}

if !c.lock(flushing) {
return Exception(ErrConcurrentAccess, "when flush")
}
defer c.unlock(flushing)

c.outputBuffer.Flush()
return c.flush()
}
Expand Down Expand Up @@ -282,9 +287,13 @@ func (c *connection) Read(p []byte) (n int, err error) {

// Write will Flush soon.
func (c *connection) Write(p []byte) (n int, err error) {
if !c.IsActive() || !c.lock(flushing) {
if !c.IsActive() {
return 0, Exception(ErrConnClosed, "when write")
}

if !c.lock(flushing) {
return 0, Exception(ErrConcurrentAccess, "when write")
}
defer c.unlock(flushing)

dst, _ := c.outputBuffer.Malloc(len(p))
Expand Down
34 changes: 20 additions & 14 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ func writeAll(fd int, buf []byte) error {
// Large packet write test. The socket buffer is 2MB by default, here to verify
// whether Connection.Close can be executed normally after socket output buffer is full.
func TestLargeBufferWrite(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

trigger := make(chan int)
Expand All @@ -231,7 +232,7 @@ func TestLargeBufferWrite(t *testing.T) {
}
}()

conn, err := DialConnection("tcp", ":12345", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
rfd := <-trigger

Expand Down Expand Up @@ -267,7 +268,8 @@ func TestLargeBufferWrite(t *testing.T) {
}

func TestWriteTimeout(t *testing.T) {
ln, err := createTestListener("tcp", ":1234")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

interval := time.Millisecond * 100
Expand Down Expand Up @@ -296,7 +298,7 @@ func TestWriteTimeout(t *testing.T) {
}
}()

conn, err := DialConnection("tcp", ":1234", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)

_, err = conn.Writer().Malloc(1024)
Expand Down Expand Up @@ -440,7 +442,8 @@ func TestBookSizeLargerThanMaxSize(t *testing.T) {
}

func TestConnDetach(t *testing.T) {
ln, err := createTestListener("tcp", ":1234")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)

go func() {
Expand Down Expand Up @@ -470,7 +473,7 @@ func TestConnDetach(t *testing.T) {
}
}()

c, err := DialConnection("tcp", ":1234", time.Second)
c, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)

conn := c.(*TCPConnection)
Expand All @@ -497,7 +500,8 @@ func TestConnDetach(t *testing.T) {
}

func TestParallelShortConnection(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
defer ln.Close()

Expand All @@ -523,7 +527,7 @@ func TestParallelShortConnection(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
n, err := conn.Writer().WriteBinary(make([]byte, sizePerConn))
MustNil(t, err)
Expand All @@ -546,7 +550,8 @@ func TestParallelShortConnection(t *testing.T) {
}

func TestConnectionServerClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
defer ln.Close()

Expand Down Expand Up @@ -601,7 +606,7 @@ func TestConnectionServerClose(t *testing.T) {
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
t.Logf("service end with error: %v", err)
}
}()

Expand All @@ -628,7 +633,7 @@ func TestConnectionServerClose(t *testing.T) {
wg.Add(conns * 6)
for i := 0; i < conns; i++ {
go func() {
conn, err := DialConnection("tcp", ":12345", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
err = conn.SetOnRequest(clientOnRequest)
MustNil(t, err)
Expand All @@ -644,7 +649,8 @@ func TestConnectionServerClose(t *testing.T) {
}

func TestConnectionDailTimeoutAndClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
address := getTestAddress()
ln, err := createTestListener("tcp", address)
MustNil(t, err)
defer ln.Close()

Expand All @@ -658,7 +664,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
go func() {
err := el.Serve(ln)
if err != nil {
t.Logf("servce end with error: %v", err)
t.Logf("service end with error: %v", err)
}
}()

Expand All @@ -670,7 +676,7 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
for i := 0; i < conns; i++ {
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Nanosecond)
conn, err := DialConnection("tcp", address, time.Nanosecond)
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"))
_ = conn
}()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ module github.com/cloudwego/netpoll
go 1.15

require (
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
)
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 h1:PtwsQyQJGxf8iaPptPNaduEIu9BnrNms+pcRdHAxZaM=
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7/go.mod h1:2ZlV9BaUH4+NXIBF0aMdKKAnHTzqH+iMU4KUjAbL23Q=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3 h1:ZKUHguI38SRQJkq7hhmwn8lAv3xM6B5qkj1IneS15YY=
github.com/bytedance/gopkg v0.0.0-20240507064146-197ded923ae3/go.mod h1:FtQG3YbQG9L/91pbKSw787yBQPutC+457AvDW77fgUQ=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe h1:W8vbETX/n8S6EmY0Pu4Ix7VvpsJUESTwl0oCK8MJOgk=
golang.org/x/sys v0.0.0-20220110181412-a018aaa089fe/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
Expand Down
4 changes: 2 additions & 2 deletions mux/shard_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func TestShardQueue(t *testing.T) {
var svrConn net.Conn
accepted := make(chan struct{})

network, address := "tcp", ":18888"
ln, err := net.Listen("tcp", ":18888")
network, address := "tcp", "localhost:12345"
ln, err := net.Listen("tcp", address)
MustNil(t, err)
stop := make(chan int, 1)
defer close(stop)
Expand Down
4 changes: 2 additions & 2 deletions net_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (d *dialer) DialConnection(network, address string, timeout time.Duration)
switch network {
case "tcp", "tcp4", "tcp6":
return d.dialTCP(ctx, network, address)
// case "udp", "udp4", "udp6": // TODO: unsupport now
// case "udp", "udp4", "udp6": // TODO: unsupported now
case "unix", "unixgram", "unixpacket":
raddr := &UnixAddr{
UnixAddr: net.UnixAddr{Name: address, Net: network},
Expand All @@ -75,7 +75,7 @@ func (d *dialer) dialTCP(ctx context.Context, network, address string) (connecti
return nil, err
}
var ipaddrs []net.IPAddr
// host maybe empty if address is ":1234"
// host maybe empty if address is :12345
if host == "" {
ipaddrs = []net.IPAddr{{}}
} else {
Expand Down
31 changes: 18 additions & 13 deletions net_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import (

func TestDialerTCP(t *testing.T) {
dialer := NewDialer()
conn, err := dialer.DialTimeout("tcp", ":1234", time.Second)
address := getTestAddress()
conn, err := dialer.DialTimeout("tcp", address, time.Second)
MustTrue(t, err != nil)
MustTrue(t, conn.(*TCPConnection) == nil)

ln, err := CreateListener("tcp", ":1234")
ln, err := CreateListener("tcp", address)
MustNil(t, err)

stop := make(chan int, 1)
Expand All @@ -57,10 +58,10 @@ func TestDialerTCP(t *testing.T) {
}
}()

conn, err = dialer.DialTimeout("tcp", ":1234", time.Second)
conn, err = dialer.DialTimeout("tcp", address, time.Second)
MustNil(t, err)
MustTrue(t, strings.HasPrefix(conn.LocalAddr().String(), "127.0.0.1:"))
Equal(t, conn.RemoteAddr().String(), "127.0.0.1:1234")
Equal(t, conn.RemoteAddr().String(), address)
}

func TestDialerUnix(t *testing.T) {
Expand Down Expand Up @@ -106,7 +107,8 @@ func TestDialerUnix(t *testing.T) {
}

func TestDialerFdAlloc(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
address := getTestAddress()
ln, err := CreateListener("tcp", address)
MustNil(t, err)
defer ln.Close()
el1, _ := NewEventLoop(func(ctx context.Context, connection Connection) error {
Expand All @@ -121,7 +123,7 @@ func TestDialerFdAlloc(t *testing.T) {
defer el1.Shutdown(ctx1)

for i := 0; i < 100; i++ {
conn, err := DialConnection("tcp", ":1234", time.Second)
conn, err := DialConnection("tcp", address, time.Second)
MustNil(t, err)
fd := conn.(*TCPConnection).fd
conn.Write([]byte("hello world"))
Expand All @@ -134,7 +136,8 @@ func TestDialerFdAlloc(t *testing.T) {
}

func TestFDClose(t *testing.T) {
ln, err := CreateListener("tcp", ":1234")
address := getTestAddress()
ln, err := CreateListener("tcp", address)
MustNil(t, err)
defer ln.Close()
el1, _ := NewEventLoop(func(ctx context.Context, connection Connection) error {
Expand All @@ -150,13 +153,13 @@ func TestFDClose(t *testing.T) {

var fd int
var conn Connection
conn, err = DialConnection("tcp", ":1234", time.Second)
conn, err = DialConnection("tcp", address, time.Second)
MustNil(t, err)
fd = conn.(*TCPConnection).fd
syscall.SetNonblock(fd, true)
conn.Close()

conn, err = DialConnection("tcp", ":1234", time.Second)
conn, err = DialConnection("tcp", address, time.Second)
MustNil(t, err)
fd = conn.(*TCPConnection).fd
syscall.SetNonblock(fd, true)
Expand All @@ -166,8 +169,10 @@ func TestFDClose(t *testing.T) {

// fd data package race test, use two servers and two dialers.
func TestDialerThenClose(t *testing.T) {
address1 := getTestAddress()
address2 := getTestAddress()
// server 1
ln1, _ := createTestListener("tcp", ":1231")
ln1, _ := createTestListener("tcp", address1)
el1 := mockDialerEventLoop(1)
go func() {
el1.Serve(ln1)
Expand All @@ -177,7 +182,7 @@ func TestDialerThenClose(t *testing.T) {
defer el1.Shutdown(ctx1)

// server 2
ln2, _ := createTestListener("tcp", ":1232")
ln2, _ := createTestListener("tcp", address2)
el2 := mockDialerEventLoop(2)
go func() {
el2.Serve(ln2)
Expand All @@ -194,12 +199,12 @@ func TestDialerThenClose(t *testing.T) {
defer wg.Done()
for i := 0; i < 50; i++ {
// send server 1
conn, err := DialConnection("tcp", ":1231", time.Second)
conn, err := DialConnection("tcp", address1, time.Second)
if err == nil {
mockDialerSend(1, &conn.(*TCPConnection).connection)
}
// send server 2
conn, err = DialConnection("tcp", ":1232", time.Second)
conn, err = DialConnection("tcp", address2, time.Second)
if err == nil {
mockDialerSend(2, &conn.(*TCPConnection).connection)
}
Expand Down
10 changes: 9 additions & 1 deletion net_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ func (ln *listener) Accept() (net.Conn, error) {
// tcp
var fd, sa, err = syscall.Accept(ln.fd)
if err != nil {
if err == syscall.EAGAIN {
/* https://man7.org/linux/man-pages/man2/accept.2.html
EAGAIN or EWOULDBLOCK
The socket is marked nonblocking and no connections are
present to be accepted. POSIX.1-2001 and POSIX.1-2008
allow either error to be returned for this case, and do
not require these constants to have the same value, so a
portable application should check for both possibilities.
*/
if err == syscall.EAGAIN || err == syscall.EWOULDBLOCK {
return nil, nil
}
return nil, err
Expand Down
Loading

0 comments on commit 2f8043a

Please sign in to comment.