/
TTable.java
562 lines (517 loc) · 21.7 KB
/
TTable.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
/**
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package com.yahoo.omid.transaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.ClientScanner;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import com.yahoo.omid.client.ColumnWrapper;
import com.yahoo.omid.client.RowKeyFamily;
/**
* Provides transactional methods for accessing and modifying a given snapshot
* of data identified by an opaque {@link Transaction} object.
*
*/
public class TTable {
/**
 * Number of single-row transactional gets issued; incremented (without any
 * synchronization) by {@code get(Transaction, Get)}.
 */
public static long getsPerformed = 0;
/** Not written by any code in this class; presumably a statistic maintained elsewhere (TODO confirm). */
public static long elementsGotten = 0;
/** Not written by any code in this class; presumably a statistic maintained elsewhere (TODO confirm). */
public static long elementsRead = 0;
/** Not written by any code in this class; presumably a statistic maintained elsewhere (TODO confirm). */
public static long extraGetsPerformed = 0;
/** Not read or written by any code in this class; presumably a statistic maintained elsewhere (TODO confirm). */
public static double extraVersionsAvg = 3;
/**
 * We always ask for CACHE_VERSIONS_OVERHEAD extra versions on top of
 * {@code versionsAvg} when sizing gets and scans.
 */
private static int CACHE_VERSIONS_OVERHEAD = 3;
/**
 * Average number of versions needed to reach the right snapshot. Updated by
 * {@code filter} every time a readable version is found and used to choose
 * the max-versions setting of gets and scans.
 */
public double versionsAvg = 3;
/**
 * How fast do we adapt the average: weight given to the previous average
 * when {@code versionsAvg} is decayed towards a smaller observed value.
 */
private static final double alpha = 0.975;
/** Underlying HBase table that all reads and writes are delegated to. */
private HTable table;
/**
 * Opens the HBase table with the given name using the supplied
 * configuration.
 *
 * @param conf
 *            configuration handed to the underlying {@link HTable}
 * @param tableName
 *            name of the table, as bytes
 * @throws IOException
 *             if the underlying {@link HTable} cannot be created
 */
public TTable(Configuration conf, byte[] tableName) throws IOException {
    this.table = new HTable(conf, tableName);
}

/**
 * Convenience constructor taking the table name as a string; the name is
 * converted with {@link Bytes#toBytes(String)}.
 *
 * @param conf
 *            configuration handed to the underlying {@link HTable}
 * @param tableName
 *            name of the table
 * @throws IOException
 *             if the underlying {@link HTable} cannot be created
 */
public TTable(Configuration conf, String tableName) throws IOException {
    this(conf, Bytes.toBytes(tableName));
}
/**
 * Reads cells of a single row as seen by the given transaction.
 * <p>
 * A new {@link Get} is built for the same row, families and columns as the
 * one supplied. Its time range is the caller's range with the upper bound
 * capped at the transaction start timestamp (inclusive), and its max
 * versions is {@code versionsAvg + CACHE_VERSIONS_OVERHEAD}. Only the row,
 * time range and family map of the supplied {@link Get} are copied. The raw
 * cells are then passed through {@code filter}.
 *
 * @param transaction
 *            the transaction defining the snapshot; must be a
 *            {@link TransactionState}
 * @param get
 *            The object that specifies what data to fetch and from which
 *            row.
 * @return The filtered cells of the row. The {@link Result} contains no
 *         {@link KeyValue} when nothing survives filtering.
 * @throws IllegalArgumentException
 *             if {@code transaction} is not a {@link TransactionState}
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public Result get(Transaction transaction, final Get get) throws IOException {
    if (!(transaction instanceof TransactionState)) {
        throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
    }
    final TransactionState txState = (TransactionState) transaction;
    final int maxVersions = (int) (versionsAvg + CACHE_VERSIONS_OVERHEAD);
    final long snapshotTs = txState.getStartTimestamp();

    final Get snapshotGet = new Get(get.getRow());
    final TimeRange callerRange = get.getTimeRange();
    // The upper bound is exclusive, hence "+ 1" to include the start timestamp itself
    final long upperBound = Math.min(callerRange.getMax(), snapshotTs + 1);
    snapshotGet.setTimeRange(callerRange.getMin(), upperBound);
    snapshotGet.setMaxVersions(maxVersions);

    for (Map.Entry<byte[], NavigableSet<byte[]>> requested : get.getFamilyMap().entrySet()) {
        final byte[] family = requested.getKey();
        final NavigableSet<byte[]> columns = requested.getValue();
        if (columns != null && !columns.isEmpty()) {
            for (byte[] column : columns) {
                snapshotGet.addColumn(family, column);
            }
        } else {
            // No explicit qualifiers: the whole family was requested
            snapshotGet.addFamily(family);
        }
    }

    getsPerformed++;

    // Keep only the KVs that belong to the transaction snapshot; filter()
    // asks for more versions if needed
    final List<KeyValue> rawCells = table.get(snapshotGet).list();
    return new Result(filter(txState, rawCells, maxVersions));
}
/**
 * Deletes the specified cells/row within the given transaction.
 * <p>
 * Nothing is physically deleted. For every targeted column a cell with a
 * {@code null} value is written at the transaction start timestamp, and
 * {@code filter} treats zero-length values as deletions. Column deletes are
 * translated directly. Family deletes and whole-row deletes (empty family
 * map) first perform a transactional get to learn which columns currently
 * exist in the snapshot, so no more than necessary is marked as deleted.
 * <p>
 * If no column ends up being targeted (for example the row or family does
 * not exist in the snapshot) the call is a no-op. An empty {@link Put} is
 * not sent, because HBase rejects it with an
 * {@link IllegalArgumentException}, and no row is added to the
 * transaction's write set.
 *
 * @param transaction
 *            the transaction the delete belongs to; must be a
 *            {@link TransactionState}
 * @param delete
 *            The object that specifies what to delete.
 * @throws IllegalArgumentException
 *             if {@code transaction} is not a {@link TransactionState}
 * @throws UnsupportedOperationException
 *             if the delete targets a specific cell version
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public void delete(Transaction transaction, Delete delete) throws IOException {
    if (!(transaction instanceof TransactionState)) {
        throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
    }
    TransactionState transactionState = (TransactionState) transaction;
    final long startTimestamp = transactionState.getStartTimestamp();

    // Tombstones are accumulated in deleteP; deleteG collects the families
    // whose current columns must be discovered with a read first.
    final Put deleteP = new Put(delete.getRow(), startTimestamp);
    final Get deleteG = new Get(delete.getRow());

    Map<byte[], List<KeyValue>> fmap = delete.getFamilyMap();
    // An empty family map means "delete the whole row"
    boolean issueGet = fmap.isEmpty();

    for (List<KeyValue> kvl : fmap.values()) {
        for (KeyValue kv : kvl) {
            switch (KeyValue.Type.codeToType(kv.getType())) {
            case DeleteColumn:
                deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
                break;
            case DeleteFamily:
                deleteG.addFamily(kv.getFamily());
                issueGet = true;
                break;
            case Delete:
                if (kv.getTimestamp() != HConstants.LATEST_TIMESTAMP) {
                    throw new UnsupportedOperationException(
                            "Cannot delete specific versions on Snapshot Isolation.");
                }
                deleteP.add(kv.getFamily(), kv.getQualifier(), startTimestamp, null);
                break;
            default:
                // Other KeyValue types are ignored, as before
                break;
            }
        }
    }

    if (issueGet) {
        // It's better to perform a transactional get to avoid deleting more
        // than necessary
        Result result = this.get(transactionState, deleteG);
        for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entryF : result.getMap().entrySet()) {
            byte[] family = entryF.getKey();
            for (byte[] qualifier : entryF.getValue().keySet()) {
                deleteP.add(family, qualifier, startTimestamp, null);
            }
        }
    }

    if (deleteP.isEmpty()) {
        // Nothing to mark as deleted: sending an empty Put would fail, and
        // there is no write to track for this transaction.
        return;
    }

    transactionState.addRow(new RowKeyFamily(delete.getRow(), getTableName(), deleteP.getFamilyMap()));
    table.put(deleteP);
}
/**
 * Writes the cells of the given {@link Put} within the given transaction.
 * <p>
 * Every cell is copied into a new {@link Put} stamped with the transaction
 * start timestamp; timestamps set by the caller are discarded. The row is
 * registered in the transaction's write set before the data is sent to the
 * underlying table.
 *
 * @param transaction
 *            the transaction the write belongs to; must be a
 *            {@link TransactionState}
 * @param put
 *            The data to put.
 * @throws IllegalArgumentException
 *             if {@code transaction} is not a {@link TransactionState}
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public void put(Transaction transaction, Put put) throws IOException, IllegalArgumentException {
    if (!(transaction instanceof TransactionState)) {
        throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
    }
    final TransactionState txState = (TransactionState) transaction;
    final long writeTs = txState.getStartTimestamp();

    // Rebuild the put so that every cell carries the transaction timestamp
    final Put stamped = new Put(put.getRow(), writeTs);
    for (List<KeyValue> cells : put.getFamilyMap().values()) {
        for (KeyValue cell : cells) {
            final KeyValue restamped = new KeyValue(cell.getRow(), cell.getFamily(), cell.getQualifier(), writeTs,
                    cell.getValue());
            stamped.add(restamped);
        }
    }

    // Track the written row (together with the table name) in the transaction
    final RowKeyFamily writtenRow = new RowKeyFamily(stamped.getRow(), getTableName(), stamped.getFamilyMap());
    txState.addRow(writtenRow);

    table.put(stamped);
}
/**
 * Returns a scanner over the current table that only exposes rows and cells
 * belonging to the snapshot of the given transaction.
 * <p>
 * The supplied {@link Scan} is copied. On the copy, max versions is set to
 * {@code versionsAvg + CACHE_VERSIONS_OVERHEAD} and the time range is
 * replaced by {@code [0, startTimestamp + 1)}, so any time range configured
 * by the caller is overridden.
 *
 * @param transaction
 *            the transaction defining the snapshot; must be a
 *            {@link TransactionState}
 * @param scan
 *            A configured {@link Scan} object.
 * @return A scanner.
 * @throws IllegalArgumentException
 *             if {@code transaction} is not a {@link TransactionState}
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public ResultScanner getScanner(Transaction transaction, Scan scan) throws IOException {
    if (!(transaction instanceof TransactionState)) {
        throw new IllegalArgumentException("transaction should be an instance of " + TransactionState.class);
    }
    final TransactionState txState = (TransactionState) transaction;
    final int maxVersions = (int) (versionsAvg + CACHE_VERSIONS_OVERHEAD);

    final Scan snapshotScan = new Scan(scan);
    snapshotScan.setMaxVersions(maxVersions);
    // Upper bound is exclusive: "+ 1" keeps the start timestamp itself visible
    snapshotScan.setTimeRange(0, txState.getStartTimestamp() + 1);

    return new TransactionalClientScanner(txState, getConfiguration(), snapshotScan, getTableName(), maxVersions);
}
/**
 * Filters the raw results returned from HBase and returns only those
 * belonging to the current snapshot, as defined by the transactionState
 * object. If the raw results don't contain enough information for a
 * particular qualifier, it will request more versions from HBase.
 * <p>
 * The input is expected to be grouped by column, as the cells of a single
 * {@link Result} are. For each column, versions are examined in list order
 * until one is accepted by {@code tsoclient.validRead}. That version is
 * returned unless its value is empty, which marks a delete. If all
 * {@code localVersions} versions of a column were examined without finding
 * a readable one, older versions may exist that were not fetched. A
 * follow-up {@link Get} is then issued for that column, restricted to
 * timestamps strictly older than the oldest rejected version, and its
 * result is filtered recursively.
 * <p>
 * Side effect: {@code versionsAvg} is updated each time a readable version
 * is found.
 *
 * @param transactionState
 *            Defines the current snapshot
 * @param kvs
 *            Raw KVs that we are going to filter; may be {@code null}
 * @param localVersions
 *            Number of versions requested from hbase
 * @return Filtered KVs belonging to the transaction snapshot, sorted with
 *         {@link KeyValue#COMPARATOR}; an empty list if {@code kvs} is
 *         {@code null}
 * @throws IOException
 *             if a follow-up get fails
 */
private List<KeyValue> filter(TransactionState transactionState, List<KeyValue> kvs, int localVersions)
        throws IOException {
    if (kvs == null) {
        return Collections.emptyList();
    }

    // Follow-up gets ask for roughly twice as many versions as this round
    final int requestVersions = localVersions * 2 + CACHE_VERSIONS_OVERHEAD;
    final long startTimestamp = transactionState.getStartTimestamp();

    // Filtered kvs
    List<KeyValue> filtered = new ArrayList<KeyValue>();
    // Follow-up gets for columns whose fetched versions were all unreadable
    List<Get> pendingGets = new ArrayList<Get>();

    ColumnWrapper lastColumn = new ColumnWrapper(null, null);
    // First KV seen for the column currently being processed; supplies the
    // row/family/qualifier for a follow-up get on that column
    KeyValue lastColumnKv = null;
    long oldestUncommittedTS = Long.MAX_VALUE;
    // Starts as true so that no follow-up get is issued before the first column
    boolean validRead = true;
    // Number of versions needed to reach a committed value
    int versionsProcessed = 0;

    for (KeyValue kv : kvs) {
        ColumnWrapper currentColumn = new ColumnWrapper(kv.getFamily(), kv.getQualifier());
        if (!currentColumn.equals(lastColumn)) {
            // New column: if we didn't read a committed value for the
            // previous one, schedule a follow-up get for THAT column (not
            // for the column that is just starting)
            if (!validRead && versionsProcessed == localVersions) {
                pendingGets.add(olderVersionsGet(lastColumnKv, requestVersions, oldestUncommittedTS));
            }
            validRead = false;
            versionsProcessed = 0;
            oldestUncommittedTS = Long.MAX_VALUE;
            lastColumn = currentColumn;
            lastColumnKv = kv;
        }
        if (validRead) {
            // If we already have a committed value for this column, skip kv
            continue;
        }
        versionsProcessed++;
        if (transactionState.tsoclient.validRead(kv.getTimestamp(), startTimestamp)) {
            // Valid read, add it to result unless it's a delete
            if (kv.getValueLength() > 0) {
                filtered.add(kv);
            }
            validRead = true;
            // Update versionsAvg: increase it quickly, decrease it slowly
            versionsAvg = versionsProcessed > versionsAvg ? versionsProcessed
                    : alpha * versionsAvg + (1 - alpha) * versionsProcessed;
        } else {
            // Uncommitted, keep track of oldest uncommitted timestamp
            oldestUncommittedTS = Math.min(oldestUncommittedTS, kv.getTimestamp());
        }
    }

    // The final column has no successor to trigger the check inside the
    // loop, so it is checked here.
    if (!validRead && versionsProcessed == localVersions) {
        pendingGets.add(olderVersionsGet(lastColumnKv, requestVersions, oldestUncommittedTS));
    }

    // If we have pending columns, request (and filter recursively) them
    if (!pendingGets.isEmpty()) {
        Result[] results = table.get(pendingGets);
        for (Result r : results) {
            filtered.addAll(filter(transactionState, r.list(), requestVersions));
        }
    }

    Collections.sort(filtered, KeyValue.COMPARATOR);
    return filtered;
}

/**
 * Builds a get for the column of {@code columnKv} that fetches up to
 * {@code maxVersions} versions with timestamps strictly lower than
 * {@code upperBoundExclusive}.
 * <p>
 * The upper bound is passed to {@link Get#setTimeRange(long, long)}
 * unchanged because that bound is already exclusive; subtracting one would
 * skip the version written at {@code upperBoundExclusive - 1}.
 */
private static Get olderVersionsGet(KeyValue columnKv, int maxVersions, long upperBoundExclusive)
        throws IOException {
    Get get = new Get(columnKv.getRow());
    get.addColumn(columnKv.getFamily(), columnKv.getQualifier());
    get.setMaxVersions(maxVersions); // TODO set maxVersions wisely
    get.setTimeRange(0, upperBoundExclusive);
    return get;
}
/**
 * Scanner that passes every row produced by the parent {@link ClientScanner}
 * through the enclosing table's {@code filter} method and skips rows that
 * end up with no cells.
 */
protected class TransactionalClientScanner extends ClientScanner {
    /** Transaction whose snapshot is used for filtering. */
    private TransactionState state;
    /** Number of versions the scan was configured with; passed on to {@code filter}. */
    private int maxVersions;

