Skip to content

Commit

Permalink
feat: support client and server writetimeout (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
Duslia committed Nov 11, 2022
1 parent 3090994 commit 8e3084b
Show file tree
Hide file tree
Showing 18 changed files with 300 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/bytedance/go-tagexpr/v2 v2.9.2
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7
github.com/bytedance/sonic v1.5.0
github.com/cloudwego/netpoll v0.2.6
github.com/cloudwego/netpoll v0.3.1
github.com/fsnotify/fsnotify v1.5.4
github.com/tidwall/gjson v1.13.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/bytedance/sonic v1.5.0 h1:XWdTi8bwPgxIML+eNV1IwNuTROK6EUrQ65ey8yd6fRQ
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06 h1:1sDoSuDPWzhkdzNVxCxtIaKiAe96ESVPv8coGwc1gZ4=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/cloudwego/netpoll v0.2.6 h1:vzN8cyayoa9RdCOG87tqkYO/j2hA4SMLC+vkcNUq6uI=
github.com/cloudwego/netpoll v0.2.6/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/cloudwego/netpoll v0.3.1 h1:xByoORmCLIyKZ8gS+da06WDo3j+jvmhaqS2KeKejtBk=
github.com/cloudwego/netpoll v0.3.1/go.mod h1:1T2WVuQ+MQw6h6DpE45MohSvDTKdy2DlzCx2KsnPI4E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
7 changes: 7 additions & 0 deletions pkg/app/client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,10 @@ func WithRetryConfig(opts ...retry.Option) config.ClientOption {
o.RetryConfig = retryCfg
}}
}

// WithWriteTimeout sets write timeout.
func WithWriteTimeout(t time.Duration) config.ClientOption {
return config.ClientOption{F: func(o *config.ClientOptions) {
o.WriteTimeout = t
}}
}
2 changes: 2 additions & 0 deletions pkg/app/client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestClientOptions(t *testing.T) {
retry.WithMaxJitter(1*time.Second),
retry.WithDelayPolicy(retry.CombineDelay(retry.FixedDelayPolicy, retry.BackOffDelayPolicy, retry.RandomDelayPolicy)),
),
WithWriteTimeout(time.Second),
})
assert.DeepEqual(t, 100*time.Millisecond, opt.DialTimeout)
assert.DeepEqual(t, 128, opt.MaxConnsPerHost)
Expand All @@ -51,6 +52,7 @@ func TestClientOptions(t *testing.T) {
assert.DeepEqual(t, 5*time.Second, opt.MaxConnWaitTimeout)
assert.DeepEqual(t, false, opt.KeepAlive)
assert.DeepEqual(t, 1*time.Second, opt.ReadTimeout)
assert.DeepEqual(t, 1*time.Second, opt.WriteTimeout)
assert.DeepEqual(t, true, opt.ResponseBodyStream)
assert.DeepEqual(t, uint(2), opt.RetryConfig.MaxAttemptTimes)
assert.DeepEqual(t, 100*time.Millisecond, opt.RetryConfig.Delay)
Expand Down
9 changes: 9 additions & 0 deletions pkg/app/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ func WithReadTimeout(t time.Duration) config.Option {
}}
}

// WithWriteTimeout sets write timeout.
//
// Connection will be closed when write request timeout.
func WithWriteTimeout(t time.Duration) config.Option {
return config.Option{F: func(o *config.Options) {
o.WriteTimeout = t
}}
}

