/*
 * Copyright 2013 Babak Farhang
 */
package io.crums.io.store.table.merge;

import java.io.IOException;
import java.nio.ByteBuffer;

import io.crums.io.store.table.SortedTable;
import io.crums.io.store.table.SortedTable.Searcher;

/**
* <p>
* We maintain a stack of these in a multiway merge. A merge source tracks the merge's progress
* through a single table. Merge sources are mutually comparable in a way that makes it easy to
* determine from which source table the next rows must be copied to the target of the merge.
* </p><p>
* Here's the basic idea of how it works, in ASCII art.
* </p><p>
* Let's say we have 3 sorted tables we want to merge. In order to keep the sketch simple,
* let's say no 2 rows across any of the tables compare equal:
* </p>
* <pre>
*
*   E     B     I
*   F     C     J
*   L     G     M
*   H           P
*   K           Q
*   Z           U
*               X
*               Y
* </pre>
* <p>
* Each column above stands for a sorted table, and each letter in a column stands for the value
* of a row in that table. A merge source instance is just a wrapper around a table along with a
* cursor indicating the current (unmerged) row. Here are the same tables, wrapped as merge sources,
* and sorted in their natural order:
* </p>
* <pre>
* {@code
*   I<    E<    B<
*   J     F     C
*   M     L     G
*   P     H
*   Q     K
*   U     Z
*   X
*   Y
* }
* </pre>
* <p>
* Each merge source's cursor is initialized to its table's first row (row number zero).
* Merge sources are ordered by comparing the row each one points to with that of the
* other. The ordering is actually reversed, in order to more efficiently
* support tail end removal of merge sources from lists (an optimization, likely not worth the
* extra cognitive load it introduces). The diagram above exactly depicts this ordering. The
* tail end of the list of merge sources points to the next row to be written out to the
* output table. So you can view this ordering of merge sources as a kind of stack with the
* tail end of the list representing what must be worked on next.
* </p><p>
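* A minimal illustration of this reversed ordering follows. It is a hypothetical simplification
* in which each merge source is reduced to its current row, using the first rows of the example
* tables. The class name is invented for illustration.
* </p>
* <pre>{@code
* import java.util.ArrayList;
* import java.util.Comparator;
* import java.util.List;
*
* public class ReverseOrderSketch {
*     public static void main(String[] args) {
*         // current rows of three hypothetical merge sources
*         List<Character> current = new ArrayList<>(List.of('E', 'B', 'I'));
*         current.sort(Comparator.reverseOrder());
*         System.out.println(current);                 // prints [I, E, B]
*         // The smallest current row sits at the tail, so removing it
*         // from an ArrayList shifts no other elements.
*         char top = current.remove(current.size() - 1);
*         System.out.println(top + " " + current);     // prints B [I, E]
*     }
* }
* }</pre>
* <p>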
* Let's call the tail merge source (far right) in the list the <em>top</em> merge source,
* and the one immediately to the left of it, the <em>next</em> merge source.
* </p><p>
* The first step in the merge is to compute the index of <em>next</em>'s row in <em>top</em>.
* This is the row number in <em>top</em> where <em>next</em>'s current row would land if it
* alone were merged into <em>top</em>'s table. In our example, the index evaluates to 2.
* Note this calculation is <strong>O</strong>(log n), since <em>top</em>'s table is sorted.
* Next, all the rows from <em>top</em>'s current row number up to (but not including) the
* index are block-transferred to the output file.
* </p>
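* <p>
* Because <em>top</em>'s table is sorted, the index is the insertion point of <em>next</em>'s
* current row. It can be found by a binary search that starts at <em>top</em>'s cursor. The
* hypothetical sketch below shows this over a sorted {@code int[]} standing in for <em>top</em>'s
* table. The actual class compares {@code ByteBuffer} rows using the table's {@code order()}, and
* the names here are invented for illustration. With the example's letters, it yields 2.
* </p>
* <pre>{@code
* import java.util.Arrays;
*
* public class InsertionIndexSketch {
*
*     // Row number in top[from..] at which key would be inserted to keep top sorted.
*     // Assumes no row in top compares equal to key, as in the example above.
*     static int insertionIndex(int[] top, int from, int key) {
*         int i = Arrays.binarySearch(top, from, top.length, key);
*         return i >= 0 ? i : -i - 1;   // a miss is encoded as -(insertion point) - 1
*     }
*
*     public static void main(String[] args) {
*         int[] top = { 'B', 'C', 'G' };                    // top's rows
*         System.out.println(insertionIndex(top, 0, 'E'));  // prints 2
*     }
* }
* }</pre>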
* <pre>
* {@code
*
*   I<    E< --    B<      B
*   J     F    |   C       C
*   M     L    --> G (row # 2)
*   P     H
*   Q     K
*   U     Z
*   X
*   Y
* }
* </pre>
* <p>
* The situation after the rows in <em>top</em> have been transferred to the
* output (far right column) is depicted above.
* </p><p>
* Next, the top merge source's cursor (row number) is set to the index noted in the previous
* step, and the merge sources are re-sorted:
* </p>
* <pre>{@code
*
*   I<    B     E<     B
*   J     C     F      C
*   M     G<    L
*   P           H
*   Q           K
*   U           Z
*   X
*   Y
* }
* </pre>
* <p>
* These two steps are repeated until <em>top</em>'s cursor points beyond its last
* row. When that happens, the top merge source is removed from the list. Processing then
* resumes until only one merge source remains in the list. The next few steps are
* depicted below:
* </p>
* <pre>{@code
*   I<    B     E<     B
*   J     C     F      C
*   M     G<  > L      E
*   P           H      F
*   Q           K
*   U           Z
*   X
*   Y
* }
* </pre>
* <p> </p>
* <pre>{@code
* E I< B B
* F J C C
* L< M G< E
* P H F
* Q > K G
* U Z H
* X
* Y
* }
* </pre>
* <p>
* Finally, when there's only one merge source remaining in the list of merge sources,
* the remaining rows in that merge source are appended to the output file.
* </p>
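* <p>
* For illustration only, the whole procedure can be sketched over in-memory sorted {@code int[]}
* tables. This is a hypothetical, self-contained sketch, not this package's implementation.
* {@code Source} and {@code merge} are invented names, {@code Integer.compare} stands in for the
* table's row order, and a {@code List} stands in for the output table. Unlike the example above,
* it tolerates rows that compare equal: on a tie it emits <em>top</em>'s row first, so the loop
* always makes progress.
* </p>
* <pre>{@code
* import java.util.ArrayList;
* import java.util.Arrays;
* import java.util.List;
*
* public class MultiwayMergeSketch {
*
*     // Hypothetical stand-in for a merge source: a sorted table plus a cursor.
*     static final class Source implements Comparable<Source> {
*         final int[] rows;
*         int cursor;
*
*         Source(int[] rows) {
*             this.rows = rows;
*         }
*
*         boolean finished() {
*             return cursor == rows.length;
*         }
*
*         int row() {
*             return rows[cursor];
*         }
*
*         // Reverse order: finished sources sort first (bottom of the stack), and the
*         // source with the smallest current row sorts last (top of the stack).
*         public int compareTo(Source other) {
*             if (finished())
*                 return other.finished() ? 0 : -1;
*             if (other.finished())
*                 return 1;
*             return -Integer.compare(row(), other.row());
*         }
*     }
*
*     static List<Integer> merge(int[]... tables) {
*         List<Source> stack = new ArrayList<>();
*         for (int[] table : tables)
*             if (table.length > 0)          // empty tables are skipped
*                 stack.add(new Source(table));
*         stack.sort(null);                  // natural (reversed) order
*         List<Integer> out = new ArrayList<>();
*         while (stack.size() > 1) {
*             Source top = stack.get(stack.size() - 1);
*             Source next = stack.get(stack.size() - 2);
*             // index of next's current row in top: an O(log n) binary search
*             int i = Arrays.binarySearch(top.rows, top.cursor, top.rows.length, next.row());
*             int index = i >= 0 ? i : -i - 1;
*             if (index == top.cursor)
*                 index++;                   // tie: emit top's row first
*             // block-transfer top's rows [cursor, index) to the output
*             for (int r = top.cursor; r < index; r++)
*                 out.add(top.rows[r]);
*             top.cursor = index;
*             if (top.finished())
*                 stack.remove(stack.size() - 1);   // pop the top from the tail
*             else
*                 stack.sort(null);          // re-sort after top's cursor moved
*         }
*         // at most one source remains: append the rest of its rows
*         for (Source last : stack)
*             for (int r = last.cursor; r < last.rows.length; r++)
*                 out.add(last.rows[r]);
*         return out;
*     }
*
*     public static void main(String[] args) {
*         System.out.println(merge(new int[] {5, 6, 8}, new int[] {2, 3, 7}, new int[] {9, 10, 13}));
*         // prints [2, 3, 5, 6, 7, 8, 9, 10, 13]
*     }
* }
* }</pre>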
* <h2>Don't access concurrently</h2>
* <p>
* Designed for single-threaded access (not even concurrent reads).
* </p>
* <h2>Note</h2>
* <p>
* Don't be daunted by the self-referential type parameter <em><code>S</code></em>: it just
* says that derived instances are mutually comparable.
* </p>
*/
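public abstract class BaseMergeSource<S extends BaseMergeSource<?>> implements Comparable<S> {

    private final Searcher searcher;
    // snapshot of the table's row count, taken at construction
    private final long rowCount;
    // holds the contents of the current row
    private final ByteBuffer row;
    // current row number; rowCount or -1 once finished
    private long rowCursor;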

    /**
     * Creates an instance positioned at the table's first row (row number zero).
     */
    public BaseMergeSource(SortedTable.Searcher searcher) throws IOException {
        this(searcher, 0);
    }
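
    /**
     * Creates an instance positioned at the given row number.
     *
     * @param searcher       searcher over a non-empty sorted table
     * @param initRowNumber  the initial row number (see {@linkplain #setRow(long)})
     * @throws IllegalArgumentException if <code>searcher</code> is <code>null</code>, the table
     *         is empty, or <code>initRowNumber</code> is greater than the table's row count
     * @throws IndexOutOfBoundsException if <code>initRowNumber</code> is less than -1
     */
    public BaseMergeSource(SortedTable.Searcher searcher, long initRowNumber) throws IOException {
        if (searcher == null)
            throw new IllegalArgumentException("null searcher");
        this.searcher = searcher;
        this.rowCount = searcher.getTable().getRowCount();
        if (initRowNumber > rowCount)
            throw new IllegalArgumentException("initRowNumber > rowCount: " + initRowNumber + " > " + rowCount);
        if (rowCount == 0)
            throw new IllegalArgumentException("empty table: " + searcher.getTable());
        this.row = ByteBuffer.allocate(searcher.getTable().getRowWidth());
        setRow(initRowNumber);
    }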

    public final Searcher searcher() {
        return searcher;
    }

    public final SortedTable table() {
        return searcher.getTable();
    }

    /**
     * Indicates the current row number. If in the range [0, {@linkplain #rowCount() rowCount} - 1],
     * then the contents of that row are given by {@linkplain #row()}.
     *
     * @return a number between -1 and {@linkplain #rowCount()}, inclusive
     */
    public long rowNumber() {
        return rowCursor;
    }

    /**
     * Sets the {@linkplain #rowNumber()} and loads the corresponding {@linkplain #row()};
     * unless the <code>rowNum</code> argument is equal to either {@linkplain #rowCount()} or
     * <code>-1</code>, in which case the instance is considered {@linkplain #finished() finished}
     * and no row is loaded.
     *
     * @param rowNum a row number between -1 and {@linkplain #rowCount()}, inclusive
     * @throws IndexOutOfBoundsException if <code>rowNum</code> is less than -1 or greater than
     *         {@linkplain #rowCount()}
     */
    public void setRow(long rowNum) throws IOException {
        if (rowNum >= rowCount || rowNum < 0) {
            if (rowNum == rowCount || rowNum == -1) {
                // we're finished: leave the row buffer as is
                rowCursor = rowNum;
                return;
            }
            throw new IndexOutOfBoundsException("rowNum/rowCount: " + rowNum + "/" + rowCount);
        }
        row.clear();
        // if the row is already loaded in the search buffer, copy it from there
        if (searcher.isRowInBuffer(rowNum))
            searcher.copyRowInto(rowNum, row);
        // otherwise, hit the file system
        else
            searcher.getTable().read(rowNum, row);
        row.flip();
        rowCursor = rowNum;
    }

    /**
     * Advances to the next row, unless this merge source is already {@linkplain #finished() finished}.
     *
     * @return <code>true</code>, if advanced to the next existing row; <code>false</code>,
     *         if this instance is (or has just become) {@linkplain #finished() finished}
     */
    public boolean advanceRow() throws IOException {
        if (finished())
            return false;
        setRow(rowNumber() + 1);
        return !finished();
    }

    /**
     * Rewinds to the previous row, unless this merge source is already {@linkplain #finished() finished}.
     * (An instance positioned past its last row is also finished, so it cannot be rewound.)
     *
     * @return <code>true</code>, if rewound to the previous existing row; <code>false</code>,
     *         if this instance is (or has just become) {@linkplain #finished() finished}
     */
    public boolean rewindRow() throws IOException {
        if (finished())
            return false;
        setRow(rowNumber() - 1);
        return !finished();
    }

    /**
     * Returns the snapshot row count. (If there are concurrent additions to the
     * underlying table, those will be ignored.)
     *
     * @return the table's row count at the time this instance was created
     */
    public final long rowCount() {
        return rowCount;
    }

    /**
     * Returns the contents of the current {@linkplain #rowNumber()}, if the
     * instance is not {@linkplain #finished()}; otherwise, the return value is undefined.
     * The returned instance is a new read-only view onto the current row. Since it shares
     * its contents with this instance, those contents change every time the row number is changed.
     */
    public final ByteBuffer row() {
        return row.asReadOnlyBuffer();
    }
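
    /**
     * Copies the contents of the current row into the given <code>buffer</code>. Slightly
     * more efficient than <code>buffer.put(row())</code>, since no read-only view is created.
     *
     * @param buffer the destination; must have at least the table's row width remaining
     * @throws IllegalStateException if the internal row buffer is not positioned at the start
     *         of the row (e.g. due to concurrent access)
     */
    public void copyRowInto(ByteBuffer buffer) {
        // sanity check: the row buffer should span the whole row, from its start
        if (row.remaining() != row.capacity())
            throw new IllegalStateException("Concurrent access? current row: " + row);
        buffer.put(row);
        // restore the row buffer's position for the next copy
        row.rewind();
    }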

    /**
     * An instance is finished when all its rows have been copied to the target, i.e. when
     * <code>{@linkplain #rowNumber()} == {@linkplain #rowCount()}</code>, or when it has been
     * rewound past its first row, i.e. when <code>{@linkplain #rowNumber()} == -1</code>.
     */
    public final boolean finished() {
        return rowCursor == rowCount || rowCursor == -1;
    }

    @Override
    public String toString() {
        StringBuilder string = new StringBuilder(128);
        string.append(getClass().getSimpleName()).append('[');
        appendToStringFields(string).append(']');
        return string.toString();
    }

    /**
     * Appends this instance's fields to the given builder, for {@linkplain #toString()}.
     * Subclasses may override this to append additional fields.
     */
    protected StringBuilder appendToStringFields(StringBuilder string) {
        return string.append("table=").append(searcher.getTable()).append(", row=").append(row)
                .append(", rowCount=").append(rowCount).append(", rowNumber=").append(rowNumber());
    }
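
    /**
     * Implements the reversed ordering described in the class documentation; subclasses can
     * delegate their <code>compareTo</code> to this method. Finished instances sort before
     * (i.e. below) unfinished ones; otherwise, instances compare in the reverse of their
     * current rows' order.
     */
    protected int compareToImpl(BaseMergeSource<?> other) {
        // finished instances are ordered to the bottom of the stack
        if (this.finished())
            return other.finished() ? 0 : -1;
        else if (other.finished())
            return 1;
        // reverse the table's row order, so the smallest row ends up at the top (tail)
        else
            return -table().order().compare(this.row, other.row);
    }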

    /**
     * Compares this instance's {@linkplain #row() row} with the <code>otherRow</code>.
     * This is functionally equivalent to
     * <pre>
     * <code>
     * source = .. // a BaseMergeSource
     * source.table().order().compare(source.row(), otherRow)
     * </code>
     * </pre>
     * except that it's marginally more efficient, since no read-only view of the row is created.
     *
     * @return a negative number, zero, or a positive number, if this instance's row is ordered
     *         before, equal to, or after <code>otherRow</code>, respectively
     */
    public int compareRowWithOther(ByteBuffer otherRow) {
        return table().order().compare(this.row, otherRow);
    }

    /**
     * Compares the <code>otherRow</code> to this instance's {@linkplain #row() row}.
     * This is functionally equivalent to
     * <pre>
     * <code>
     * source = .. // a BaseMergeSource
     * source.table().order().compare(otherRow, source.row())
     * </code>
     * </pre>
     * except that it's marginally more efficient, since no read-only view of the row is created.
     *
     * @return <code>-compareRowWithOther(otherRow)</code>
     */
    public final int compareOtherWithRow(ByteBuffer otherRow) {
        return -compareRowWithOther(otherRow);
    }
}