forked from hashicorp/consul
/
delete.go
162 lines (144 loc) · 5.51 KB
/
delete.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package resource
import (
"context"
"errors"
"fmt"
"time"
"github.com/oklog/ulid/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hernad/consul/acl"
"github.com/hernad/consul/internal/resource"
"github.com/hernad/consul/internal/storage"
"github.com/hernad/consul/proto-public/pbresource"
)
// Deletes a resource.
// - To delete a resource regardless of the stored version, set Version = ""
// - Supports deleting a resource by name, hence Id.Uid may be empty.
// - Delete of a previously deleted or non-existent resource is a no-op to support idempotency.
// - Errors with Aborted if the requested Version does not match the stored Version.
// - Errors with PermissionDenied if ACL check fails
func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pbresource.DeleteResponse, error) {
if err := validateDeleteRequest(req); err != nil {
return nil, err
}
reg, err := s.resolveType(req.Id.Type)
if err != nil {
return nil, err
}
authz, err := s.getAuthorizer(tokenFromContext(ctx))
if err != nil {
return nil, err
}
err = reg.ACLs.Write(authz, req.Id)
switch {
case acl.IsErrPermissionDenied(err):
return nil, status.Error(codes.PermissionDenied, err.Error())
case err != nil:
return nil, status.Errorf(codes.Internal, "failed write acl: %v", err)
}
// The storage backend requires a Version and Uid to delete a resource based
// on CAS semantics. When either are not provided, the resource must be read
// with a strongly consistent read to retrieve either or both.
//
// n.b.: There is a chance DeleteCAS may fail with a storage.ErrCASFailure
// if an update occurs between the Read and DeleteCAS. Consider refactoring
// to use retryCAS() similar to the Write endpoint to close this gap.
deleteVersion := req.Version
deleteId := req.Id
if deleteVersion == "" || deleteId.Uid == "" {
existing, err := s.Backend.Read(ctx, storage.StrongConsistency, req.Id)
switch {
case err == nil:
deleteVersion = existing.Version
deleteId = existing.Id
case errors.Is(err, storage.ErrNotFound):
// Deletes are idempotent so no-op when not found
return &pbresource.DeleteResponse{}, nil
default:
return nil, status.Errorf(codes.Internal, "failed read: %v", err)
}
}
if err := s.maybeCreateTombstone(ctx, deleteId); err != nil {
return nil, err
}
err = s.Backend.DeleteCAS(ctx, deleteId, deleteVersion)
switch {
case err == nil:
return &pbresource.DeleteResponse{}, nil
case errors.Is(err, storage.ErrCASFailure):
return nil, status.Error(codes.Aborted, err.Error())
default:
return nil, status.Errorf(codes.Internal, "failed delete: %v", err)
}
}
// Create a tombstone to capture the intent to delete child resources.
// Tombstones are created preemptively to prevent partial failures even though
// we are currently unaware of the success/failure/no-op of DeleteCAS. In
// the failure and no-op cases the tombstone is effectively a no-op and will
// still be deleted from the system by the reaper controller.
func (s *Server) maybeCreateTombstone(ctx context.Context, deleteId *pbresource.ID) error {
// Don't create a tombstone when the resource being deleted is itself a tombstone.
if resource.EqualType(resource.TypeV1Tombstone, deleteId.Type) {
return nil
}
data, err := anypb.New(&pbresource.Tombstone{Owner: deleteId})
if err != nil {
return status.Errorf(codes.Internal, "failed creating tombstone: %v", err)
}
// Since a tombstone is an internal resource type that should not be visible
// or accessible by users, we're writing to the backend directly instead of
// using the resource service's Write endpoint. This bypasses resource level
// concerns that are either not relevant (valiation and mutation hooks) or
// futher complicate the implementation (user provided tokens having
// awareness of tombstone ACLs).
//
// ErrCASFailure should never happen since an empty Version is always passed.
//
// TODO(spatel): Probably a good idea to block writes of TypeV1Tombstone
// on the ResourceService.Write() endpoint to lock things down?
_, err = s.Backend.WriteCAS(ctx, &pbresource.Resource{
Id: &pbresource.ID{
Type: resource.TypeV1Tombstone,
Tenancy: deleteId.Tenancy,
Name: tombstoneName(deleteId),
Uid: ulid.Make().String(),
},
Generation: ulid.Make().String(),
Data: data,
Metadata: map[string]string{
"generated_at": time.Now().Format(time.RFC3339),
},
})
switch {
case err == nil:
// Success!
return nil
case errors.Is(err, storage.ErrWrongUid):
// Backend has detected that we're trying to change the Uid for an
// existing tombstone (probably created from a previously failed Delete
// where the tombstone WriteCAS succeeded but the resource DeleteCAS
// failed). The fact that the tombstone already exists means we're good.
return nil
default:
return status.Errorf(codes.Internal, "failed writing tombstone: %v", err)
}
}
func validateDeleteRequest(req *pbresource.DeleteRequest) error {
if req.Id == nil {
return status.Errorf(codes.InvalidArgument, "id is required")
}
if err := validateId(req.Id, "id"); err != nil {
return err
}
return nil
}
// Maintains a deterministic mapping between a resource and it's tombstone's
// name by embedding the resources's Uid in the name.
func tombstoneName(deleteId *pbresource.ID) string {
// deleteId.Name is just included for easier identification
return fmt.Sprintf("tombstone-%v-%v", deleteId.Name, deleteId.Uid)
}