Skip to content

Commit

Permalink
Merge branch 'develop' into fix/fd-leak
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Feb 15, 2023
2 parents 059c308 + 6a5a4f7 commit 0a0498b
Show file tree
Hide file tree
Showing 64 changed files with 1,182 additions and 324 deletions.
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# For more information, please refer to https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners

* @cloudwego/netpoll-reviewers @cloudwego/netpoll-approvers @cloudwego/netpoll-maintainers
38 changes: 22 additions & 16 deletions .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,41 @@ name: Push and Pull Request Check
on: [ push, pull_request ]

jobs:
build:
runs-on: ubuntu-latest
compatibility-test:
strategy:
matrix:
go: [ 1.15, 1.19 ]
os: [ X64, ARM64 ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2

- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v2
uses: actions/setup-go@v3
with:
go-version: 1.16

go-version: ${{ matrix.go }}
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...
- name: Benchmark
run: go test -bench=. -benchmem -run=none ./...
style-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.16
- name: Check License Header
uses: apache/skywalking-eyes@main
uses: apache/skywalking-eyes/header@v0.4.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Lint
run: |
test -z "$(gofmt -s -l .)"
go vet -stdmethods=false $(go list ./...)
- name: Unit Test
run: go test -v -race -covermode=atomic -coverprofile=coverage.out ./...

- name: Benchmark
run: go test -bench=. -benchmem -run=none ./...
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
CloudWeGO
Copyright 2021 CloudWeGO authors.
Copyright 2022 CloudWeGO authors.

Go
Copyright (c) 2009 The Go Authors.
6 changes: 5 additions & 1 deletion connection.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,6 +45,10 @@ type Connection interface {
// A zero value for timeout means Reader will not timeout.
SetReadTimeout(timeout time.Duration) error

// SetWriteTimeout sets the timeout for future Write calls wait.
// A zero value for timeout means Writer will not timeout.
SetWriteTimeout(timeout time.Duration) error

// SetIdleTimeout sets the idle timeout of connections.
// Idle connections that exceed the set timeout are no longer guaranteed to be active,
// but can be checked by calling IsActive.
Expand Down
9 changes: 6 additions & 3 deletions connection_errors.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,16 +23,18 @@ import (
const (
// The connection closed when in use.
ErrConnClosed = syscall.Errno(0x101)
// read I/O buffer timeout, calling by Connection.Reader
// Read I/O buffer timeout, calling by Connection.Reader
ErrReadTimeout = syscall.Errno(0x102)
// dial timeout
// Dial timeout
ErrDialTimeout = syscall.Errno(0x103)
// Calling dialer without timeout.
ErrDialNoDeadline = syscall.Errno(0x104) // TODO: no-deadline support in future
// The calling function not support.
ErrUnsupported = syscall.Errno(0x105)
// Same as io.EOF
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -94,4 +96,5 @@ var errnos = [...]string{
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
}
2 changes: 1 addition & 1 deletion connection_errors_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
114 changes: 101 additions & 13 deletions connection_impl.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,9 @@ type connection struct {
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan struct{}
waitReadSize int32
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
Expand Down Expand Up @@ -85,6 +87,14 @@ func (c *connection) SetReadTimeout(timeout time.Duration) error {
return nil
}

// SetWriteTimeout implements Connection.
func (c *connection) SetWriteTimeout(timeout time.Duration) error {
if timeout >= 0 {
c.writeTimeout = timeout
}
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -341,26 +351,31 @@ func (c *connection) initNetFD(conn Conn) {
}

func (c *connection) initFDOperator() {
op := allocop()
var op *FDOperator
if c.pd != nil && c.pd.operator != nil {
// reuse operator created at connect step
op = c.pd.operator
} else {
poll := pollmanager.Pick()
op = poll.Alloc()
}
op.FD = c.fd
op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
op.Inputs, op.InputAck = c.inputs, c.inputAck
op.Outputs, op.OutputAck = c.outputs, c.outputAck

// if connection has been registered, must reuse poll here.
if c.pd != nil && c.pd.operator != nil {
op.poll = c.pd.operator.poll
}
c.operator = op
}

func (c *connection) initFinalizer() {
c.AddCloseCallback(func(connection Connection) error {
c.AddCloseCallback(func(connection Connection) (err error) {
c.stop(flushing)
// stop the finalizing state to prevent conn.fill function to be performed
c.stop(finalizing)
freeop(c.operator)
c.netFD.Close()
c.operator.Free()
if err = c.netFD.Close(); err != nil {
logger.Printf("NETPOLL: netFD close failed: %v", err)
}
c.closeBuffer()
return nil
})
Expand All @@ -385,8 +400,8 @@ func (c *connection) waitRead(n int) (err error) {
if n <= c.inputBuffer.Len() {
return nil
}
atomic.StoreInt32(&c.waitReadSize, int32(n))
defer atomic.StoreInt32(&c.waitReadSize, 0)
atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
}
Expand Down Expand Up @@ -453,15 +468,88 @@ func (c *connection) fill(need int) (err error) {
defer c.unlock(finalizing)

var n int
var bs [][]byte
for {
bs = c.inputs(c.inputBarrier.bs)
TryRead:
n, err = ioread(c.fd, c.inputs(c.inputBarrier.bs), c.inputBarrier.ivs)
c.inputAck(n)
if err != nil {
break
}
if n < 0 {
// we must reuse bs that has been booked, otherwise will mess the input buffer
goto TryRead
}
c.inputAck(n)
}
if c.inputBuffer.Len() >= need {
return nil
}
return err
}

// flush write data directly.
func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
// TODO: Let the upper layer pass in whether to use ZeroCopy.
var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
if err != nil && err != syscall.EAGAIN {
return Exception(err, "when flush")
}
if n > 0 {
err = c.outputBuffer.Skip(n)
c.outputBuffer.Release()
if err != nil {
return Exception(err, "when flush")
}
}
// return if write all buffer.
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if err != nil {
return Exception(err, "when flush")
}

return c.waitFlush()
}

func (c *connection) waitFlush() (err error) {
if c.writeTimeout == 0 {
select {
case err = <-c.writeTrigger:
}
return err
}

// set write timeout
if c.writeTimer == nil {
c.writeTimer = time.NewTimer(c.writeTimeout)
} else {
c.writeTimer.Reset(c.writeTimeout)
}

select {
case err = <-c.writeTrigger:
if !c.writeTimer.Stop() { // clean timer
<-c.writeTimer.C
}
return err
case <-c.writeTimer.C:
select {
// try fetch writeTrigger if both cases fires
case err = <-c.writeTrigger:
return err
default:
}
// if timeout, remove write event from poller
// we cannot flush it again, since we don't if the poller is still process outputBuffer
c.operator.Control(PollRW2R)
return Exception(ErrWriteTimeout, c.remoteAddr.String())
}
}

2 changes: 1 addition & 1 deletion connection_lock.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
22 changes: 13 additions & 9 deletions connection_onevent.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021 CloudWeGo Authors
// Copyright 2022 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@ package netpoll

import (
"context"
"log"
"sync/atomic"

"github.com/bytedance/gopkg/util/gopool"
Expand Down Expand Up @@ -98,6 +97,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetOnConnect(opts.onConnect)
c.SetOnRequest(opts.onRequest)
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)

// calling prepare first and then register.
Expand Down Expand Up @@ -212,8 +212,10 @@ func (c *connection) closeCallback(needLock bool) (err error) {
return nil
}
// If Close is called during OnPrepare, poll is not registered.
if c.closeBy(user) && c.operator.poll != nil {
c.operator.Control(PollDetach)
if c.isCloseBy(user) && c.operator.poll != nil {
if err = c.operator.Control(PollDetach); err != nil {
logger.Printf("NETPOLL: closeCallback detach operator failed: %v", err)
}
}
var latest = c.closeCallbacks.Load()
if latest == nil {
Expand All @@ -227,14 +229,16 @@ func (c *connection) closeCallback(needLock bool) (err error) {

// register only use for connection register into poll.
func (c *connection) register() (err error) {
if c.operator.poll != nil {
err = c.operator.Control(PollModReadable)
} else {
c.operator.poll = pollmanager.Pick()
if c.operator.isUnused() {
// operator is not registered
err = c.operator.Control(PollReadable)
} else {
// operator is already registered
// change event to wait read new data
err = c.operator.Control(PollModReadable)
}
if err != nil {
log.Println("connection register failed:", err.Error())
logger.Printf("NETPOLL: connection register failed: %v", err)
c.Close()
return Exception(ErrConnClosed, err.Error())
}
Expand Down
Loading

0 comments on commit 0a0498b

Please sign in to comment.