DDO-1161 Support for regional PDs (#8)
choover-broad committed Apr 6, 2021
1 parent 7d64099 commit f2bb34a
Showing 8 changed files with 570 additions and 212 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -7,7 +7,7 @@ ENV CGO_ENABLED=0
ENV GO111MODULE=on
ENV GOBIN=/bin
COPY . .
-RUN go build -o /bin/disk-manager .
+RUN go test ./... && go build -o /bin/disk-manager .

FROM alpine:${ALPINE_VERSION} as runtime
COPY --from=build /bin/disk-manager /bin/disk-manager
10 changes: 10 additions & 0 deletions README.md
@@ -49,6 +49,16 @@ When running the docker image locally the `-local` runtime flag must be used. Th
using your local `.kube/config`; otherwise in-cluster authentication will be used. Your local `.kubeconfig` and a GCP credential must be mounted to the
container when running locally.

+Unit tests can be run with `go test`:
+
+```
+# Run tests w/ coverage stats
+go test -coverprofile=coverage.out
+# View line-by-line coverage report in browser
+go tool cover -html=coverage.out
+```

### Runtime flags

13 changes: 4 additions & 9 deletions client/client.go
@@ -4,7 +4,6 @@ import (
"fmt"

"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
@@ -32,7 +31,7 @@ func (c *Clients) GetK8s() *kubernetes.Clientset {

// Build creates the GCP and k8s clients used by this tool
// and returns both packaged in a single struct
-func Build(local bool, kubeconfig *string) (*Clients, error) {
+func Build(local bool, kubeconfig string) (*Clients, error) {
conf, err := buildKubeConfig(local, kubeconfig)
if err != nil {
return nil, fmt.Errorf("Error building kube client: %v", err)
@@ -52,9 +51,9 @@ func Build(local bool, kubeconfig *string) (*Clients, error) {
}, nil
}

-func buildKubeConfig(local bool, kubeconfig *string) (*restclient.Config, error) {
+func buildKubeConfig(local bool, kubeconfig string) (*restclient.Config, error) {
if local {
-config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
+config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, fmt.Errorf("Error building local k8s config: %v", err)
}
@@ -73,12 +72,8 @@ func buildKubeClient(config *restclient.Config) (*kubernetes.Clientset, error) {

func buildGCPClient() (*compute.Service, error) {
ctx := context.Background()
-gcpClient, err := google.DefaultClient(ctx, compute.CloudPlatformScope)
-if err != nil {
-return nil, fmt.Errorf("error authenticating to GCP: %v", err)
-}
-
-c, err := compute.New(gcpClient)
+c, err := compute.NewService(ctx)
if err != nil {
return nil, fmt.Errorf("error creating compute api client: %v", err)
}
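Note on the change above: the two-step `google.DefaultClient` + `compute.New` construction is the older, now-deprecated style; `compute.NewService` resolves Application Default Credentials itself. A minimal sketch of both construction styles (the key-file path is hypothetical):

```go
package main

import (
	"context"
	"log"

	"google.golang.org/api/compute/v1"
	"google.golang.org/api/option"
)

func main() {
	ctx := context.Background()

	// With no options, NewService resolves Application Default
	// Credentials: GOOGLE_APPLICATION_CREDENTIALS, gcloud user
	// credentials, or the GCE/GKE metadata server.
	svc, err := compute.NewService(ctx)
	if err != nil {
		log.Fatalf("error creating compute api client: %v", err)
	}

	// A service-account key file can also be passed explicitly;
	// this path is hypothetical.
	svc, err = compute.NewService(ctx,
		option.WithCredentialsFile("/path/to/service-account.json"),
		option.WithScopes(compute.CloudPlatformScope))
	if err != nil {
		log.Fatalf("error creating compute api client: %v", err)
	}

	_ = svc
}
```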
192 changes: 192 additions & 0 deletions disk/disk.go
@@ -0,0 +1,192 @@
package disk

import (
"fmt"
"github.com/broadinstitute/disk-manager/client"
"github.com/broadinstitute/disk-manager/config"
"github.com/broadinstitute/disk-manager/logs"
"google.golang.org/api/compute/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type DiskManager struct {
config *config.Config // DiskManager config
gcp *compute.Service // GCP Compute API client
k8s kubernetes.Interface // K8s API client
}

type diskInfo struct {
name string
policy string
}

/* Construct a new DiskManager */
func NewDiskManager(cfg *config.Config, clients *client.Clients) (*DiskManager, error) {
k8s := clients.GetK8s()
gcp := clients.GetGCP()

return &DiskManager{cfg, gcp, k8s}, nil
}

/*
* Main method for disk manager.
* Add snapshot policies to all persistent disks with the configured annotation.
*/
func (m *DiskManager) Run() error {
disks, err := m.searchForDisks()
if err != nil {
return fmt.Errorf("Error retrieving persistent disks: %v\n", err)
}

return m.addPoliciesToDisks(disks)
}

/* Search K8s for PersistentVolumeClaims with the snapshot policy annotation */
func (m *DiskManager) searchForDisks() ([]diskInfo, error) {
disks := make([]diskInfo, 0)

logs.Info.Println("Searching GKE for persistent disks...")

// get persistent volume claims
pvcs, err := m.k8s.CoreV1().PersistentVolumeClaims("").List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("Error retrieving persistent volume claims: %v\n", err)
}
for _, pvc := range pvcs.Items {
if policy, ok := pvc.Annotations[m.config.TargetAnnotation]; ok {
// retrieve associated persistent volume for each claim
pv, err := m.k8s.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("Error retrieving persistent volume: %s, %v\n", pvc.Spec.VolumeName, err)
}
diskName := pv.Spec.GCEPersistentDisk.PDName
logs.Info.Printf("found PersistentVolume: %q with disk: %q", pvc.GetName(), diskName)
disk := diskInfo{
name: diskName,
policy: policy,
}
disks = append(disks, disk)
}
}

return disks, nil
}

/* Add snapshot policies to disks */
func (m *DiskManager) addPoliciesToDisks(disks []diskInfo) error {
errs := 0
for _, disk := range disks {
if err := m.addPolicy(disk); err != nil {
logs.Error.Printf("Error adding policy %s to disk %s: %v\n", disk.policy, disk.name, err)
errs++
}
}

if errs > 0 {
return fmt.Errorf("Encountered %d error(s) adding snapshot policies to disks\n", errs)
}

logs.Info.Println("Finished updating snapshot policies")

return nil
}

/* Add the configured resource policy to the target disk */
func (m *DiskManager) addPolicy(info diskInfo) error {
// TODO only perform this api call if policyName is different
policy, err := m.getPolicy(info.policy)
if err != nil {
return fmt.Errorf("Error retrieving snapshot policy %s for disk %s: %v\n", info.policy, info.name, err)
}

disk, err := m.findDisk(info.name)

if err != nil {
return err
}

// Check to see if any policies are already attached
if len(disk.ResourcePolicies) > 1 {
return fmt.Errorf("Disk %s has more than one resource policy, did the GCP API change? %v\n", info.name, disk.ResourcePolicies)
}
if len(disk.ResourcePolicies) == 1 {
if disk.ResourcePolicies[0] == policy.SelfLink {
logs.Info.Printf("Policy %s is already attached to disk %s, nothing to do\n", info.policy, info.name)
return nil
} else {
return fmt.Errorf("Unexpected policy %s is already attached to disk %s, please detach it manually and re-run\n", disk.ResourcePolicies[0], info.name)
}
}

// Attach the policy, using the zonal or regional API as appropriate
if disk.Zone != "" { // zonal disk
err = m.addPolicyToZonalDisk(info.name, policy)
} else {
err = m.addPolicyToRegionalDisk(info.name, policy)
}
if err != nil {
return fmt.Errorf("Error adding snapshot policy %s to disk %s: %v\n", info.policy, info.name, err)
}

logs.Info.Printf("Added policy %s to disk %s\n", info.policy, info.name)
return nil
}

/* Retrieve a regional or zonal disk object via the GCP API.
Returns the disk and an error. Callers can determine whether the disk is regional or zonal by
checking the Zone attribute (empty for regional disk) or Region attribute (empty for zonal disk).
*/
func (m *DiskManager) findDisk(name string) (*compute.Disk, error) {
disk, err1 := m.getZonalDisk(name)
if err1 == nil {
logs.Info.Printf("Found disk %s in zone %s\n", name, m.config.Zone)
return disk, nil
}

disk, err2 := m.getRegionalDisk(name)
if err2 == nil {
logs.Info.Printf("Found disk %s in region %s", name, m.config.Region)
return disk, nil
}

logs.Error.Printf("Could not find disk %s in zone %s: %v\n", name, m.config.Zone, err1)
logs.Error.Printf("Could not find disk %s in region %s: %v\n", name, m.config.Region, err2)

return nil, fmt.Errorf("Could not find disk %s in configured region or zone\n", name)
}

/* Retrieve a resource policy object via the GCP API */
func (m *DiskManager) getPolicy(name string) (*compute.ResourcePolicy, error) {
return m.gcp.ResourcePolicies.Get(m.config.GoogleProject, m.config.Region, name).Do()
}

/* Retrieve a zonal disk object via the GCP API */
func (m *DiskManager) getZonalDisk(name string) (*compute.Disk, error) {
return m.gcp.Disks.Get(m.config.GoogleProject, m.config.Zone, name).Do()
}

/* Retrieve a regional disk object via the GCP API */
func (m *DiskManager) getRegionalDisk(name string) (*compute.Disk, error) {
return m.gcp.RegionDisks.Get(m.config.GoogleProject, m.config.Region, name).Do()
}

/* Attach a policy to a zonal disk object via the GCP API */
func (m *DiskManager) addPolicyToZonalDisk(diskName string, policy *compute.ResourcePolicy) error {
addPolicyRequest := &compute.DisksAddResourcePoliciesRequest{
ResourcePolicies: []string{policy.SelfLink},
}
_, err := m.gcp.Disks.AddResourcePolicies(m.config.GoogleProject, m.config.Zone, diskName, addPolicyRequest).Do()
return err
}

/* Attach a policy to a regional disk object via the GCP API */
func (m *DiskManager) addPolicyToRegionalDisk(diskName string, policy *compute.ResourcePolicy) error {
addPolicyRequest := &compute.RegionDisksAddResourcePoliciesRequest{
ResourcePolicies: []string{policy.SelfLink},
}
_, err := m.gcp.RegionDisks.AddResourcePolicies(m.config.GoogleProject, m.config.Region, diskName, addPolicyRequest).Do()
return err
}
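For reference, a hypothetical end-to-end wiring of the new `disk` package with `client.Build`. The annotation key, project, region, zone, and kubeconfig path are illustrative, and it assumes a `config.Config` literal can stand in for however the real tool loads its configuration:

```go
package main

import (
	"log"
	"os"
	"path/filepath"

	"github.com/broadinstitute/disk-manager/client"
	"github.com/broadinstitute/disk-manager/config"
	"github.com/broadinstitute/disk-manager/disk"
)

func main() {
	// All values here are illustrative placeholders.
	cfg := &config.Config{
		TargetAnnotation: "example.org/snapshot-policy", // hypothetical annotation key
		GoogleProject:    "my-project",
		Region:           "us-central1",
		Zone:             "us-central1-a",
	}

	home, err := os.UserHomeDir()
	if err != nil {
		log.Fatal(err)
	}

	// local=true authenticates with the local kubeconfig, per the
	// README section above.
	clients, err := client.Build(true, filepath.Join(home, ".kube", "config"))
	if err != nil {
		log.Fatal(err)
	}

	manager, err := disk.NewDiskManager(cfg, clients)
	if err != nil {
		log.Fatal(err)
	}

	if err := manager.Run(); err != nil {
		log.Fatal(err)
	}
}
```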
