/
assoc.go
126 lines (109 loc) · 3.39 KB
/
assoc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2017 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package testutil
import (
"context"
"sync"
"time"
"github.com/grailbio/base/digest"
"github.com/grailbio/reflow/assoc"
"github.com/grailbio/reflow/errors"
"github.com/grailbio/reflow/liveset"
)
// assocKey is the key type of InmemoryAssoc's map: an association kind paired
// with a key digest. Both fields are embedded, so they are read as .Kind and
// .Digest.
type assocKey struct {
assoc.Kind
digest.Digest
}
// InmemoryAssoc keeps associations in a Go map instead of an external
// store. Use NewInmemoryAssoc to obtain an instance with the map allocated.
type InmemoryAssoc struct {
// mu is held by Store, Get, BatchGet, Count and Scan while they touch assocs.
mu sync.Mutex
// assocs maps (kind, key digest) to the associated value digest.
assocs map[assocKey]digest.Digest
// scanTimeGenerator produces the time that Scan hands to the mapping
// handler for each key; it is installed with SetScanTimeGenerator.
scanTimeGenerator func() time.Time
// scanLabelsGenerator produces the labels that Scan hands to the mapping
// handler for each key; it is installed with SetScanLabelsGenerator.
scanLabelsGenerator func() []string
}
// NewInmemoryAssoc constructs an empty InmemoryAssoc whose association map
// is allocated and ready for use. The scan time and scan labels generators
// are left unset.
func NewInmemoryAssoc() *InmemoryAssoc {
	store := new(InmemoryAssoc)
	store.assocs = map[assocKey]digest.Digest{}
	return store
}
// Store records the association (kind, k) -> v. When v is the zero digest,
// any existing association for (kind, k) is removed instead. ctx is not
// used, and the returned error is always nil.
func (a *InmemoryAssoc) Store(ctx context.Context, kind assoc.Kind, k, v digest.Digest) error {
	entry := assocKey{Kind: kind, Digest: k}

	a.mu.Lock()
	defer a.mu.Unlock()

	// A zero value means "forget this key" rather than "store zero".
	if v.IsZero() {
		delete(a.assocs, entry)
		return nil
	}
	a.assocs[entry] = v
	return nil
}
// Get looks up the value associated with (kind, k). It returns k itself as
// the first result and the stored value as the second. If there is no such
// association, the second result is the zero digest and the error has kind
// errors.NotExist. ctx is not used.
func (a *InmemoryAssoc) Get(ctx context.Context, kind assoc.Kind, k digest.Digest) (digest.Digest, digest.Digest, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	if found, present := a.assocs[assocKey{Kind: kind, Digest: k}]; present {
		return k, found, nil
	}
	var none digest.Digest
	return k, none, errors.E(errors.NotExist, errors.New("key does not exist"))
}
// BatchGet fills in batch in place: every key already present in batch has
// its result replaced by one carrying the stored digest for that key's kind
// and digest. Keys with no stored association receive a result holding the
// zero digest; no per-key error is set. ctx is not used, and the returned
// error is always nil.
func (a *InmemoryAssoc) BatchGet(ctx context.Context, batch assoc.Batch) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	for req := range batch {
		lookup := assocKey{Kind: req.Kind, Digest: req.Digest}
		batch[req] = assoc.Result{Digest: a.assocs[lookup]}
	}
	return nil
}
// CollectWithThreshold is not implemented by the in-memory assoc. It ignores
// all of its arguments, leaves the mapping untouched, and always returns the
// error built by errors.E("collect", errors.NotSupported).
func (a *InmemoryAssoc) CollectWithThreshold(context.Context, liveset.Liveset, liveset.Liveset, time.Time, int64, bool) error {
	unsupported := errors.E("collect", errors.NotSupported)
	return unsupported
}
// Count reports how many associations are currently stored, i.e. the number
// of (kind, key) entries in the map at the moment the lock is held. ctx is
// not used, and the returned error is always nil.
func (a *InmemoryAssoc) Count(ctx context.Context) (int64, error) {
	a.mu.Lock()
	entries := len(a.assocs)
	a.mu.Unlock()

	return int64(entries), nil
}
// Scan calls handler.HandleMapping once for every distinct key digest in the
// mapping, passing a map from kind to value digest for that key.
//
// The associations are grouped into a snapshot while the lock is held; the
// handler is then invoked sequentially on the calling goroutine after the
// lock has been released, so the handler may call back into this assoc.
//
// The time and labels passed to the handler come from the functions
// installed with SetScanTimeGenerator and SetScanLabelsGenerator. If a
// generator has not been installed, the zero time.Time or nil labels are
// passed instead of calling a nil function.
//
// The kinds argument is not consulted: every stored kind is reported.
// ctx is only forwarded to the handler. The returned error is always nil.
func (a *InmemoryAssoc) Scan(ctx context.Context, kinds []assoc.Kind, handler assoc.MappingHandler) error {
	grouped := make(map[digest.Digest]map[assoc.Kind]digest.Digest)

	a.mu.Lock()
	for k, v := range a.assocs {
		byKind, ok := grouped[k.Digest]
		if !ok {
			byKind = make(map[assoc.Kind]digest.Digest)
			grouped[k.Digest] = byKind
		}
		byKind[k.Kind] = v
	}
	a.mu.Unlock()

	for key, byKind := range grouped {
		// Generators are optional; fall back to zero values so that an
		// assoc fresh out of NewInmemoryAssoc can be scanned.
		var lastAccess time.Time
		if gen := a.scanTimeGenerator; gen != nil {
			lastAccess = gen()
		}
		var labels []string
		if gen := a.scanLabelsGenerator; gen != nil {
			labels = gen()
		}
		handler.HandleMapping(ctx, key, byKind, lastAccess, labels)
	}
	return nil
}
// Delete is not implemented by the in-memory assoc. It ignores ctx and k,
// leaves the mapping untouched, and always returns the error built by
// errors.E("delete", errors.NotSupported).
func (a *InmemoryAssoc) Delete(ctx context.Context, k digest.Digest) error {
	unsupported := errors.E("delete", errors.NotSupported)
	return unsupported
}
// RawAssocs exposes the underlying association map itself, not a copy.
// The mutex is not taken, so the caller sees (and can modify) the live map.
func (a *InmemoryAssoc) RawAssocs() map[assocKey]digest.Digest {
	live := a.assocs
	return live
}
// SetScanTimeGenerator installs gen as the function Scan uses to obtain the
// time it passes to the mapping handler for each key. The field is assigned
// without holding the mutex.
func (a *InmemoryAssoc) SetScanTimeGenerator(gen func() time.Time) {
a.scanTimeGenerator = gen
}
// SetScanLabelsGenerator installs gen as the function Scan uses to obtain
// the labels it passes to the mapping handler for each key. The field is
// assigned without holding the mutex.
func (a *InmemoryAssoc) SetScanLabelsGenerator(gen func() []string) {
a.scanLabelsGenerator = gen
}
// Copy returns a new InmemoryAssoc holding a copy of the receiver's
// associations. The two instances do not share the map, so later stores to
// one are not visible in the other.
//
// The receiver's lock is held while its map is read so that Copy does not
// race with a concurrent Store. The scan time and scan labels generators
// are not carried over to the copy.
func (a *InmemoryAssoc) Copy() *InmemoryAssoc {
	dup := NewInmemoryAssoc()

	a.mu.Lock()
	defer a.mu.Unlock()
	for k, v := range a.assocs {
		dup.assocs[k] = v
	}
	return dup
}