StoreFlusher.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;

/**
 * Store flusher interface. Turns a snapshot of the memstore into a set of store files (usually
 * one). A custom implementation can be provided.
 */
@InterfaceAudience.Private
abstract class StoreFlusher {
protected Configuration conf;
protected Store store;

public StoreFlusher(Configuration conf, Store store) {
this.conf = conf;
this.store = store;
}

/**
 * Turns a snapshot of the memstore into a set of store files.
 * @param snapshot Memstore snapshot.
 * @param cacheFlushSeqNum Log cache flush sequence number.
 * @param status Task that represents the flush operation and may be updated with status.
 * @param throughputController A controller used to throttle the flush speed.
 * @return List of files written. Can be empty; must not be null.
 */
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController) throws IOException;
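
/**
 * Appends flush metadata (the cache flush sequence id) to the given writer and closes it,
 * updating the task status along the way.
 * @param writer Writer for the flushed store file.
 * @param cacheFlushSeqNum Log cache flush sequence number; the file is current up to and
 *   including this sequence number.
 * @param status Task that represents the flush operation and may be updated with status.
 */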
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including cacheFlushSeqNum.
status.setStatus("Flushing " + store + ": appending metadata");
writer.appendMetadata(cacheFlushSeqNum, false);
status.setStatus("Flushing " + store + ": closing flushed file");
writer.close();
}

/**
 * Creates the scanner for flushing the snapshot. Also calls coprocessor hooks.
 * @param snapshotScanner Scanner over the memstore snapshot being flushed.
 * @param smallestReadPoint Smallest read point to use for the flush.
 * @return The scanner; null if a coprocessor is canceling the flush.
 */
protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
long smallestReadPoint) throws IOException {
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
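// Retain delete markers during the flush (as in a minor compaction); they are only
// purged at major compaction time.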
scanner = new StoreScanner(store, store.getScanInfo(), scan,
Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;
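// Give coprocessors a chance to wrap or replace the flush scanner before it is used.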
if (store.getCoprocessorHost() != null) {
try {
return store.getCoprocessorHost().preFlush(store, scanner);
} catch (IOException ioe) {
scanner.close();
throw ioe;
}
}
return scanner;
}

/**
 * Performs the memstore flush, writing data from the scanner into the sink.
 * @param scanner Scanner to get data from.
 * @param sink Sink to write data to, e.g. a StoreFileWriter.
 * @param smallestReadPoint Smallest read point used for the flush.
 * @param throughputController A controller used to throttle the flush speed.
 */
protected void performFlush(InternalScanner scanner, CellSink sink,
long smallestReadPoint, ThroughputController throughputController) throws IOException {
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
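// Cap the number of cells returned per call to scanner.next() so a single batch does not
// accumulate too many cells in memory.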
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
List<Cell> kvs = new ArrayList<Cell>();
boolean hasMore;
String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
// no control on system table (such as meta, namespace, etc) flush
boolean control = throughputController != null && !store.getRegionInfo().isSystemTable();
if (control) {
throughputController.start(flushName);
}
try {
do {
hasMore = scanner.next(kvs, scannerContext);
if (!kvs.isEmpty()) {
for (Cell c : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0. This will help us save space when writing to
// disk.
sink.append(c);
int len = KeyValueUtil.length(c);
if (control) {
throughputController.control(flushName, len);
}
}
kvs.clear();
}
} while (hasMore);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted while controlling throughput of flushing "
+ flushName);
} finally {
if (control) {
throughputController.finish(flushName);
}
}
}
}