Skip to content

Commit

Permalink
tweak connect.Client
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevilliers committed Dec 5, 2019
1 parent e233b21 commit d4b06f3
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 41 deletions.
2 changes: 1 addition & 1 deletion internal/ctl/connectors/add.go
Expand Up @@ -61,7 +61,7 @@ func doAddConnectors(cmd *cobra.Command, params *addConnectorsCmdParams) error {

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ctl/connectors/list.go
Expand Up @@ -51,7 +51,7 @@ func doListConnectors(_ *cobra.Command, params *listConnectorsCmdParams) error {

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
16 changes: 4 additions & 12 deletions internal/ctl/connectors/manage.go
Expand Up @@ -80,8 +80,7 @@ func doManageConnectors(cmd *cobra.Command, params *manageConnectorsCmdParams) e

clusterLogger.Debug("executing manage connectors command")

err := checkConfig(params)
if err != nil {
if err := checkConfig(params); err != nil {
return errors.Wrap(err, "Error with configuration")
}

Expand All @@ -103,7 +102,7 @@ func doManageConnectors(cmd *cobra.Command, params *manageConnectorsCmdParams) e

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand All @@ -113,20 +112,16 @@ func doManageConnectors(cmd *cobra.Command, params *manageConnectorsCmdParams) e
return errors.Wrap(err, "Error creating connectors manager")
}

ctx := context.Background()

if params.EnableHealthCheck {
healthCheckHandler := healthcheck.New(mngr)

go func() {
err := healthCheckHandler.Start(params.HealthCheckAddress)
if err != nil {
if err := healthCheckHandler.Start(params.HealthCheckAddress); err != nil {
clusterLogger.WithError(err).Fatalln("Error starting healthcheck")
}
}()

// nolint
defer healthCheckHandler.Shutdown(ctx)
defer healthCheckHandler.Shutdown(context.Background())
}

if params.RunOnce {
Expand All @@ -135,13 +130,10 @@ func doManageConnectors(cmd *cobra.Command, params *manageConnectorsCmdParams) e
}
} else {
stopCh := signals.SetupSignalHandler()

if err := mngr.Manage(source, stopCh); err != nil {
return errors.Wrap(err, "Error running connector manager")
}
}

clusterLogger.Info("finished executing manage connectors command")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/ctl/connectors/pause.go
Expand Up @@ -48,7 +48,7 @@ func doPauseConnectors(_ *cobra.Command, params *pauseConnectorsCmdParams) error

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ctl/connectors/remove.go
Expand Up @@ -48,7 +48,7 @@ func doRemoveConnectors(_ *cobra.Command, params *removeConnectorsCmdParams) err

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ctl/connectors/restart.go
Expand Up @@ -62,7 +62,7 @@ func doRestartConnectors(_ *cobra.Command, params *restartConnectorsCmdParams) e

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ctl/connectors/resume.go
Expand Up @@ -48,7 +48,7 @@ func doResumeConnectors(_ *cobra.Command, params *resumeConnectorsCmdParams) err

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/ctl/plugins/list.go
Expand Up @@ -51,7 +51,7 @@ func doListPlugins(_ *cobra.Command, params *listPluginsCmdParams) error {

userAgent := fmt.Sprintf("90poe.io/connectctl/%s", version.Version)

client, err := connect.NewClient(params.ClusterURL, userAgent)
client, err := connect.NewClient(params.ClusterURL, connect.WithUserAgent(userAgent))
if err != nil {
return errors.Wrap(err, "error creating connect client")
}
Expand Down
46 changes: 30 additions & 16 deletions pkg/client/connect/client.go
Expand Up @@ -30,31 +30,47 @@ type Client struct {

// HTTP client used to communicate with the API. By default
// http.DefaultClient will be used.
HTTPClient *http.Client
httpClient *http.Client

// User agent used when communicating with the Kafka Connect API.
UserAgent string
userAgent string
}

// Option can be supplied that override the default Clients properties
type Option func(c *Client)

// WithUserAgent allows the userAgent to be overridden
func WithUserAgent(userAgent string) Option {
return func(c *Client) {
c.userAgent = userAgent
}
}

// WithHTTPClient allows a specific http.Client to be set
func WithHTTPClient(httpClient *http.Client) Option {
return func(c *Client) {
c.httpClient = httpClient
}
}

// NewClient returns a new Kafka Connect API client that communicates host.
func NewClient(host string, userAgent string) (*Client, error) {
func NewClient(host string, opts ...Option) (*Client, error) {
hostURL, err := url.Parse(host)
if err != nil {
return nil, errors.Wrapf(err, "parsing url %s", host)
}

if userAgent == "" {
userAgent = userAgentDefault
c := &Client{
host: hostURL,
userAgent: userAgentDefault,
httpClient: http.DefaultClient,
}

return &Client{host: hostURL, UserAgent: userAgent}, nil
}

func (c *Client) httpClient() *http.Client {
if c.HTTPClient == nil {
return http.DefaultClient
for _, opt := range opts {
opt(c)
}
return c.HTTPClient

return c, nil
}

// Host returns the API root URL the Client is configured to talk to.
Expand Down Expand Up @@ -95,9 +111,7 @@ func (c *Client) NewRequest(method, path string, body interface{}) (*http.Reques
if contentType != "" {
request.Header.Set("Content-Type", contentType)
}
if c.UserAgent != "" {
request.Header.Set("User-Agent", c.UserAgent)
}
request.Header.Set("User-Agent", c.userAgent)

return request, nil
}
Expand All @@ -106,7 +120,7 @@ func (c *Client) NewRequest(method, path string, body interface{}) (*http.Reques
// JSON-decoded and stored in the value pointed to by v, or returned as an
// error if an API or HTTP error has occurred.
func (c *Client) Do(req *http.Request, v interface{}) (*http.Response, error) {
response, err := c.httpClient().Do(req)
response, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/manager/manager.go
Expand Up @@ -36,12 +36,6 @@ type ConnectorManager struct {

// NewConnectorsManager creates a new ConnectorManager
func NewConnectorsManager(client client, config *Config) (*ConnectorManager, error) {
//userAgent := fmt.Sprintf("90poe.io/connectctl/%s", config.Version)

//client, err := connect.NewClient(config.ClusterURL, userAgent)
//if err != nil {
// return nil, errors.Wrap(err, "error creating connect client")
//}
return &ConnectorManager{
config: config,
client: client,
Expand Down

0 comments on commit d4b06f3

Please sign in to comment.