/
factory_utils.go
147 lines (129 loc) · 5.54 KB
/
factory_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package provider
import (
dc "github.com/confluentinc/ccloud-sdk-go-v2/data-catalog/v1"
kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"
"github.com/hashicorp/go-retryablehttp"
"net/http"
fgb "github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway/v1"
schemaregistry "github.com/confluentinc/ccloud-sdk-go-v2/schema-registry/v1"
)
// FlinkRestClientFactory creates FlinkRestClient values that share one user agent string and an
// optional retry limit.
type FlinkRestClientFactory struct {
// userAgent is copied into the API configuration of every client the factory creates.
userAgent string
// maxRetries, when non-nil, is forwarded to the retryable HTTP client factory through
// WithMaxRetries; when nil the retryable client is created without that option.
maxRetries *int
}
// CreateFlinkRestClient returns a FlinkRestClient whose generated API client targets restEndpoint,
// identifies itself with the factory's user agent, and sends requests through a retryable HTTP
// client. All remaining arguments are stored on the returned client unchanged.
func (f FlinkRestClientFactory) CreateFlinkRestClient(restEndpoint, organizationId, environmentId, computePoolId, principalId, flinkApiKey, flinkApiSecret string, isMetadataSetInProviderBlock bool) *FlinkRestClient {
	// Start from the default retry factory and swap in a capped one only when a limit is configured.
	retryFactory := NewRetryableClientFactory()
	if f.maxRetries != nil {
		retryFactory = NewRetryableClientFactory(WithMaxRetries(*f.maxRetries))
	}

	cfg := fgb.NewConfiguration()
	cfg.UserAgent = f.userAgent
	cfg.Servers[0].URL = restEndpoint
	cfg.HTTPClient = retryFactory.CreateRetryableClient()

	return &FlinkRestClient{
		apiClient:                    fgb.NewAPIClient(cfg),
		restEndpoint:                 restEndpoint,
		organizationId:               organizationId,
		environmentId:                environmentId,
		computePoolId:                computePoolId,
		principalId:                  principalId,
		flinkApiKey:                  flinkApiKey,
		flinkApiSecret:               flinkApiSecret,
		isMetadataSetInProviderBlock: isMetadataSetInProviderBlock,
	}
}
// SchemaRegistryRestClientFactory creates SchemaRegistryRestClient values (for both the Schema
// Registry API and the Data Catalog API) that share one user agent string and an optional retry limit.
type SchemaRegistryRestClientFactory struct {
// userAgent is copied into the API configuration of every client the factory creates.
userAgent string
// maxRetries, when non-nil, is forwarded to the retryable HTTP client factory through
// WithMaxRetries; when nil the retryable client is created without that option.
maxRetries *int
}
// CreateSchemaRegistryRestClient returns a SchemaRegistryRestClient whose Schema Registry API
// client targets restEndpoint, identifies itself with the factory's user agent, and sends requests
// through a retryable HTTP client. The cluster ID, credentials, endpoint and provider-block flag
// are stored on the returned client unchanged.
func (f SchemaRegistryRestClientFactory) CreateSchemaRegistryRestClient(restEndpoint, clusterId, clusterApiKey, clusterApiSecret string, isMetadataSetInProviderBlock bool) *SchemaRegistryRestClient {
	// Start from the default retry factory and swap in a capped one only when a limit is configured.
	retryFactory := NewRetryableClientFactory()
	if f.maxRetries != nil {
		retryFactory = NewRetryableClientFactory(WithMaxRetries(*f.maxRetries))
	}

	cfg := schemaregistry.NewConfiguration()
	cfg.UserAgent = f.userAgent
	cfg.Servers[0].URL = restEndpoint
	cfg.HTTPClient = retryFactory.CreateRetryableClient()

	return &SchemaRegistryRestClient{
		apiClient:                    schemaregistry.NewAPIClient(cfg),
		restEndpoint:                 restEndpoint,
		clusterId:                    clusterId,
		clusterApiKey:                clusterApiKey,
		clusterApiSecret:             clusterApiSecret,
		isMetadataSetInProviderBlock: isMetadataSetInProviderBlock,
	}
}
// CreateDataCatalogClient returns a SchemaRegistryRestClient whose dataCatalogApiClient field is
// populated with a Data Catalog API client that targets restEndpoint, identifies itself with the
// factory's user agent, and sends requests through a retryable HTTP client. The apiClient field is
// not set by this method. The cluster ID, credentials, endpoint and provider-block flag are stored
// on the returned client unchanged.
func (f SchemaRegistryRestClientFactory) CreateDataCatalogClient(restEndpoint, clusterId, clusterApiKey, clusterApiSecret string, isMetadataSetInProviderBlock bool) *SchemaRegistryRestClient {
	// Start from the default retry factory and swap in a capped one only when a limit is configured.
	retryFactory := NewRetryableClientFactory()
	if f.maxRetries != nil {
		retryFactory = NewRetryableClientFactory(WithMaxRetries(*f.maxRetries))
	}

	cfg := dc.NewConfiguration()
	cfg.UserAgent = f.userAgent
	cfg.Servers[0].URL = restEndpoint
	cfg.HTTPClient = retryFactory.CreateRetryableClient()

	return &SchemaRegistryRestClient{
		dataCatalogApiClient:         dc.NewAPIClient(cfg),
		restEndpoint:                 restEndpoint,
		clusterId:                    clusterId,
		clusterApiKey:                clusterApiKey,
		clusterApiSecret:             clusterApiSecret,
		isMetadataSetInProviderBlock: isMetadataSetInProviderBlock,
	}
}
// KafkaRestClientFactory creates KafkaRestClient values that share one user agent string and an
// optional retry limit.
type KafkaRestClientFactory struct {
// userAgent is copied into the API configuration of every client the factory creates.
userAgent string
// maxRetries, when non-nil, is forwarded to the retryable HTTP client factory through
// WithMaxRetries; when nil the retryable client is created without that option.
maxRetries *int
}
// CreateKafkaRestClient returns a KafkaRestClient whose Kafka REST v3 API client targets
// restEndpoint, identifies itself with the factory's user agent, and sends requests through a
// retryable HTTP client. The cluster ID, credentials, endpoint and both provider-block flags are
// stored on the returned client unchanged.
func (f KafkaRestClientFactory) CreateKafkaRestClient(restEndpoint, clusterId, clusterApiKey, clusterApiSecret string, isMetadataSetInProviderBlock, isClusterIdSetInProviderBlock bool) *KafkaRestClient {
	// Start from the default retry factory and swap in a capped one only when a limit is configured.
	retryFactory := NewRetryableClientFactory()
	if f.maxRetries != nil {
		retryFactory = NewRetryableClientFactory(WithMaxRetries(*f.maxRetries))
	}

	cfg := kafkarestv3.NewConfiguration()
	cfg.UserAgent = f.userAgent
	cfg.Servers[0].URL = restEndpoint
	cfg.HTTPClient = retryFactory.CreateRetryableClient()

	return &KafkaRestClient{
		apiClient:                     kafkarestv3.NewAPIClient(cfg),
		restEndpoint:                  restEndpoint,
		clusterId:                     clusterId,
		clusterApiKey:                 clusterApiKey,
		clusterApiSecret:              clusterApiSecret,
		isMetadataSetInProviderBlock:  isMetadataSetInProviderBlock,
		isClusterIdSetInProviderBlock: isClusterIdSetInProviderBlock,
	}
}
// RetryableClientFactoryOption adjusts a RetryableClientFactory while NewRetryableClientFactory is
// building it.
type RetryableClientFactoryOption = func(c *RetryableClientFactory)

