Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Support AuthorizedView in data and admin client #9515

Merged
merged 10 commits into from
Apr 29, 2024
261 changes: 261 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (ac *AdminClient) backupPath(cluster, instance, backup string) string {
return fmt.Sprintf("projects/%s/instances/%s/clusters/%s/backups/%s", ac.project, instance, cluster, backup)
}

func (ac *AdminClient) authorizedViewPath(table, authorizedView string) string {
return fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), table, authorizedView)
}

// EncryptionInfo represents the encryption info of a table.
type EncryptionInfo struct {
Status *Status
Expand Down Expand Up @@ -881,6 +885,11 @@ func (ac *AdminClient) BackupIAM(cluster, backup string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.backupPath(cluster, ac.instance, backup))
}

// AuthorizedViewIAM creates an IAM Handle specific to a given Table and AuthorizedView.
func (ac *AdminClient) AuthorizedViewIAM(table, authorizedView string) *iam.Handle {
return iam.InternalNewHandleGRPCClient(ac.tClient, ac.authorizedViewPath(table, authorizedView))
}

const instanceAdminAddr = "bigtableadmin.googleapis.com:443"
const mtlsInstanceAdminAddr = "bigtableadmin.mtls.googleapis.com:443"

Expand Down Expand Up @@ -2175,3 +2184,255 @@ func (ac *AdminClient) UpdateBackup(ctx context.Context, cluster, backup string,
_, err := ac.tClient.UpdateBackup(ctx, req)
return err
}

// AuthorizedViewConf contains information about an authorized view.
type AuthorizedViewConf struct {
TableID string
AuthorizedViewID string

AuthorizedViewTypeConf AuthorizedViewTypeConf
DeletionProtection DeletionProtection
}

// AuthorizedViewTypeConf contains information about the type of an authorized view.
type AuthorizedViewTypeConf struct {
AuthorizedViewType AuthorizedViewType

SubsetView *SubsetViewConf
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}

// AuthorizedViewType represents the type of an authorized view.
type AuthorizedViewType int

const (
// AuthorizedViewTypeUnspecified represents an unspecified authorized view type.
AuthorizedViewTypeUnspecified AuthorizedViewType = iota
// AuthorizedViewTypeSubsetView represents subset view type of an authorized view.
AuthorizedViewTypeSubsetView
)

func (av AuthorizedViewConf) proto() *btapb.AuthorizedView {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
var avp btapb.AuthorizedView

switch dp := av.DeletionProtection; dp {
case Protected:
avp.DeletionProtection = true
case Unprotected:
avp.DeletionProtection = false
default:
break
}

switch avt := av.AuthorizedViewTypeConf.AuthorizedViewType; avt {
case AuthorizedViewTypeSubsetView:
avp.AuthorizedView = &btapb.AuthorizedView_SubsetView_{
SubsetView: av.AuthorizedViewTypeConf.SubsetView.proto(),
}
default:
break
}
return &avp
}

// SetSubsetView sets the AuthorizedViewType to AuthorizedViewTypeSubsetView and the subsetView pointer accordingly
func (av *AuthorizedViewTypeConf) SetSubsetView(s SubsetViewConf) error {
av.AuthorizedViewType = AuthorizedViewTypeSubsetView
av.SubsetView = &s
return nil
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

// GetSubsetView returns an error if the type is not AuthorizedViewTypeSubsetView.
func (av *AuthorizedViewTypeConf) GetSubsetView() (*SubsetViewConf, error) {
if av.AuthorizedViewType != AuthorizedViewTypeSubsetView {
return nil, fmt.Errorf("not a subset view: :%v", av.AuthorizedViewType)
}
return av.SubsetView, nil
}

// FamilySubset represents a subset of a column family.
type FamilySubset struct {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
Qualifiers [][]byte
QualifierPrefixes [][]byte
}

// SubsetViewConf contains configuration specific to an authorized view of subset view type.
type SubsetViewConf struct {
RowPrefixes [][]byte
FamilySubsets map[string]FamilySubset
}

// AddRowPrefix adds a new row prefix to the subset view.
func (s *SubsetViewConf) AddRowPrefix(prefix []byte) {
s.RowPrefixes = append(s.RowPrefixes, prefix)
}

func (s *SubsetViewConf) getFamilySubset(familyName string) FamilySubset {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
if s.FamilySubsets == nil {
s.FamilySubsets = make(map[string]FamilySubset)
}
if _, ok := s.FamilySubsets[familyName]; !ok {
s.FamilySubsets[familyName] = FamilySubset{}
}
return s.FamilySubsets[familyName]
}

func (s SubsetViewConf) proto() *btapb.AuthorizedView_SubsetView {
var p btapb.AuthorizedView_SubsetView
p.RowPrefixes = append(p.RowPrefixes, s.RowPrefixes...)
if p.FamilySubsets == nil {
p.FamilySubsets = make(map[string]*btapb.AuthorizedView_FamilySubsets)
}
for familyName, subset := range s.FamilySubsets {
p.FamilySubsets[familyName] = &btapb.AuthorizedView_FamilySubsets{
Qualifiers: subset.Qualifiers,
QualifierPrefixes: subset.QualifierPrefixes,
}
}
return &p
}

func (s *SubsetViewConf) fillConf(internal *btapb.AuthorizedView_SubsetView) {
s.RowPrefixes = [][]byte{}
s.RowPrefixes = append(s.RowPrefixes, internal.RowPrefixes...)
if s.FamilySubsets == nil {
s.FamilySubsets = make(map[string]FamilySubset)
}
for k, v := range internal.FamilySubsets {
s.FamilySubsets[k] = FamilySubset{
Qualifiers: v.Qualifiers,
QualifierPrefixes: v.QualifierPrefixes,
}
}
}

// AddFamilySubsetQualifier adds an individual column qualifier to be included in a subset view.
func (s *SubsetViewConf) AddFamilySubsetQualifier(familyName string, qualifier []byte) {
fs := s.getFamilySubset(familyName)
fs.Qualifiers = append(fs.Qualifiers, qualifier)
s.FamilySubsets[familyName] = fs
}

// AddFamilySubsetQualifierPrefix adds a prefix for column qualifiers to be included in a subset view.
func (s *SubsetViewConf) AddFamilySubsetQualifierPrefix(familyName string, qualifierPrefix []byte) {
fs := s.getFamilySubset(familyName)
fs.QualifierPrefixes = append(fs.QualifierPrefixes, qualifierPrefix)
s.FamilySubsets[familyName] = fs
}

// CreateAuthorizedView creates a new authorized view in a table.
func (ac *AdminClient) CreateAuthorizedView(ctx context.Context, conf *AuthorizedViewConf) error {
if conf.TableID == "" || conf.AuthorizedViewID == "" {
return errors.New("both AuthorizedViewID and TableID are required")
}
if conf.AuthorizedViewTypeConf.AuthorizedViewType == AuthorizedViewTypeUnspecified {
return errors.New("AuthorizedViewType must be specified")
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.CreateAuthorizedViewRequest{
Parent: fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), conf.TableID),
AuthorizedViewId: conf.AuthorizedViewID,
AuthorizedView: conf.proto(),
}
_, err := ac.tClient.CreateAuthorizedView(ctx, req)
return err
}