// WithIdleTimeout sets idle timeout.
//
// Close the connection when the successive request timeout (in keepalive mode).
Expand Down
1 change: 1 addition & 0 deletions pkg/common/config/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
type Options struct {
KeepAliveTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
RedirectTrailingSlash bool
MaxRequestBodySize int
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/config/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package config

import (
"testing"
"time"

"github.com/cloudwego/hertz/pkg/app/server/registry"
"github.com/cloudwego/hertz/pkg/common/test/assert"
Expand All @@ -30,6 +31,7 @@ func TestDefaultOptions(t *testing.T) {
assert.DeepEqual(t, defaultKeepAliveTimeout, options.KeepAliveTimeout)
assert.DeepEqual(t, defaultReadTimeout, options.ReadTimeout)
assert.DeepEqual(t, defaultReadTimeout, options.IdleTimeout)
assert.DeepEqual(t, time.Duration(0), options.WriteTimeout)
assert.True(t, options.RedirectTrailingSlash)
assert.True(t, options.RedirectTrailingSlash)
assert.False(t, options.HandleMethodNotAllowed)
Expand Down
55 changes: 55 additions & 0 deletions pkg/common/config/request_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@

package config

import "time"

var preDefinedOpts []RequestOption

type RequestOptions struct {
tags map[string]string
isSD bool

dialTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
}

// RequestOption is the only struct to set request-level options.
Expand Down Expand Up @@ -55,6 +61,40 @@ func WithSD(b bool) RequestOption {
}}
}

// WithDialTimeout sets dial timeout.
//
// This is the request level configuration. It has a higher
// priority than the client level configuration
// Note: it won't take effect in the case of the number of
// connections in the connection pool exceeds the maximum
// number of connections and needs to establish a connection
// while waiting.
func WithDialTimeout(t time.Duration) RequestOption {
return RequestOption{F: func(o *RequestOptions) {
o.dialTimeout = t
}}
}

// WithReadTimeout sets read timeout.
//
// This is the request level configuration. It has a higher
// priority than the client level configuration
func WithReadTimeout(t time.Duration) RequestOption {
return RequestOption{F: func(o *RequestOptions) {
o.readTimeout = t
}}
}

// WithWriteTimeout sets write timeout.
//
// This is the request level configuration. It has a higher
// priority than the client level configuration
func WithWriteTimeout(t time.Duration) RequestOption {
return RequestOption{F: func(o *RequestOptions) {
o.writeTimeout = t
}}
}

func (o *RequestOptions) Apply(opts []RequestOption) {
for _, op := range opts {
op.F(o)
Expand All @@ -73,6 +113,18 @@ func (o *RequestOptions) IsSD() bool {
return o.isSD
}

func (o *RequestOptions) DialTimeout() time.Duration {
return o.dialTimeout
}

func (o *RequestOptions) ReadTimeout() time.Duration {
return o.readTimeout
}

func (o *RequestOptions) WriteTimeout() time.Duration {
return o.writeTimeout
}

func (o *RequestOptions) CopyTo(dst *RequestOptions) {
if dst.tags == nil {
dst.tags = make(map[string]string)
Expand All @@ -83,6 +135,9 @@ func (o *RequestOptions) CopyTo(dst *RequestOptions) {
}

dst.isSD = o.isSD
dst.readTimeout = o.readTimeout
dst.writeTimeout = o.writeTimeout
dst.dialTimeout = o.dialTimeout
}

// SetPreDefinedOpts Pre define some RequestOption here
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/config/request_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package config

import (
"testing"
"time"

"github.com/cloudwego/hertz/pkg/common/test/assert"
)
Expand All @@ -29,10 +30,16 @@ func TestRequestOptions(t *testing.T) {
WithTag("c", "d"),
WithTag("e", "f"),
WithSD(true),
WithDialTimeout(time.Second),
WithReadTimeout(time.Second),
WithWriteTimeout(time.Second),
})
assert.DeepEqual(t, "b", opt.Tag("a"))
assert.DeepEqual(t, "d", opt.Tag("c"))
assert.DeepEqual(t, "f", opt.Tag("e"))
assert.DeepEqual(t, time.Second, opt.DialTimeout())
assert.DeepEqual(t, time.Second, opt.ReadTimeout())
assert.DeepEqual(t, time.Second, opt.WriteTimeout())
assert.True(t, opt.IsSD())
}

Expand All @@ -52,6 +59,9 @@ func TestRequestOptionsWithDefaultOpts(t *testing.T) {
assert.False(t, opt.IsSD())
SetPreDefinedOpts()
assert.Nil(t, preDefinedOpts)
assert.DeepEqual(t, time.Duration(0), opt.WriteTimeout())
assert.DeepEqual(t, time.Duration(0), opt.ReadTimeout())
assert.DeepEqual(t, time.Duration(0), opt.DialTimeout())
}

// TestRequestOptions_CopyTo test request options copy to another one
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var (
ErrHijacked = errors.New("connection has been hijacked")
ErrIdleTimeout = errors.New("idle timeout")
ErrTimeout = errors.New("timeout")
ErrReadTimeout = errors.New("read timeout")
ErrWriteTimeout = errors.New("write timeout")
ErrDialTimeout = errors.New("dial timeout")
ErrNothingRead = errors.New("nothing read")
ErrShortConnection = errors.New("short connection")
)
Expand Down
40 changes: 39 additions & 1 deletion pkg/common/test/mock/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,28 @@ type Conn struct {
zw network.ReadWriter
}

func (m *Conn) SetWriteTimeout(t time.Duration) error {
// TODO implement me
panic("implement me")
}

type SlowReadConn struct {
*Conn
}

func (m *SlowReadConn) SetWriteTimeout(t time.Duration) error {
// TODO implement me
panic("implement me")
}

func SlowReadDialer(addr string) (network.Conn, error) {
return NewSlowReadConn(""), nil
}

func SlowWriteDialer(addr string) (network.Conn, error) {
return NewSlowWriteConn(""), nil
}

func (m *Conn) ReadBinary(n int) (p []byte, err error) {
return m.zr.(netpoll.Reader).ReadBinary(n)
}
Expand Down Expand Up @@ -100,7 +114,7 @@ func (m *SlowReadConn) Peek(i int) ([]byte, error) {
time.Sleep(100 * time.Millisecond)
if err != nil || len(b) != i {
time.Sleep(m.readTimeout)
return nil, errs.ErrTimeout
return nil, errs.ErrReadTimeout
}
return b, err
}
Expand All @@ -119,6 +133,30 @@ func NewSlowReadConn(source string) *SlowReadConn {
return &SlowReadConn{NewConn(source)}
}

type SlowWriteConn struct {
*Conn
writeTimeout time.Duration
}

func (m *SlowWriteConn) SetWriteTimeout(t time.Duration) error {
m.writeTimeout = t
return nil
}

func NewSlowWriteConn(source string) *SlowWriteConn {
return &SlowWriteConn{NewConn(source), 0}
}

func (m *SlowWriteConn) Flush() error {
err := m.zw.Flush()
time.Sleep(100 * time.Millisecond)
if err == nil {
time.Sleep(m.writeTimeout)
return errs.ErrWriteTimeout
}
return err
}

func (m *Conn) Close() error {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/network/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Conn interface {

// SetReadTimeout should work for every Read process
SetReadTimeout(t time.Duration) error
SetWriteTimeout(t time.Duration) error
}

type ConnTLSer interface {
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/netpoll/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ type mockConn struct {
off int
}

func (m *mockConn) SetWriteTimeout(timeout time.Duration) error {
// TODO implement me
panic("implement me")
}

// mockConn's methods is simplified for unit test
// Peek returns the next n bytes without advancing the reader
func (m *mockConn) Peek(n int) (b []byte, err error) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/netpoll/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type transporter struct {
addr string
keepAliveTimeout time.Duration
readTimeout time.Duration
writeTimeout time.Duration
listener net.Listener
eventLoop netpoll.EventLoop
listenConfig *net.ListenConfig
Expand All @@ -49,6 +50,7 @@ func NewTransporter(options *config.Options) network.Transporter {
addr: options.Addr,
keepAliveTimeout: options.KeepAliveTimeout,
readTimeout: options.ReadTimeout,
writeTimeout: options.WriteTimeout,
listener: nil,
eventLoop: nil,
listenConfig: options.ListenConfig,
Expand All @@ -74,6 +76,9 @@ func (t *transporter) ListenAndServe(onReq network.OnData) (err error) {
netpoll.WithIdleTimeout(t.keepAliveTimeout),
netpoll.WithOnPrepare(func(conn netpoll.Connection) context.Context {
conn.SetReadTimeout(t.readTimeout) // nolint:errcheck
if t.writeTimeout > 0 {
conn.SetWriteTimeout(t.writeTimeout)
}
return context.Background()
}),
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/network/standard/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ type Conn struct {
maxSize int // history max malloc size
}

func (c *Conn) SetWriteTimeout(t time.Duration) error {
if t <= 0 {
return c.c.SetWriteDeadline(time.Time{})
}
return c.c.SetWriteDeadline(time.Now().Add(t))
}

func (c *Conn) SetReadTimeout(t time.Duration) error {
if t <= 0 {
return c.c.SetReadDeadline(time.Time{})
Expand Down

0 comments on commit 8e3084b

Please sign in to comment.