    TransactionalClientScanner(TransactionState state, Configuration conf, Scan scan, byte[] table, int maxVersions)
            throws IOException {
        super(conf, scan, table);
        this.state = state;
        this.maxVersions = maxVersions;
    }

    /**
     * Returns the next row that still has cells after filtering, or
     * {@code null} once the underlying scanner is exhausted.
     */
    @Override
    public Result next() throws IOException {
        for (Result raw = super.next(); raw != null; raw = super.next()) {
            List<KeyValue> visible = filter(state, raw.list(), maxVersions);
            if (!visible.isEmpty()) {
                return new Result(visible);
            }
        }
        return null;
    }

    /**
     * Returns up to {@code nbRows} filtered rows, fewer if the scanner runs
     * out. Overridden so that batching always goes through the filtering
     * {@link #next()}, whatever the parent implementation does.
     */
    @Override
    public Result[] next(int nbRows) throws IOException {
        // Collect values to be returned here
        ArrayList<Result> rows = new ArrayList<Result>(nbRows);
        Result row;
        while (rows.size() < nbRows && (row = next()) != null) {
            rows.add(row);
        }
        return rows.toArray(new Result[rows.size()]);
    }
}
/**
 * Returns the name of the underlying table.
 *
 * @return the table name, as reported by the wrapped {@link HTable}.
 */
public byte[] getTableName() {
    return this.table.getTableName();
}
/**
 * Returns the {@link Configuration} object of the wrapped {@link HTable}.
 * <p>
 * The value is handed back exactly as the wrapped table returns it; no copy
 * is made here.
 *
 * @return the configuration reported by the wrapped table
 */
public Configuration getConfiguration() {
    return this.table.getConfiguration();
}
/**
 * Returns the {@link HTableDescriptor table descriptor} of the wrapped
 * table.
 *
 * @return the descriptor reported by the wrapped {@link HTable}
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public HTableDescriptor getTableDescriptor() throws IOException {
    return this.table.getTableDescriptor();
}
/**
 * Tests whether the given Get matches at least one cell in the snapshot of
 * the given transaction.
 * <p>
 * This is implemented as a regular transactional
 * {@code get(Transaction, Get)} followed by an emptiness check, so the
 * matching cells are fetched by this client.
 *
 * @param transaction
 *            the transaction defining the snapshot
 * @param get
 *            the Get
 * @return true if the transactional get returns a non-empty result, false
 *         if not
 * @throws IOException
 *             if the underlying get fails
 */
public boolean exists(Transaction transaction, Get get) throws IOException {
    return !get(transaction, get).isEmpty();
}
/*
* @Override public void batch(Transaction transaction, List<? extends Row>
* actions, Object[] results) throws IOException, InterruptedException { //
* TODO Auto-generated method stub
*
* }
*
* @Override public Object[] batch(Transaction transaction, List<? extends
* Row> actions) throws IOException, InterruptedException { // TODO
* Auto-generated method stub return null; }
*
* @Override public <R> void batchCallback(Transaction transaction, List<?
* extends Row> actions, Object[] results, Callback<R> callback) throws
* IOException, InterruptedException { // TODO Auto-generated method stub
*
* }
*
* @Override public <R> Object[] batchCallback(List<? extends Row> actions,
* Callback<R> callback) throws IOException, InterruptedException { // TODO
* Auto-generated method stub return null; }
*/
/**
 * Extracts certain cells from the given rows, one transactional get per
 * element of the list.
 * <p>
 * The gets are executed sequentially, in list order, through
 * {@code get(Transaction, Get)}. If one of them throws, the exception
 * propagates and no results are returned.
 *
 * @param transaction
 *            the transaction defining the snapshot
 * @param gets
 *            The objects that specify what data to fetch and from which
 *            rows.
 * @return One {@link Result} per supplied {@link Get}, in the same order.
 *         A {@link Result} contains no {@link KeyValue} when nothing in
 *         the snapshot matches.
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public Result[] get(Transaction transaction, List<Get> gets) throws IOException {
    List<Result> collected = new ArrayList<Result>(gets.size());
    for (Get single : gets) {
        collected.add(get(transaction, single));
    }
    return collected.toArray(new Result[collected.size()]);
}
/**
 * Gets a transactional scanner on the current table restricted to the given
 * family.
 *
 * @param transaction
 *            the transaction defining the snapshot
 * @param family
 *            The column family to scan.
 * @return A scanner.
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public ResultScanner getScanner(Transaction transaction, byte[] family) throws IOException {
    final Scan familyScan = new Scan().addFamily(family);
    return getScanner(transaction, familyScan);
}
/**
 * Gets a transactional scanner on the current table restricted to a single
 * column.
 *
 * @param transaction
 *            the transaction defining the snapshot
 * @param family
 *            The column family to scan.
 * @param qualifier
 *            The column qualifier to scan.
 * @return A scanner.
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public ResultScanner getScanner(Transaction transaction, byte[] family, byte[] qualifier) throws IOException {
    final Scan columnScan = new Scan().addColumn(family, qualifier);
    return getScanner(transaction, columnScan);
}
/**
 * Puts some data in the table, one element of the list at a time.
 * <p>
 * Each {@link Put} is applied in list order through
 * {@code put(Transaction, Put)}. If one of them throws, the remaining
 * elements are not applied.
 *
 * @param transaction
 *            the transaction the writes belong to
 * @param puts
 *            The list of mutations to apply.
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public void put(Transaction transaction, List<Put> puts) throws IOException {
    for (Put single : puts) {
        this.put(transaction, single);
    }
}
/**
 * Deletes the specified cells/rows, one element of the list at a time.
 * <p>
 * Each {@link Delete} is applied in list order through
 * {@code delete(Transaction, Delete)}. This method does not modify the
 * list. If one of the deletes throws, the remaining elements are not
 * applied.
 *
 * @param transaction
 *            the transaction the deletes belong to
 * @param deletes
 *            List of things to delete.
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public void delete(Transaction transaction, List<Delete> deletes) throws IOException {
    for (Delete single : deletes) {
        this.delete(transaction, single);
    }
}
/**
 * Provides access to the underlying HTable in order to configure it or to
 * perform unsafe (non-transactional) operations. The latter would break the
 * transactional guarantees of the whole system.
 *
 * @return The underlying HTable object
 */
public HTableInterface getHTable() {
    return this.table;
}
/**
 * Closes the wrapped {@link HTable}.
 *
 * @throws IOException
 *             if a remote or network exception occurs.
 */
public void close() throws IOException {
    this.table.close();
}
}