Skip to content

Commit

Permalink
feat: add AlloyDB refresher (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
enocom committed Mar 29, 2022
2 parents 9e83635 + 5d77391 commit d0d6a11
Show file tree
Hide file tree
Showing 10 changed files with 867 additions and 699 deletions.
2 changes: 1 addition & 1 deletion dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
}

dialCfg := dialCfg{
ipType: cloudsql.PublicIP,
ipType: "PUBLIC",
tcpKeepAlive: defaultTCPKeepAlive,
}
for _, opt := range cfg.dialOpts {
Expand Down
139 changes: 139 additions & 0 deletions internal/alloydb/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alloydb

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"google.golang.org/api/googleapi"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
)

type InstanceGetResponse struct {
ServerResponse googleapi.ServerResponse
Name string `json:"name"`
State string `json:"state"`
IPAddress string `json:"ipAddress"`
}

type GenerateClientCertificateRequest struct {
PemCSR string `json:"pemCsr"`
}

type GenerateClientCertificateResponse struct {
ServerResponse googleapi.ServerResponse
PemCertificate string `json:"pemCertificate"`
PemCertificateChain []string `json:"pemCertificateChain"`
}

type Client struct {
client *http.Client
// endpoint is the base URL for the AlloyDB admin API (e.g.
// https://alloydb.googleapis.com/v1)
endpoint string
}

func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error) {
var os []option.ClientOption
os = append(opts, option.WithScopes(
"https://www.googleapis.com/auth/cloud-platform",
))
client, endpoint, err := htransport.NewClient(ctx, os...)
if err != nil {
return nil, err
}
return &Client{client: client, endpoint: endpoint}, nil
}

func (c *Client) InstanceGet(ctx context.Context, project, region, cluster, instance string) (InstanceGetResponse, error) {
u := fmt.Sprintf(
"%s/v1alpha1/projects/%s/locations/%s/clusters/%s/instances/%s",
c.endpoint, project, region, cluster, instance,
)
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil {
return InstanceGetResponse{}, err
}
res, err := c.client.Do(req)
if err != nil {
return InstanceGetResponse{}, err
}
if res != nil && res.StatusCode == http.StatusNotModified {
var body []byte
if res.Body != nil {
defer res.Body.Close()
body, err = ioutil.ReadAll(res.Body)
if err != nil {
return InstanceGetResponse{}, err
}
}

return InstanceGetResponse{}, &googleapi.Error{
Code: res.StatusCode,
Header: res.Header,
Body: string(body),
}
}
if err != nil {
return InstanceGetResponse{}, err
}
defer res.Body.Close()
ret := InstanceGetResponse{
ServerResponse: googleapi.ServerResponse{
Header: res.Header,
HTTPStatusCode: res.StatusCode,
},
}
if err := json.NewDecoder(res.Body).Decode(&ret); err != nil {
return InstanceGetResponse{}, err
}
return ret, nil
}

func (c *Client) GenerateClientCert(ctx context.Context, project, region, cluster string, csr []byte) (GenerateClientCertificateResponse, error) {
u := fmt.Sprintf(
"%s/v1alpha1/projects/%s/locations/%s/clusters/%s:generateClientCertificate",
c.endpoint, project, region, cluster,
)
body, err := json.Marshal(GenerateClientCertificateRequest{PemCSR: string(csr)})
if err != nil {
return GenerateClientCertificateResponse{}, err
}
req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewReader(body))
if err != nil {
return GenerateClientCertificateResponse{}, err
}
res, err := c.client.Do(req.WithContext(ctx))
if err != nil {
return GenerateClientCertificateResponse{}, err
}
defer res.Body.Close()
ret := GenerateClientCertificateResponse{
ServerResponse: googleapi.ServerResponse{
Header: res.Header,
HTTPStatusCode: res.StatusCode,
},
}
if err := json.NewDecoder(res.Body).Decode(&ret); err != nil {
return GenerateClientCertificateResponse{}, err
}
return ret, nil
}
71 changes: 42 additions & 29 deletions internal/cloudsql/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,22 @@ const (
)

var (
// Instance connection name is the format <PROJECT>:<REGION>:<INSTANCE>
// Instance connection name is the format <PROJECT>:<REGION>:<CLUSTER>:<INSTANCE>
// Additionally, we have to support legacy "domain-scoped" projects (e.g. "google.com:PROJECT")
connNameRegex = regexp.MustCompile("([^:]+(:[^:]+)?):([^:]+):([^:]+)")
connNameRegex = regexp.MustCompile("([^:]+(:[^:]+)?):([^:]+):([^:]+):([^:]+)")
)

// connName represents the "instance connection name", in the format "project:region:name". Use the
// "parseConnName" method to initialize this struct.
type connName struct {
project string
region string
cluster string
name string
}

func (c *connName) String() string {
return fmt.Sprintf("%s:%s:%s", c.project, c.region, c.name)
return fmt.Sprintf("%s:%s:%s:%s", c.project, c.region, c.cluster, c.name)
}

