Skip to content

Commit

Permalink
Use the specified connection timeout when creating connections (apach…
Browse files Browse the repository at this point in the history
…e#137)

* Use the specified connection timeout when creating connections

* Fixed compile and other errors
  • Loading branch information
merlimat authored and wolfstudy committed Dec 20, 2019
1 parent 13d4581 commit cfe87b3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
1 change: 1 addition & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type ClientOptions struct {
// This parameter is required
URL string

// Timeout for the establishment of a TCP connection (default: 30 seconds)
ConnectionTimeout time.Duration

// Set the operation timeout (default: 30 seconds)
Expand Down
12 changes: 11 additions & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/url"
"time"

"github.com/golang/protobuf/proto"

Expand All @@ -31,6 +32,10 @@ import (
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
)

const (
defaultConnectionTimeout = 30*time.Second
)

type client struct {
options ClientOptions

Expand Down Expand Up @@ -81,8 +86,13 @@ func newClient(options ClientOptions) (Client, error) {
}
}

connectionTimeout := options.ConnectionTimeout
if connectionTimeout.Nanoseconds() == 0 {
connectionTimeout = defaultConnectionTimeout
}

c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider),
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
c.lookupService = internal.NewLookupService(c.rpcClient, url)
Expand Down
14 changes: 9 additions & 5 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ type incomingCmd struct {

type connection struct {
sync.Mutex
cond *sync.Cond
state connectionState
cond *sync.Cond
state connectionState
connectionTimeout time.Duration

logicalAddr *url.URL
physicalAddr *url.URL
Expand Down Expand Up @@ -150,9 +151,11 @@ type connection struct {
auth auth.Provider
}

func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions, auth auth.Provider) *connection {
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
connectionTimeout time.Duration, auth auth.Provider) *connection {
cnx := &connection{
state: connectionInit,
connectionTimeout: connectionTimeout,
logicalAddr: logicalAddr,
physicalAddr: physicalAddr,
writeBuffer: NewBuffer(4096),
Expand Down Expand Up @@ -202,7 +205,7 @@ func (c *connection) connect() bool {

if c.tlsOptions == nil {
// Clear text connection
cnx, err = net.Dial("tcp", c.physicalAddr.Host)
cnx, err = net.DialTimeout("tcp", c.physicalAddr.Host, c.connectionTimeout)
} else {
// TLS connection
tlsConfig, err = c.getTLSConfig()
Expand All @@ -211,7 +214,8 @@ func (c *connection) connect() bool {
return false
}

cnx, err = tls.Dial("tcp", c.physicalAddr.Host, tlsConfig)
d := &net.Dialer{Timeout: c.connectionTimeout}
cnx, err = tls.DialWithDialer(d, "tcp", c.physicalAddr.Host, tlsConfig)
}

if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package internal
import (
"net/url"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/auth"

Expand All @@ -36,16 +37,18 @@ type ConnectionPool interface {
}

type connectionPool struct {
pool sync.Map
tlsOptions *TLSOptions
auth auth.Provider
pool sync.Map
connectionTimeout time.Duration
tlsOptions *TLSOptions
auth auth.Provider
}

// NewConnectionPool init connection pool.
func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider) ConnectionPool {
func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration) ConnectionPool {
return &connectionPool{
tlsOptions: tlsOptions,
auth: auth,
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
}
}

Expand All @@ -66,7 +69,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U

// Try to create a new connection
newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.auth))
newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth))
cnx := newCnx.(*connection)
if !wasCached {
cnx.start()
Expand Down

0 comments on commit cfe87b3

Please sign in to comment.