// GetAuthorizedView retrieves information about an authorized view.
func (ac *AdminClient) GetAuthorizedView(ctx context.Context, tableID, authorizedViewID string) (*AuthorizedViewConf, error) {
ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.GetAuthorizedViewRequest{
Name: fmt.Sprintf("%s/tables/%s/authorizedViews/%s", ac.instancePrefix(), tableID, authorizedViewID),
}
var res *btapb.AuthorizedView
av := &AuthorizedViewConf{TableID: tableID, AuthorizedViewID: authorizedViewID}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = ac.tClient.GetAuthorizedView(ctx, req)
return err
}, retryOptions...)

if err != nil {
return nil, err
}

if res.DeletionProtection {
av.DeletionProtection = Protected
} else {
av.DeletionProtection = Unprotected
}
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
if res.GetSubsetView() != nil {
s := SubsetViewConf{}
s.fillConf(res.GetSubsetView())
av.AuthorizedViewTypeConf.SetSubsetView(s)
}
return av, nil
}

// AuthorizedViews returns a list of the authorized views in the table. The names returned are fully
// qualified as projects/<project>/instances/<instance>/tables/<table>/authorizedViews/<authorizedView>
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
func (ac *AdminClient) AuthorizedViews(ctx context.Context, tableID string) ([]string, error) {
names := []string{}

req := &btapb.ListAuthorizedViewsRequest{
Parent: fmt.Sprintf("%s/tables/%s", ac.instancePrefix(), tableID),
View: btapb.AuthorizedView_NAME_ONLY,
}
var res *btapb.ListAuthorizedViewsResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = ac.tClient.ListAuthorizedViews(ctx, req)
return err
}, retryOptions...)
if err != nil {
return nil, err
}

for _, av := range res.AuthorizedViews {
names = append(names, av.Name)
}
return names, nil
}

// UpdateAuthorizedViewConf contains all the information necessary to update or partial update an authorized view.
type UpdateAuthorizedViewConf struct {
AuthorizedViewConf AuthorizedViewConf
UpdateMask []string
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
IgnoreWarnings bool
}

// UpdateAuthorizedView updates an authorized view in a table according to the given configuration.
func (ac *AdminClient) UpdateAuthorizedView(ctx context.Context, conf UpdateAuthorizedViewConf) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
if conf.AuthorizedViewConf.TableID == "" || conf.AuthorizedViewConf.AuthorizedViewID == "" {
return errors.New("both AuthorizedViewID and TableID is required")
}
av := conf.AuthorizedViewConf.proto()
av.Name = ac.authorizedViewPath(conf.AuthorizedViewConf.TableID, conf.AuthorizedViewConf.AuthorizedViewID)
req := &btapb.UpdateAuthorizedViewRequest{
AuthorizedView: av,
UpdateMask: &field_mask.FieldMask{Paths: conf.UpdateMask},
IgnoreWarnings: conf.IgnoreWarnings,
}
lro, err := ac.tClient.UpdateAuthorizedView(ctx, req)
if err != nil {
return fmt.Errorf("error from update authorized view: %w", err)
}
var res btapb.AuthorizedView
op := longrunning.InternalNewOperation(ac.lroClient, lro)
if err = op.Wait(ctx, &res); err != nil {
return fmt.Errorf("error from operation: %v", err)
}
return nil
}

// DeleteAuthorizedView deletes an authorized view in a table.
func (ac *AdminClient) DeleteAuthorizedView(ctx context.Context, tableID, authorizedViewID string) error {
ctx = mergeOutgoingMetadata(ctx, ac.md)
req := &btapb.DeleteAuthorizedViewRequest{
Name: ac.authorizedViewPath(tableID, authorizedViewID),
}
_, err := ac.tClient.DeleteAuthorizedView(ctx, req)
return err
}
Loading
Loading