/
int32_map.go
98 lines (79 loc) · 1.77 KB
/
int32_map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package store
import (
"sync"
)
// Int32Map is a sharded map from int32 keys to *BlockStatus values, where
// each shard is guarded by the mutex at the same index.
type Int32Map struct {
// mutexes[n] guards shards[n].
mutexes []sync.Mutex
// shards holds one map per shard; Get selects the shard from the key
// modulo NumShards.
shards []map[int32]*BlockStatus
// maxTracked is the approximate upper bound on tracked entries enforced by
// Get; zero disables the check.
maxTracked int64
}
// NewInt32Map builds an Int32Map with NumShards shards, each backed by its
// own empty map and its own mutex.
//
// maxTracked is stored as the tracking limit consulted by Get; a value of
// zero disables that limit.
func NewInt32Map(maxTracked int64) *Int32Map {
	m := &Int32Map{
		maxTracked: maxTracked,
		mutexes:    make([]sync.Mutex, NumShards),
		shards:     make([]map[int32]*BlockStatus, NumShards),
	}
	// Writing to a nil map panics, so every shard gets its map up front.
	for idx := range m.shards {
		m.shards[idx] = make(map[int32]*BlockStatus)
	}
	return m
}
// Get returns the BlockStatus tracked for key together with the mutex that
// guards its shard, creating an empty status if the key is not tracked yet.
//
// key must hold a float64, which is converted to int32 (truncating toward
// zero) to form the map key. Any other dynamic type yields (nil, nil).
// NOTE(review): values outside the int32 range and NaN convert to an
// implementation-dependent int32 — confirm callers never pass those.
//
// On success the returned mutex is LOCKED; the caller must call Unlock on it
// when it is done with the status. When (nil, nil) is returned, no lock is
// held.
//
// If maxTracked is non-zero and the shard's size multiplied by NumShards
// exceeds it, Get returns (nil, nil).
// NOTE(review): this check runs before the lookup, so keys that are already
// tracked are rejected too while the shard is over the limit — confirm that
// this is intended.
func (i *Int32Map) Get(key interface{}) (*BlockStatus, *sync.Mutex) {
	// Type check
	keyFloat64, ok := key.(float64)
	if !ok {
		return nil, nil
	}
	keyInt32 := int32(keyFloat64)

	// Go's % keeps the sign of the dividend, so a negative key would produce a
	// negative index and panic on the slice accesses below. Fold the remainder
	// back into [0, NumShards).
	index := keyInt32 % NumShards
	if index < 0 {
		index += NumShards
	}

	// Locking the mutex
	mutex := &i.mutexes[index]
	mutex.Lock()
	shard := i.shards[index]

	// Make sure we don't track too many values. The total is estimated from
	// this shard alone so that no other shard has to be locked.
	if i.maxTracked != 0 {
		totalApprox := NumShards * int64(len(shard))
		if totalApprox > i.maxTracked {
			mutex.Unlock()
			return nil, nil
		}
	}

	// Create the status if it doesn't exist
	status, exists := shard[keyInt32]
	if !exists {
		status = new(BlockStatus)
		shard[keyInt32] = status
	}

	// The lock is intentionally still held; the caller releases it.
	return status, mutex
}
// CleanUp removes every entry whose FrontTile is strictly less than clock.
//
// Shards are processed one at a time, each under its own mutex, so only one
// shard is locked at any moment.
func (i *Int32Map) CleanUp(clock int32) {
	threshold := float64(clock)
	for idx := range i.mutexes {
		lock := &i.mutexes[idx]
		lock.Lock()
		shard := i.shards[idx]
		// Deleting from a map while ranging over it is permitted in Go.
		for key, status := range shard {
			if status.FrontTile < threshold {
				delete(shard, key)
			}
		}
		lock.Unlock()
	}
}
// Type returns the string "int32".
// NOTE(review): presumably this names the key type handled by the map —
// confirm against callers.
func (i *Int32Map) Type() string {
	const typeName = "int32"
	return typeName
}
// BlockedValues collects a BlockedValue (key plus Since) for every entry
// whose status has IsBlocked set.
//
// Shards are visited one at a time under their own mutex, so the result is
// not a single atomic snapshot across shards. The returned slice is never
// nil; it is empty when nothing is blocked. Its order is unspecified because
// map iteration order is random.
func (i *Int32Map) BlockedValues() []BlockedValue {
	blocked := make([]BlockedValue, 0)
	for idx := range i.mutexes {
		lock := &i.mutexes[idx]
		lock.Lock()
		for value, st := range i.shards[idx] {
			if !st.IsBlocked {
				continue
			}
			blocked = append(blocked, BlockedValue{Since: st.Since, Value: value})
		}
		lock.Unlock()
	}
	return blocked
}