Skip to content

Commit

Permalink
Allow the BlobAccess configuration to be DAG shaped
Browse files Browse the repository at this point in the history
Right now the configuration can be only be tree shaped. This can be
somewhat limiting at times. For example, if you want to place
DemultiplexingBlobAccess at the top level, but want to have multiple
arms matching the same backend, there is no way to do that without
repetition. Some backends can also not be duplicated safely (e.g.,
LocalBlobAccess), as that would cause them to be instantiated multiple
times.

This change solves that by adding a new 'with_labels' and 'labels'
backends to BlobAccessConfiguration. These can be used to introduce
additional BlobAccess instances under a custom label. The label can then
be referenced from within the main definition.
  • Loading branch information
EdSchouten committed Jul 15, 2022
1 parent 3c4db2c commit cc295ad
Show file tree
Hide file tree
Showing 9 changed files with 766 additions and 574 deletions.
11 changes: 4 additions & 7 deletions pkg/blobstore/configuration/ac_blob_access_creator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package configuration

import (
"context"

remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/completenesschecking"
Expand All @@ -14,7 +12,6 @@ import (
pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore"
"github.com/buildbarn/bb-storage/pkg/util"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -60,10 +57,10 @@ func (bac *acBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.Pr
return acCapabilitiesProvider
}

func (bac *acBlobAccessCreator) NewCustomBlobAccess(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration) (BlobAccessInfo, string, error) {
func (bac *acBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) {
switch backend := configuration.Backend.(type) {
case *pb.BlobAccessConfiguration_ActionResultExpiring:
base, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.ActionResultExpiring.Backend, bac)
base, err := nestedCreator.NewNestedBlobAccess(backend.ActionResultExpiring.Backend, bac)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand All @@ -88,7 +85,7 @@ func (bac *acBlobAccessCreator) NewCustomBlobAccess(terminationContext context.C
if bac.contentAddressableStorage == nil {
return BlobAccessInfo{}, "", status.Error(codes.InvalidArgument, "Action Cache completeness checking can only be enabled if a Content Addressable Storage is configured")
}
base, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.CompletenessChecking, bac)
base, err := nestedCreator.NewNestedBlobAccess(backend.CompletenessChecking, bac)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand All @@ -110,7 +107,7 @@ func (bac *acBlobAccessCreator) NewCustomBlobAccess(terminationContext context.C
DigestKeyFormat: digest.KeyWithInstance,
}, "grpc", nil
default:
return newProtoCustomBlobAccess(terminationContext, terminationGroup, configuration, bac)
return newProtoCustomBlobAccess(configuration, nestedCreator, bac)
}
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/blobstore/configuration/blob_access_creator.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package configuration

import (
"context"
"sync"

"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/local"
"github.com/buildbarn/bb-storage/pkg/capabilities"
"github.com/buildbarn/bb-storage/pkg/digest"
pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore"

"golang.org/x/sync/errgroup"
)

// NestedBlobAccessCreator is a helper type that implementations of
// BlobAccessCreator may use to construct nested instances of
// BlobAccess. For example, ACBlobAccessCreator will call into this
// interface to create the backend of CompletenessCheckingBlobAccess.
type NestedBlobAccessCreator interface {
NewNestedBlobAccess(configuration *pb.BlobAccessConfiguration, creator BlobAccessCreator) (BlobAccessInfo, error)
}

// BlobAccessCreator contains a set of methods that are invoked by the
// generic NewBlobAccessFromConfiguration() function to create a
// BlobAccess of a specific kind (e.g., Action Cache, Content
Expand Down Expand Up @@ -50,7 +55,7 @@ type BlobAccessCreator interface {
// BlobAccess instances that only apply to this storage type.
// For example, CompletenessCheckingBlobAccess is only
// applicable to the Action Cache.
NewCustomBlobAccess(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration) (BlobAccessInfo, string, error)
NewCustomBlobAccess(configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error)
// WrapTopLevelBlobAccess() is called at the very end of
// NewBlobAccessFromConfiguration() to apply any top-level
// decorators.
Expand Down
10 changes: 3 additions & 7 deletions pkg/blobstore/configuration/cas_blob_access_creator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package configuration

import (
"context"
"net/http"
"sync"

Expand All @@ -19,7 +18,6 @@ import (
"github.com/buildbarn/bb-storage/pkg/util"
"github.com/google/uuid"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -74,10 +72,10 @@ func (bac *casBlobAccessCreator) NewHierarchicalInstanceNamesLocalBlobAccess(key
return local.NewHierarchicalCASBlobAccess(keyLocationMap, locationBlobMap, globalLock, casCapabilitiesProvider), nil
}

func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration) (BlobAccessInfo, string, error) {
func (bac *casBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) {
switch backend := configuration.Backend.(type) {
case *pb.BlobAccessConfiguration_ExistenceCaching:
base, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.ExistenceCaching.Backend, bac)
base, err := nestedCreator.NewNestedBlobAccess(backend.ExistenceCaching.Backend, bac)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand Down Expand Up @@ -107,9 +105,7 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(terminationContext context.
// location of a blob, not the blobs themselves. Create
// a new BlobAccessCreator to ensure data is loaded
// properly.
base, err := NewNestedBlobAccess(
terminationContext,
terminationGroup,
base, err := nestedCreator.NewNestedBlobAccess(
backend.ReferenceExpanding.IndirectContentAddressableStorage,
NewICASBlobAccessCreator(
bac.grpcClientFactory,
Expand Down
8 changes: 2 additions & 6 deletions pkg/blobstore/configuration/icas_blob_access_creator.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package configuration

import (
"context"

"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/grpcclients"
"github.com/buildbarn/bb-storage/pkg/capabilities"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/grpc"
pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore"

"golang.org/x/sync/errgroup"
)

type icasBlobAccessCreator struct {
Expand Down Expand Up @@ -43,7 +39,7 @@ func (bac *icasBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.
return nil
}

func (bac *icasBlobAccessCreator) NewCustomBlobAccess(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration) (BlobAccessInfo, string, error) {
func (bac *icasBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) {
switch backend := configuration.Backend.(type) {
case *pb.BlobAccessConfiguration_Grpc:
client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc)
Expand All @@ -57,7 +53,7 @@ func (bac *icasBlobAccessCreator) NewCustomBlobAccess(terminationContext context
DigestKeyFormat: digest.KeyWithInstance,
}, "grpc", nil
default:
return newProtoCustomBlobAccess(terminationContext, terminationGroup, configuration, bac)
return newProtoCustomBlobAccess(configuration, nestedCreator, bac)
}
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/blobstore/configuration/iscc_blob_access_creator.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package configuration

import (
"context"

"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/grpcclients"
"github.com/buildbarn/bb-storage/pkg/capabilities"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/grpc"
pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/blobstore"

"golang.org/x/sync/errgroup"
)

type isccBlobAccessCreator struct {
Expand Down Expand Up @@ -43,7 +39,7 @@ func (bac *isccBlobAccessCreator) GetDefaultCapabilitiesProvider() capabilities.
return nil
}

func (bac *isccBlobAccessCreator) NewCustomBlobAccess(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration) (BlobAccessInfo, string, error) {
func (bac *isccBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAccessConfiguration, nestedCreator NestedBlobAccessCreator) (BlobAccessInfo, string, error) {
switch backend := configuration.Backend.(type) {
case *pb.BlobAccessConfiguration_Grpc:
client, err := bac.grpcClientFactory.NewClientFromConfiguration(backend.Grpc)
Expand All @@ -55,7 +51,7 @@ func (bac *isccBlobAccessCreator) NewCustomBlobAccess(terminationContext context
DigestKeyFormat: digest.KeyWithInstance,
}, "grpc", nil
default:
return newProtoCustomBlobAccess(terminationContext, terminationGroup, configuration, bac)
return newProtoCustomBlobAccess(configuration, nestedCreator, bac)
}
}

Expand Down
88 changes: 69 additions & 19 deletions pkg/blobstore/configuration/new_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ func newCachedReadBufferFactory(cacheConfiguration *digest_pb.ExistenceCacheConf
dataIntegrityCheckingCache), nil
}

func newNestedBlobAccessBare(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration, creator BlobAccessCreator) (BlobAccessInfo, string, error) {
type simpleNestedBlobAccessCreator struct {
terminationContext context.Context
terminationGroup *errgroup.Group
labels map[string]BlobAccessInfo
}

func (nc *simpleNestedBlobAccessCreator) newNestedBlobAccessBare(configuration *pb.BlobAccessConfiguration, creator BlobAccessCreator) (BlobAccessInfo, string, error) {
readBufferFactory := creator.GetReadBufferFactory()
storageTypeName := creator.GetStorageTypeName()
switch backend := configuration.Backend.(type) {
Expand All @@ -71,11 +77,11 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
DigestKeyFormat: digest.KeyWithoutInstance,
}, "error", nil
case *pb.BlobAccessConfiguration_ReadCaching:
slow, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.ReadCaching.Slow, creator)
slow, err := nc.NewNestedBlobAccess(backend.ReadCaching.Slow, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
fast, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.ReadCaching.Fast, creator)
fast, err := nc.NewNestedBlobAccess(backend.ReadCaching.Fast, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand Down Expand Up @@ -216,7 +222,7 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
backends = append(backends, nil)
} else {
// Undrained backend.
backend, err := NewNestedBlobAccess(terminationContext, terminationGroup, shard.Backend, creator)
backend, err := nc.NewNestedBlobAccess(shard.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand Down Expand Up @@ -245,11 +251,11 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
DigestKeyFormat: *combinedDigestKeyFormat,
}, "sharding", nil
case *pb.BlobAccessConfiguration_SizeDistinguishing:
small, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.SizeDistinguishing.Small, creator)
small, err := nc.NewNestedBlobAccess(backend.SizeDistinguishing.Small, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
large, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.SizeDistinguishing.Large, creator)
large, err := nc.NewNestedBlobAccess(backend.SizeDistinguishing.Large, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand All @@ -258,11 +264,11 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
DigestKeyFormat: small.DigestKeyFormat.Combine(large.DigestKeyFormat),
}, "size_distinguishing", nil
case *pb.BlobAccessConfiguration_Mirrored:
backendA, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.Mirrored.BackendA, creator)
backendA, err := nc.NewNestedBlobAccess(backend.Mirrored.BackendA, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
backendB, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.Mirrored.BackendB, creator)
backendB, err := nc.NewNestedBlobAccess(backend.Mirrored.BackendB, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand Down Expand Up @@ -399,7 +405,8 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
periodicSyncer.ProcessBlockRelease()
}
}()
terminationGroup.Go(func() error {
terminationContext := nc.terminationContext
nc.terminationGroup.Go(func() error {
for periodicSyncer.ProcessBlockPut(terminationContext) {
}
// TODO: Let PeriodicSyncer propagate errors
Expand Down Expand Up @@ -487,11 +494,11 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
DigestKeyFormat: digestKeyFormat,
}, backendType, nil
case *pb.BlobAccessConfiguration_ReadFallback:
primary, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.ReadFallback.Primary, creator)
primary, err := nc.NewNestedBlobAccess(backend.ReadFallback.Primary, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
secondary, err := NewNestedBlobAccess(terminationContext, terminationGroup, backend.ReadFallback.Secondary, creator)
secondary, err := nc.NewNestedBlobAccess(backend.ReadFallback.Secondary, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand Down Expand Up @@ -522,7 +529,7 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
if err != nil {
return BlobAccessInfo{}, "", util.StatusWrapf(err, "Invalid instance name %#v", demultiplexed.AddInstanceNamePrefix)
}
backend, err := NewNestedBlobAccess(terminationContext, terminationGroup, demultiplexed.Backend, creator)
backend, err := nc.NewNestedBlobAccess(demultiplexed.Backend, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand All @@ -546,11 +553,11 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
}, "demultiplexing", nil
case *pb.BlobAccessConfiguration_ReadCanarying:
config := backend.ReadCanarying
source, err := NewNestedBlobAccess(terminationContext, terminationGroup, config.Source, creator)
source, err := nc.NewNestedBlobAccess(config.Source, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
replica, err := NewNestedBlobAccess(terminationContext, terminationGroup, config.Replica, creator)
replica, err := nc.NewNestedBlobAccess(config.Replica, creator)
if err != nil {
return BlobAccessInfo{}, "", err
}
Expand Down Expand Up @@ -617,7 +624,8 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
file)

// Ensure the central directory is written upon termination.
terminationGroup.Go(func() error {
terminationContext := nc.terminationContext
nc.terminationGroup.Go(func() error {
<-terminationContext.Done()
if err := blobAccess.Finalize(); err != nil {
return util.StatusWrapf(err, "Failed to finalize ZIP archive %#v", zipPath)
Expand All @@ -633,18 +641,56 @@ func newNestedBlobAccessBare(terminationContext context.Context, terminationGrou
DigestKeyFormat: digestKeyFormat,
}, "zip_writing", nil
}
return creator.NewCustomBlobAccess(terminationContext, terminationGroup, configuration)
return creator.NewCustomBlobAccess(configuration, nc)
}

// NewNestedBlobAccess may be called by
// BlobAccessCreator.NewCustomBlobAccess() to create BlobAccess
// objects for instances nested inside the configuration.
func NewNestedBlobAccess(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration, creator BlobAccessCreator) (BlobAccessInfo, error) {
func (nc *simpleNestedBlobAccessCreator) NewNestedBlobAccess(configuration *pb.BlobAccessConfiguration, creator BlobAccessCreator) (BlobAccessInfo, error) {
if configuration == nil {
return BlobAccessInfo{}, status.Error(codes.InvalidArgument, "Storage configuration not specified")
}

backend, backendType, err := newNestedBlobAccessBare(terminationContext, terminationGroup, configuration, creator)
// Protobuf does not support anchors/aliases like YAML. Have
// separate 'with_labels' and 'labels' backends that can be used
// to declare anchors and aliases, respectively.
switch backend := configuration.Backend.(type) {
case *pb.BlobAccessConfiguration_WithLabels:
config := backend.WithLabels

// Inherit labels from the parent.
labels := map[string]BlobAccessInfo{}
for label, labelBackend := range nc.labels {
labels[label] = labelBackend
}

// Add additional labels declared in config.
for label, labelBackend := range config.Labels {
if _, ok := labels[label]; ok {
// Disallow shadowing.
return BlobAccessInfo{}, status.Errorf(codes.InvalidArgument, "Label %#v has already been declared", label)
}
info, err := nc.NewNestedBlobAccess(labelBackend, creator)
if err != nil {
return BlobAccessInfo{}, util.StatusWrapf(err, "Label %#v", label)
}
labels[label] = info
}

return (&simpleNestedBlobAccessCreator{
terminationContext: nc.terminationContext,
terminationGroup: nc.terminationGroup,
labels: labels,
}).NewNestedBlobAccess(config.Backend, creator)
case *pb.BlobAccessConfiguration_Label:
if labelBackend, ok := nc.labels[backend.Label]; ok {
return labelBackend, nil
}
return BlobAccessInfo{}, status.Errorf(codes.InvalidArgument, "Label %#v not declared", backend.Label)
}

backend, backendType, err := nc.newNestedBlobAccessBare(configuration, creator)
if err != nil {
return BlobAccessInfo{}, err
}
Expand All @@ -657,7 +703,11 @@ func NewNestedBlobAccess(terminationContext context.Context, terminationGroup *e
// NewBlobAccessFromConfiguration creates a BlobAccess object based on a
// configuration file.
func NewBlobAccessFromConfiguration(terminationContext context.Context, terminationGroup *errgroup.Group, configuration *pb.BlobAccessConfiguration, creator BlobAccessCreator) (BlobAccessInfo, error) {
backend, err := NewNestedBlobAccess(terminationContext, terminationGroup, configuration, creator)
nestedCreator := &simpleNestedBlobAccessCreator{
terminationContext: terminationContext,
terminationGroup: terminationGroup,
}
backend, err := nestedCreator.NewNestedBlobAccess(configuration, creator)
if err != nil {
return BlobAccessInfo{}, err
}
Expand Down

0 comments on commit cc295ad

Please sign in to comment.