// RetryableClientFactory holds the settings used when creating retryable HTTP clients.
type RetryableClientFactory struct {
	// maxRetries, when non-nil, overrides the retry limit of the clients the factory creates.
	maxRetries *int
}

// WithMaxRetries returns an option that sets the factory's retry limit to maxRetries.
//
// Each application of the option stores a pointer to its own copy of the value, so several
// factories built from the same option value never share (and cannot mutate) one another's limit.
func WithMaxRetries(maxRetries int) RetryableClientFactoryOption {
	return func(c *RetryableClientFactory) {
		// Copy per application: taking the address of the captured parameter directly would hand
		// the same *int to every factory this option is applied to.
		limit := maxRetries
		c.maxRetries = &limit
	}
}

// NewRetryableClientFactory returns a new factory with opts applied in the order given, so a later
// option overrides an earlier one that sets the same field. Nil options are skipped.
func NewRetryableClientFactory(opts ...RetryableClientFactoryOption) *RetryableClientFactory {
	c := &RetryableClientFactory{}
	for _, opt := range opts {
		if opt == nil {
			// Calling a nil func would panic; treat it as "no option".
			continue
		}
		opt(c)
	}
	return c
}
// CreateRetryableClient creates a retryable HTTP client that automatically retries, with
// exponential backoff, on 429 and 5** (except 501) responses. Any other response is returned as-is
// and left to the caller to interpret.
func (f RetryableClientFactory) CreateRetryableClient() *http.Client {
	// Unless a retry limit was configured, the library's default retry settings are used, on the
	// assumption that spending around 15 seconds in total (1 + 2 + 4 + 8) retrying a single HTTP
	// call is acceptable.
	// The exponential backoff equation: https://github.com/hashicorp/go-retryablehttp/blob/master/client.go#L493
	//   retryWaitMax = math.Pow(2, float64(attemptNum)) * float64(retryWaitMin)
	//   defaultRetryWaitMin = 1 * time.Second
	//   defaultRetryWaitMax = 30 * time.Second
	//   defaultRetryMax     = 4
	client := retryablehttp.NewClient()
	if limit := f.maxRetries; limit != nil {
		client.RetryMax = *limit
	}
	return client.StandardClient()
}