-
Notifications
You must be signed in to change notification settings - Fork 37
/
client.go
116 lines (109 loc) · 3.44 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package client
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/httpclient"
"github.com/databricks/databricks-sdk-go/useragent"
)
// New resolves cfg and returns a DatabricksClient backed by a freshly
// constructed httpclient.ApiClient.
//
// It returns the error from cfg.EnsureResolved unchanged when resolution
// fails. Settings left at zero in cfg fall back to defaults via orDefault:
// a 300 second retry timeout, a 60 second HTTP timeout and a rate limit
// of 15 per second.
func New(cfg *config.Config) (*DatabricksClient, error) {
	if err := cfg.EnsureResolved(); err != nil {
		return nil, err
	}

	// pinHost overwrites the scheme and host of every outgoing request with
	// the ones parsed from cfg.Host; the rest of the request URL is left
	// as is.
	pinHost := func(r *http.Request) error {
		if r.URL == nil {
			return fmt.Errorf("no URL found in request")
		}
		target, err := url.Parse(cfg.Host)
		if err != nil {
			return err
		}
		r.URL.Host = target.Host
		r.URL.Scheme = target.Scheme
		return nil
	}

	// tagAuthType stores the configured auth type in the request context
	// under the "auth" user agent key.
	tagAuthType := func(r *http.Request) error {
		tagged := useragent.InContext(r.Context(), "auth", cfg.AuthType)
		// Overwrite the request in place so the caller's pointer observes
		// the new context.
		*r = *r.WithContext(tagged)
		return nil
	}

	// tagCiCdProvider adds the detected CI/CD provider, if any, to the
	// user agent; requests outside of a detected CI/CD environment are
	// left untouched.
	tagCiCdProvider := func(r *http.Request) error {
		provider := useragent.CiCdProvider()
		if provider != "" {
			tagged := useragent.InContext(r.Context(), "cicd", provider)
			// Overwrite the request in place, as above.
			*r = *r.WithContext(tagged)
		}
		return nil
	}

	// retriable defers to APIError.IsRetriable; errors that are not an
	// *apierr.APIError are never retried.
	retriable := func(ctx context.Context, err error) bool {
		var apiErr *apierr.APIError
		if !errors.As(err, &apiErr) {
			return false
		}
		return apiErr.IsRetriable(ctx)
	}

	clientCfg := httpclient.ClientConfig{
		RetryTimeout:       time.Duration(orDefault(cfg.RetryTimeoutSeconds, 300)) * time.Second,
		HTTPTimeout:        time.Duration(orDefault(cfg.HTTPTimeoutSeconds, 60)) * time.Second,
		RateLimitPerSecond: orDefault(cfg.RateLimitPerSecond, 15),
		DebugHeaders:       cfg.DebugHeaders,
		DebugTruncateBytes: cfg.DebugTruncateBytes,
		InsecureSkipVerify: cfg.InsecureSkipVerify,
		Transport:          cfg.HTTPTransport,
		Visitors: []httpclient.RequestVisitor{
			cfg.Authenticate,
			pinHost,
			tagAuthType,
			tagCiCdProvider,
		},
		TransientErrors: []string{
			// Temporary workaround for the SCIM API returning 500.
			// Remove once that is fixed.
			"REQUEST_LIMIT_EXCEEDED",
		},
		ErrorMapper:    apierr.GetAPIError,
		ErrorRetriable: retriable,
	}

	return &DatabricksClient{
		Config: cfg,
		client: httpclient.NewApiClient(clientCfg),
	}, nil
}
// DatabricksClient pairs a configuration with the HTTP API client that
// New builds from it.
type DatabricksClient struct {
	// Config is the configuration this client was created with.
	Config *config.Config
	// client is the underlying API client that Do forwards requests to.
	client *httpclient.ApiClient
}
// ConfiguredAccountID reports the Databricks Account ID stored in the
// client's config. The result is the empty string when no account ID was
// provided.
func (c *DatabricksClient) ConfiguredAccountID() string {
	cfg := c.Config
	return cfg.AccountID
}
// Do sends an HTTP request against path.
//
// The options handed to the underlying client are, in order: one request
// visitor option per entry in visitors, then the request headers, the
// request payload and the response unmarshal target. The error from the
// underlying client is returned as is.
func (c *DatabricksClient) Do(ctx context.Context, method, path string,
	headers map[string]string, request, response any,
	visitors ...func(*http.Request) error) error {
	const (
		// doubledFilesPrefix is the files API prefix with the extra
		// trailing slash.
		doubledFilesPrefix = "/api/2.0/fs/files//"
		// filesPrefix is the same prefix with a single trailing slash.
		filesPrefix = "/api/2.0/fs/files/"
	)

	// Room for every visitor plus the three fixed options appended below.
	opts := make([]httpclient.DoOption, 0, len(visitors)+3)
	for i := range visitors {
		opts = append(opts, httpclient.WithRequestVisitor(visitors[i]))
	}
	opts = append(opts,
		httpclient.WithRequestHeaders(headers),
		httpclient.WithRequestData(request),
		httpclient.WithResponseUnmarshal(response),
	)

	// Collapse the extra `/` at the start of files API paths. This can go
	// away once the OpenAPI spec no longer includes the extra slash.
	if strings.HasPrefix(path, doubledFilesPrefix) {
		path = filesPrefix + strings.TrimPrefix(path, doubledFilesPrefix)
	}

	return c.client.Do(ctx, method, path, opts...)
}
// orDefault returns configured unless it is zero, in which case it returns
// _default. Only an exact zero counts as "not configured"; negative values
// are passed through unchanged.
func orDefault(configured, _default int) int {
	if configured != 0 {
		return configured
	}
	return _default
}