// parseConnName initializes a new connName struct.
Expand All @@ -57,7 +58,7 @@ func parseConnName(cn string) (connName, error) {
m := connNameRegex.FindSubmatch(b)
if m == nil {
err := errtype.NewConfigError(
"invalid instance connection name, expected PROJECT:REGION:INSTANCE",
"invalid instance connection name, expected PROJECT:REGION:CLUSTER:INSTANCE",
cn,
)
return connName{}, err
Expand All @@ -66,14 +67,20 @@ func parseConnName(cn string) (connName, error) {
c := connName{
project: string(m[1]),
region: string(m[3]),
name: string(m[4]),
cluster: string(m[4]),
name: string(m[5]),
}
return c, nil
}

// refreshResult is a pending result of a refresh operation of data used to connect securely. It should
type metadata struct {
ipAddrs map[string]string
version string
}

// refreshOperation is a pending result of a refresh operation of data used to connect securely. It should
// only be initialized by the Instance struct as part of a refresh cycle.
type refreshResult struct {
type refreshOperation struct {
md metadata
tlsCfg *tls.Config
expiry time.Time
Expand All @@ -87,12 +94,12 @@ type refreshResult struct {

// Cancel prevents the instanceInfo from starting, if it hasn't already started. Returns true if timer
// was stopped successfully, or false if it has already started.
func (r *refreshResult) Cancel() bool {
func (r *refreshOperation) Cancel() bool {
return r.timer.Stop()
}

// Wait blocks until the refreshResult attempt is completed.
func (r *refreshResult) Wait(ctx context.Context) error {
// Wait blocks until the refreshOperation attempt is completed.
func (r *refreshOperation) Wait(ctx context.Context) error {
select {
case <-r.ready:
return r.err
Expand All @@ -102,7 +109,7 @@ func (r *refreshResult) Wait(ctx context.Context) error {
}

// IsValid returns true if this result is complete, successful, and is still valid.
func (r *refreshResult) IsValid() bool {
func (r *refreshOperation) IsValid() bool {
// verify the result has finished running
select {
default:
Expand All @@ -124,12 +131,12 @@ type Instance struct {
r refresher

resultGuard sync.RWMutex
// cur represents the current refreshResult that will be used to create connections. If a valid complete
// refreshResult isn't available it's possible for cur to be equal to next.
cur *refreshResult
// next represents a future or ongoing refreshResult. Once complete, it will replace cur and schedule a
// cur represents the current refreshOperation that will be used to create connections. If a valid complete
// refreshOperation isn't available it's possible for cur to be equal to next.
cur *refreshOperation
// next represents a future or ongoing refreshOperation. Once complete, it will replace cur and schedule a
// replacement to occur.
next *refreshResult
next *refreshOperation

// OpenConns is the number of open connections to the instance.
OpenConns uint64
Expand Down Expand Up @@ -157,14 +164,15 @@ func NewInstance(
i := &Instance{
connName: cn,
key: key,
r: newRefresher(
refreshTimeout,
30*time.Second,
2,
client,
ts,
dialerID,
),
// TODO: we'll update this when we do instance
// r: newRefresher(
// refreshTimeout,
// 30*time.Second,
// 2,
// client,
// ts,
// dialerID,
// ),
ctx: ctx,
cancel: cancel,
}
Expand Down Expand Up @@ -226,7 +234,7 @@ func (i *Instance) ForceRefresh() {
}

// result returns the most recent refresh result (waiting for it to complete if necessary)
func (i *Instance) result(ctx context.Context) (*refreshResult, error) {
func (i *Instance) result(ctx context.Context) (*refreshOperation, error) {
i.resultGuard.RLock()
res := i.cur
i.resultGuard.RUnlock()
Expand All @@ -237,13 +245,18 @@ func (i *Instance) result(ctx context.Context) (*refreshResult, error) {
return res, nil
}

// scheduleRefresh schedules a refresh operation to be triggered after a given duration. The returned refreshResult
// scheduleRefresh schedules a refresh operation to be triggered after a given
// duration. The returned refreshOperation
// can be used to either Cancel or Wait for the operations result.
func (i *Instance) scheduleRefresh(d time.Duration) *refreshResult {
res := &refreshResult{}
func (i *Instance) scheduleRefresh(d time.Duration) *refreshOperation {
res := &refreshOperation{}
res.ready = make(chan struct{})
res.timer = time.AfterFunc(d, func() {
res.md, res.tlsCfg, res.expiry, res.err = i.r.performRefresh(i.ctx, i.connName, i.key)
// TODO: fix this
// res.md, res.tlsCfg, res.expiry, res.err = i.r.performRefresh(i.ctx, i.connName, i.key)
r, err := i.r.performRefresh(i.ctx, i.connName, i.key)
_ = r
_ = err
close(res.ready)

// Once the refresh is complete, update "current" with working result and schedule a new refresh
Expand Down
Loading

0 comments on commit d0d6a11

Please sign in to comment.