-
Notifications
You must be signed in to change notification settings - Fork 13.6k
/
SnapshottableHashTable.java
474 lines (442 loc) · 18.6 KB
/
SnapshottableHashTable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.timeline;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* SnapshottableHashTable implements a hash table that supports creating point-in-time
* snapshots. Each snapshot is immutable once it is created; the past cannot be changed.
* We handle divergences between the current state and historical state by copying a
* reference to elements that have been deleted or overwritten into the most recent
* snapshot tier.
*
* Note that there are no keys in SnapshottableHashTable, only values. So it more similar
* to a hash set than a hash map. The subclasses implement full-featured maps and sets
* using this class as a building block.
*
* Each snapshot tier contains a size and a hash table. The size reflects the size at
* the time the snapshot was taken. Note that, as an optimization, snapshot tiers will
* be null if they don't contain anything. So for example, if snapshot 20 of Object O
* contains the same entries as snapshot 10 of that object, the snapshot 20 tier for
* object O will be null.
*
* The current tier's data is stored in the fields inherited from BaseHashTable. It
* would be conceptually simpler to have a separate BaseHashTable object, but since Java
* doesn't have value types, subclassing is the only way to avoid another pointer
* indirection and the associated extra memory cost.
*
* Note that each element in the hash table contains a start epoch, and a value. The
* start epoch is there to identify when the object was first inserted. This in turn
* determines which snapshots it is a member of.
*
* In order to retrieve an object from snapshot E, we start by checking to see if the
* object exists in the "current" hash tier. If it does, and its startEpoch extends back
* to E, we return that object. Otherwise, we check all the snapshot tiers, starting
* with E, and ending with the most recent snapshot, to see if the object is there.
* As an optimization, if we encounter the object in a snapshot tier but its epoch is too
* new, we know that its value at epoch E must be null, so we can return that immediately.
*
* The class hierarchy looks like this:
*
* Revertable BaseHashTable
* ↑ ↑
* SnapshottableHashTable → SnapshotRegistry → Snapshot
* ↑ ↑
* TimelineHashSet TimelineHashMap
*
* BaseHashTable is a simple hash table that uses separate chaining. The interface is
* pretty bare-bones since this class is not intended to be used directly by end-users.
*
* This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
* snapshots. This is the core of the snapshotted hash table code and handles the
* tiering.
*
* TimelineHashSet and TimelineHashMap are mostly wrappers around this
* SnapshottableHashTable class. They implement standard Java APIs for Set and Map,
* respectively. There's a fair amount of boilerplate for this, but it's necessary so
* that timeline data structures can be used while writing idiomatic Java code.
* The accessor APIs have two versions -- one that looks at the current state, and one
* that looks at a historical snapshotted state. Mutation APIs only ever mutate the
* current state.
*
* One very important feature of SnapshottableHashTable is that we support iterating
* over a snapshot even while changes are being made to the current state. See the
* Javadoc for the iterator for more information about how this is accomplished.
*
* All of these classes require external synchronization, and don't support null keys or
* values.
*/
class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEpoch>
extends BaseHashTable<T> implements Revertable {
/**
* A special epoch value that represents the latest data.
*/
final static long LATEST_EPOCH = Long.MAX_VALUE;
interface ElementWithStartEpoch {
void setStartEpoch(long startEpoch);
long startEpoch();
}
static class HashTier<T extends SnapshottableHashTable.ElementWithStartEpoch> implements Delta {
private final int size;
private BaseHashTable<T> deltaTable;
HashTier(int size) {
this.size = size;
}
@SuppressWarnings("unchecked")
@Override
public void mergeFrom(long epoch, Delta source) {
HashTier<T> other = (HashTier<T>) source;
// As an optimization, the deltaTable might not exist for a new key
// as there is no previous value
if (other.deltaTable != null) {
List<T> list = new ArrayList<>();
Object[] otherElements = other.deltaTable.baseElements();
for (int slot = 0; slot < otherElements.length; slot++) {
BaseHashTable.unpackSlot(list, otherElements, slot);
for (T element : list) {
// When merging in a later hash tier, we want to keep only the elements
// that were present at our epoch.
if (element.startEpoch() <= epoch) {
if (deltaTable == null) {
deltaTable = new BaseHashTable<>(1);
}
deltaTable.baseAddOrReplace(element);
}
}
}
}
}
}
/**
* Iterate over the values that currently exist in the hash table.
*
* You can use this iterator even if you are making changes to the map.
* The changes may or may not be visible while you are iterating.
*/
class CurrentIterator implements Iterator<T> {
private final Object[] topTier;
private final List<T> ready;
private int slot;
private T lastReturned;
CurrentIterator(Object[] topTier) {
this.topTier = topTier;
this.ready = new ArrayList<>();
this.slot = 0;
this.lastReturned = null;
}
@Override
public boolean hasNext() {
while (ready.isEmpty()) {
if (slot == topTier.length) {
return false;
}
BaseHashTable.unpackSlot(ready, topTier, slot);
slot++;
}
return true;
}
@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
lastReturned = ready.remove(ready.size() - 1);
return lastReturned;
}
@Override
public void remove() {
if (lastReturned == null) {
throw new UnsupportedOperationException("remove");
}
snapshottableRemove(lastReturned);
lastReturned = null;
}
}
/**
* Iterate over the values that existed in the hash table during a specific snapshot.
*
* You can use this iterator even if you are making changes to the map.
* The snapshot is immutable and will always show up the same.
*/
class HistoricalIterator implements Iterator<T> {
private final Object[] topTier;
private final Snapshot snapshot;
private final List<T> temp;
private final List<T> ready;
private int slot;
HistoricalIterator(Object[] topTier, Snapshot snapshot) {
this.topTier = topTier;
this.snapshot = snapshot;
this.temp = new ArrayList<>();
this.ready = new ArrayList<>();
this.slot = 0;
}
@Override
public boolean hasNext() {
while (ready.isEmpty()) {
if (slot == topTier.length) {
return false;
}
BaseHashTable.unpackSlot(temp, topTier, slot);
for (T object : temp) {
if (object.startEpoch() <= snapshot.epoch()) {
ready.add(object);
}
}
temp.clear();
/*
* As we iterate over the SnapshottableHashTable, elements may move from
* the top tier into the snapshot tiers. This would happen if something
* were deleted in the top tier, for example, but still retained in the
* snapshot.
*
* We don't want to return any elements twice, though. Therefore, we
* iterate over the top tier and the snapshot tier at the
* same time. The key to understanding how this works is realizing that
* both hash tables use the same hash function, but simply choose a
* different number of significant bits based on their size.
* So if the top tier has size 4 and the snapshot tier has size 2, we have
* the following mapping:
*
* Elements that would be in slot 0 or 1 in the top tier can only be in
* slot 0 in the snapshot tier.
* Elements that would be in slot 2 or 3 in the top tier can only be in
* slot 1 in the snapshot tier.
*
* Therefore, we can do something like this:
* 1. check slot 0 in the top tier and slot 0 in the snapshot tier.
* 2. check slot 1 in the top tier and slot 0 in the snapshot tier.
* 3. check slot 2 in the top tier and slot 1 in the snapshot tier.
* 4. check slot 3 in the top tier and slot 1 in the snapshot tier.
*
* If elements move from the top tier to the snapshot tier, then
* we'll still find them and report them exactly once.
*
* Note that while I used 4 and 2 as example sizes here, the same pattern
* holds for different powers of two. The "snapshot slot" of an element
* will be the top few bits of the top tier slot of that element.
*/
Iterator<Snapshot> iterator = snapshotRegistry.iterator(snapshot);
while (iterator.hasNext()) {
Snapshot curSnapshot = iterator.next();
HashTier<T> tier = curSnapshot.getDelta(SnapshottableHashTable.this);
if (tier != null && tier.deltaTable != null) {
BaseHashTable<T> deltaTable = tier.deltaTable;
int shift = Integer.numberOfLeadingZeros(deltaTable.baseElements().length) -
Integer.numberOfLeadingZeros(topTier.length);
int tierSlot = slot >>> shift;
BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot);
for (T object : temp) {
if (object.startEpoch() <= snapshot.epoch()) {
if (BaseHashTable.findSlot(object, topTier.length) == slot) {
ready.add(object);
}
}
}
temp.clear();
}
}
slot++;
}
return true;
}
@Override
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return ready.remove(ready.size() - 1);
}
}
private final SnapshotRegistry snapshotRegistry;
SnapshottableHashTable(SnapshotRegistry snapshotRegistry, int expectedSize) {
super(expectedSize);
this.snapshotRegistry = snapshotRegistry;
snapshotRegistry.register(this);
}
int snapshottableSize(long epoch) {
if (epoch == LATEST_EPOCH) {
return baseSize();
} else {
Iterator<Snapshot> iterator = snapshotRegistry.iterator(epoch);
while (iterator.hasNext()) {
Snapshot snapshot = iterator.next();
HashTier<T> tier = snapshot.getDelta(SnapshottableHashTable.this);
if (tier != null) {
return tier.size;
}
}
return baseSize();
}
}
T snapshottableGet(Object key, long epoch) {
T result = baseGet(key);
if (result != null && result.startEpoch() <= epoch) {
return result;
}
if (epoch == LATEST_EPOCH) {
return null;
}
Iterator<Snapshot> iterator = snapshotRegistry.iterator(epoch);
while (iterator.hasNext()) {
Snapshot snapshot = iterator.next();
HashTier<T> tier = snapshot.getDelta(SnapshottableHashTable.this);
if (tier != null && tier.deltaTable != null) {
result = tier.deltaTable.baseGet(key);
if (result != null) {
if (result.startEpoch() <= epoch) {
return result;
} else {
return null;
}
}
}
}
return null;
}
boolean snapshottableAddUnlessPresent(T object) {
T prev = baseGet(object);
if (prev != null) {
return false;
}
object.setStartEpoch(snapshotRegistry.latestEpoch() + 1);
int prevSize = baseSize();
baseAddOrReplace(object);
updateTierData(prevSize);
return true;
}
T snapshottableAddOrReplace(T object) {
object.setStartEpoch(snapshotRegistry.latestEpoch() + 1);
int prevSize = baseSize();
T prev = baseAddOrReplace(object);
if (prev == null) {
updateTierData(prevSize);
} else {
updateTierData(prev, prevSize);
}
return prev;
}
T snapshottableRemove(Object object) {
T prev = baseRemove(object);
if (prev == null) {
return null;
} else {
updateTierData(prev, baseSize() + 1);
return prev;
}
}
private void updateTierData(int prevSize) {
Iterator<Snapshot> iterator = snapshotRegistry.reverseIterator();
if (iterator.hasNext()) {
Snapshot snapshot = iterator.next();
HashTier<T> tier = snapshot.getDelta(SnapshottableHashTable.this);
if (tier == null) {
tier = new HashTier<>(prevSize);
snapshot.setDelta(SnapshottableHashTable.this, tier);
}
}
}
private void updateTierData(T prev, int prevSize) {
Iterator<Snapshot> iterator = snapshotRegistry.reverseIterator();
if (iterator.hasNext()) {
Snapshot snapshot = iterator.next();
// If the previous element was present in the most recent snapshot, add it to
// that snapshot's hash tier.
if (prev.startEpoch() <= snapshot.epoch()) {
HashTier<T> tier = snapshot.getDelta(SnapshottableHashTable.this);
if (tier == null) {
tier = new HashTier<>(prevSize);
snapshot.setDelta(SnapshottableHashTable.this, tier);
}
if (tier.deltaTable == null) {
tier.deltaTable = new BaseHashTable<>(1);
}
tier.deltaTable.baseAddOrReplace(prev);
}
}
}
Iterator<T> snapshottableIterator(long epoch) {
if (epoch == LATEST_EPOCH) {
return new CurrentIterator(baseElements());
} else {
return new HistoricalIterator(baseElements(), snapshotRegistry.getSnapshot(epoch));
}
}
String snapshottableToDebugString() {
StringBuilder bld = new StringBuilder();
bld.append(String.format("SnapshottableHashTable{%n"));
bld.append("top tier: ");
bld.append(baseToDebugString());
bld.append(String.format(",%nsnapshot tiers: [%n"));
String prefix = "";
for (Iterator<Snapshot> iter = snapshotRegistry.iterator(); iter.hasNext(); ) {
Snapshot snapshot = iter.next();
bld.append(prefix);
bld.append("epoch ").append(snapshot.epoch()).append(": ");
HashTier<T> tier = snapshot.getDelta(this);
if (tier == null) {
bld.append("null");
} else {
bld.append("HashTier{");
bld.append("size=").append(tier.size);
bld.append(", deltaTable=");
if (tier.deltaTable == null) {
bld.append("null");
} else {
bld.append(tier.deltaTable.baseToDebugString());
}
bld.append("}");
}
bld.append(String.format("%n"));
}
bld.append(String.format("]}%n"));
return bld.toString();
}
@SuppressWarnings("unchecked")
@Override
public void executeRevert(long targetEpoch, Delta delta) {
HashTier<T> tier = (HashTier<T>) delta;
Iterator<T> iter = snapshottableIterator(LATEST_EPOCH);
while (iter.hasNext()) {
T element = iter.next();
if (element.startEpoch() > targetEpoch) {
iter.remove();
}
}
BaseHashTable<T> deltaTable = tier.deltaTable;
if (deltaTable != null) {
List<T> out = new ArrayList<>();
for (int i = 0; i < deltaTable.baseElements().length; i++) {
BaseHashTable.unpackSlot(out, deltaTable.baseElements(), i);
for (T value : out) {
baseAddOrReplace(value);
}
out.clear();
}
}
}
@Override
public void reset() {
Iterator<T> iter = snapshottableIterator(SnapshottableHashTable.LATEST_EPOCH);
while (iter.hasNext()) {
iter.next();
iter.remove();
}
}
}