/
mutate.go
113 lines (95 loc) · 2.51 KB
/
mutate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package nds
import (
"context"
"cloud.google.com/go/datastore"
"github.com/pkg/errors"
"go.opencensus.io/trace"
)
var (
// This exists purely for testing
mutateHook func() error
)
type mutationType byte
const (
insertMutation mutationType = iota
upsertMutation
updateMutation
deleteMutation
)
type Mutation struct {
typ mutationType
k *datastore.Key
mut *datastore.Mutation
}
func NewDelete(k *datastore.Key) *Mutation {
return &Mutation{
typ: deleteMutation,
k: k,
mut: datastore.NewDelete(k),
}
}
func NewInsert(k *datastore.Key, src interface{}) *Mutation {
return &Mutation{
typ: insertMutation,
k: k,
mut: datastore.NewInsert(k, src),
}
}
func NewUpdate(k *datastore.Key, src interface{}) *Mutation {
return &Mutation{
typ: updateMutation,
k: k,
mut: datastore.NewUpdate(k, src),
}
}
func NewUpsert(k *datastore.Key, src interface{}) *Mutation {
return &Mutation{
typ: upsertMutation,
k: k,
mut: datastore.NewUpsert(k, src),
}
}
func (c *Client) Mutate(ctx context.Context, muts ...*Mutation) ([]*datastore.Key, error) {
var span *trace.Span
ctx, span = trace.StartSpan(ctx, "github.com/qedus/nds.Mutate")
defer span.End()
toLock := make([]*datastore.Key, 0, len(muts))
toLockRelease := make([]*datastore.Key, 0, len(muts))
mutations := make([]*datastore.Mutation, len(muts))
for i, mutation := range muts {
mutations[i] = mutation.mut
switch mutation.typ {
case insertMutation, upsertMutation, updateMutation:
// All of these are like Puts, so lock and release as you would a Put
// Insert: Will only succeed if new, may evict valid cached items erroneously
// Upsert: Same as a Put
// Update: Will only succeed if pre-existing, may evict cached misses erroneously
toLockRelease = append(toLockRelease, mutation.k)
case deleteMutation:
// Like a delete, lock and let alone
toLock = append(toLock, mutation.k)
}
}
if c.cacher != nil {
releaseCacheKeys, lockCacheItems := getCacheLocks(toLockRelease)
_, moreLockCacheItems := getCacheLocks(toLock)
lockCacheItems = append(lockCacheItems, moreLockCacheItems...)
defer func() {
// Optimistcally remove the locks.
if err := c.cacher.DeleteMulti(ctx,
releaseCacheKeys); err != nil {
c.onError(ctx, errors.Wrap(err, "Mutate cache.DeleteMulti"))
}
}()
if err := c.cacher.SetMulti(ctx,
lockCacheItems); err != nil {
return nil, err
}
if mutateHook != nil {
if err := mutateHook(); err != nil {
return nil, err
}
}
}
return c.Client.Mutate(ctx, mutations...)
}