forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
id_alloc.go
146 lines (130 loc) · 3.98 KB
/
id_alloc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package storage
import (
"fmt"
"sync"
"sync/atomic"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/pkg/errors"
)
// An idAllocator is used to increment a key in allocation blocks
// of arbitrary size starting at a minimum ID.
//
// Note: if all you want is to increment a key and retry on retryable errors,
// see client.IncrementValRetryable().
type idAllocator struct {
log.AmbientContext
idKey atomic.Value
db *client.DB
minID uint32 // Minimum ID to return
blockSize uint32 // Block allocation size
ids chan uint32 // Channel of available IDs
stopper *stop.Stopper
once sync.Once
}
// newIDAllocator creates a new ID allocator which increments the
// specified key in allocation blocks of size blockSize, with
// allocated IDs starting at minID. Allocated IDs are positive
// integers.
func newIDAllocator(
ambient log.AmbientContext,
idKey roachpb.Key,
db *client.DB,
minID uint32,
blockSize uint32,
stopper *stop.Stopper,
) (*idAllocator, error) {
// minID can't be the zero value because reads from closed channels return
// the zero value.
if minID == 0 {
return nil, errors.Errorf("minID must be a positive integer: %d", minID)
}
if blockSize == 0 {
return nil, errors.Errorf("blockSize must be a positive integer: %d", blockSize)
}
ia := &idAllocator{
AmbientContext: ambient,
db: db,
minID: minID,
blockSize: blockSize,
ids: make(chan uint32, blockSize/2+1),
stopper: stopper,
}
ia.idKey.Store(idKey)
return ia, nil
}
// Allocate allocates a new ID from the global KV DB.
func (ia *idAllocator) Allocate(ctx context.Context) (uint32, error) {
ia.once.Do(ia.start)
select {
case id := <-ia.ids:
// when the channel is closed, the zero value is returned.
if id == 0 {
return id, errors.Errorf("could not allocate ID; system is draining")
}
return id, nil
case <-ctx.Done():
return 0, ctx.Err()
}
}
func (ia *idAllocator) start() {
ctx := ia.AnnotateCtx(context.Background())
ia.stopper.RunWorker(ctx, func(ctx context.Context) {
defer close(ia.ids)
for {
var newValue int64
for newValue <= int64(ia.minID) {
var err error
var res client.KeyValue
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
idKey := ia.idKey.Load().(roachpb.Key)
if err := ia.stopper.RunTask(ctx, "storage.idAllocator: allocating block", func(ctx context.Context) {
res, err = ia.db.Inc(ctx, idKey, int64(ia.blockSize))
}); err != nil {
log.Warning(ctx, err)
return
}
if err == nil {
newValue = res.ValueInt()
break
}
log.Warningf(ctx, "unable to allocate %d ids from %s: %s", ia.blockSize, idKey, err)
}
if err != nil {
panic(fmt.Sprintf("unexpectedly exited id allocation retry loop: %s", err))
}
}
end := newValue + 1
start := end - int64(ia.blockSize)
if start < int64(ia.minID) {
start = int64(ia.minID)
}
// Add all new ids to the channel for consumption.
for i := start; i < end; i++ {
select {
case ia.ids <- uint32(i):
case <-ia.stopper.ShouldStop():
return
}
}
}
})
}