-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
WALEntryStream.java
429 lines (396 loc) · 14.2 KB
/
WALEntryStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
* iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it
* dequeues it and starts reading from the next.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALEntryStream implements Iterator<Entry>, Closeable, Iterable<Entry> {
private static final Log LOG = LogFactory.getLog(WALEntryStream.class);
private Reader reader;
private Path currentPath;
// cache of next entry for hasNext()
private Entry currentEntry;
// position after reading current entry
private long currentPosition = 0;
private final ReplicationSourceLogQueue logQueue;
private final String walGroupId;
private FileSystem fs;
private Configuration conf;
private MetricsSource metrics;
/**
 * Builds an entry stream over the given queue that starts reading at position 0 of the first
 * WAL. Delegates to the constructor that takes an explicit start position.
 * @param logQueue the queue of WAL paths
 * @param fs {@link FileSystem} used to create the {@link Reader} for this stream
 * @param conf {@link Configuration} used to create the {@link Reader} for this stream
 * @param metrics replication metrics
 * @param walGroupId wal prefix, used to select the queue inside {@code logQueue}
 */
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
    MetricsSource metrics, String walGroupId) {
  this(logQueue, fs, conf, 0L, metrics, walGroupId);
}
/**
 * Builds an entry stream over the given queue, starting at the given position.
 * @param logQueue the queue of WAL paths
 * @param fs {@link FileSystem} used to create the {@link Reader} for this stream
 * @param conf the {@link Configuration} used to create the {@link Reader} for this stream
 * @param startPosition the position in the first WAL to start reading at; a value of 0 means
 *          no seek is performed when the reader is opened
 * @param metrics the replication metrics
 * @param walGroupId wal prefix, used to select the queue inside {@code logQueue}
 */
public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
    long startPosition, MetricsSource metrics, String walGroupId) {
  // where to read from
  this.logQueue = logQueue;
  this.walGroupId = walGroupId;
  this.currentPosition = startPosition;
  // how to read it
  this.fs = fs;
  this.conf = conf;
  // where to report
  this.metrics = metrics;
}
/**
 * Reports whether another WAL {@link Entry} is available, reading ahead if no entry is
 * currently cached.
 * @return true if there is another WAL {@link Entry}
 * @throws WALEntryStreamRuntimeException wrapping any Exception raised while reading
 */
@Override
public boolean hasNext() {
  if (currentEntry != null) {
    // an entry is already cached from an earlier call
    return true;
  }
  try {
    tryAdvanceEntry();
  } catch (Exception e) {
    throw new WALEntryStreamRuntimeException(e);
  }
  return currentEntry != null;
}
/**
 * Hands out the cached entry (loading one through {@link #hasNext()} if needed) and clears the
 * cache so the following call reads a fresh entry.
 * @return the next WAL entry in this stream
 * @throws WALEntryStreamRuntimeException if there was an exception while reading
 * @throws NoSuchElementException if no more entries in the stream.
 */
@Override
public Entry next() {
  if (hasNext()) {
    final Entry result = currentEntry;
    // cleared here; hasNext() reloads it on demand
    currentEntry = null;
    return result;
  }
  throw new NoSuchElementException();
}
/**
 * Not supported: entries cannot be removed through this stream.
 * @throws UnsupportedOperationException always
 */
@Override
public void remove() {
throw new UnsupportedOperationException();
}
/**
 * Closes the underlying WAL reader if one is open. The current path and position are left
 * untouched.
 * @throws IOException if closing the reader fails
 */
@Override
public void close() throws IOException {
closeReader();
}
/**
 * Returns this stream itself, so every call shares the same iteration state rather than
 * starting a fresh pass over the queue.
 * @return the iterator over WAL entries in the queue.
 */
@Override
public Iterator<Entry> iterator() {
return this;
}
/**
 * Returns the position tracked by this stream. It starts at the start position passed to the
 * constructor, is updated from the reader after every read attempt (including the read-ahead
 * done by {@link #hasNext()}), and is set back to 0 when the current log is dequeued or WAL
 * reading is restarted.
 * @return the reader position recorded after the most recent read attempt
 */
public long getPosition() {
return currentPosition;
}
/**
 * Returns the path of the WAL the stream is currently positioned on.
 * @return the {@link Path} of the current WAL, or null if no reader has been opened yet or the
 *         current log was just dequeued
 */
public Path getCurrentPath() {
return currentPath;
}
/**
 * Builds a short status fragment for log messages describing which WAL is being read and at
 * which position. The fragment ends with a newline only when a WAL is currently set.
 * @return the status text
 */
private String getCurrentPathStat() {
  if (currentPath == null) {
    return "no replication ongoing, waiting for new log";
  }
  return "currently replicating from: " + currentPath + " at position: " + currentPosition
      + "\n";
}
/**
 * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
 * false). Does nothing when there is no open reader or no current path.
 * @throws IOException io exception while resetting the reader
 */
public void reset() throws IOException {
  if (reader == null || currentPath == null) {
    return;
  }
  resetReader();
}
/**
 * Records the position this stream considers itself to be at.
 * @param position the new position
 */
private void setPosition(long position) {
  this.currentPosition = position;
}
/**
 * Records the WAL path this stream is reading from.
 * @param path the new current path; may be null to indicate no current WAL
 */
private void setCurrentPath(Path path) {
  currentPath = path;
}
/**
 * Tries to load the next entry into {@code currentEntry}. When the current WAL yields nothing
 * and the queue holds more than one log, the current WAL is re-read once, verified as fully
 * parsed, dequeued, and reading continues on the next log. {@code currentEntry} stays null when
 * nothing could be read.
 * @throws IOException if reading, resetting or opening a reader fails
 */
private void tryAdvanceEntry() throws IOException {
  if (!checkReader()) {
    // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
    return;
  }
  readNextEntryAndSetPosition();
  if (currentEntry != null) {
    return;
  }
  // no more entries in this log file - see if log was rolled
  if (logQueue.getQueue(walGroupId).size() <= 1) {
    // no other logs, we've simply hit the end of the current open log. Do nothing
    return;
  }
  // The log was rolled. Before dequeueing, we should always get one more attempt at reading.
  // This is in case more entries came in after we opened the reader,
  // and a new log was enqueued while we were reading. See HBASE-6758
  resetReader();
  readNextEntryAndSetPosition();
  if (currentEntry != null) {
    return;
  }
  if (!checkAllBytesParsed()) {
    // reading of the current log was restarted; try again on the next call
    return;
  }
  // now we're certain we're done with this log file
  dequeueCurrentLog();
  if (openNextLog()) {
    readNextEntryAndSetPosition();
  }
}
/**
 * HBASE-15984: checks that we have in fact parsed all data of the current WAL before it gets
 * dequeued.
 * <p>
 * If the file length cannot be obtained, the log is treated as complete. If the WAL was not
 * closed cleanly (no trailer), any unparsed bytes are counted in the metrics and the log is
 * treated as complete. If the WAL was closed cleanly but the current position plus the trailer
 * size falls short of the file length, reading is restarted from position 0 (see HBASE-15983).
 * @return true if the current log is finished and may be dequeued; false if reading of the
 *         current log was restarted
 * @throws IOException if resetting the reader for a restart fails
 */
private boolean checkAllBytesParsed() throws IOException {
  // -1 means the wal wasn't closed cleanly.
  final long trailerSize = currentTrailerSize();
  FileStatus stat = null;
  try {
    stat = fs.getFileStatus(this.currentPath);
  } catch (IOException exception) {
    // Best effort: without a length we cannot verify anything, so fall through and treat the
    // log as done. The cause is passed to the logger so it is not lost.
    LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it "
        + (trailerSize < 0 ? "was not" : "was") + " closed cleanly " + getCurrentPathStat(),
      exception);
    metrics.incrUnknownFileLengthForClosedWAL();
  }
  if (stat != null) {
    if (trailerSize < 0) {
      if (currentPosition < stat.getLen()) {
        final long skippedBytes = stat.getLen() - currentPosition;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Reached the end of WAL file '" + currentPath
              + "'. It was not closed cleanly, so we did not parse " + skippedBytes
              + " bytes of data. This is normally ok.");
        }
        metrics.incrUncleanlyClosedWALs();
        metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
      }
    } else if (currentPosition + trailerSize < stat.getLen()) {
      LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition
          + ", which is too far away from reported file length " + stat.getLen()
          + ". Restarting WAL reading (see HBASE-15983 for details). " + getCurrentPathStat());
      // Capture the position before it is zeroed: these are the bytes that will be read again.
      // Reading currentPosition after setPosition(0) would always report 0 to the metric.
      final long repeatedBytes = currentPosition;
      setPosition(0);
      resetReader();
      metrics.incrRestartedWALReading();
      metrics.incrRepeatedFileBytes(repeatedBytes);
      return false;
    }
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
        + (stat == null ? "N/A" : stat.getLen()));
  }
  metrics.incrCompletedWAL();
  return true;
}
/**
 * Finishes with the current WAL: closes its reader, removes an entry from this wal group's
 * queue via {@code logQueue.remove(walGroupId)}, and clears the current path and position.
 * @throws IOException if closing the reader fails
 */
private void dequeueCurrentLog() throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Reached the end of log " + currentPath);
  }
  closeReader();
  logQueue.remove(walGroupId);
  // forget where we were; the next log is read from its beginning
  this.currentPath = null;
  this.currentPosition = 0;
}
/**
 * Reads one entry from the open reader into {@code currentEntry} (null when the reader has
 * nothing more to return) and moves the tracked position to the reader's position. Read
 * metrics are updated only when an entry was actually returned.
 * @throws IOException if the reader fails
 */
private void readNextEntryAndSetPosition() throws IOException {
  final Entry fetched = reader.next();
  final long positionAfterRead = reader.getPosition();
  if (fetched != null) {
    metrics.incrLogEditsRead();
    // bytes consumed by this read: distance from the previously recorded position
    metrics.incrLogReadInBytes(positionAfterRead - currentPosition);
  }
  // could be null
  this.currentEntry = fetched;
  this.currentPosition = positionAfterRead;
}
/**
 * Closes the current reader, if any, and forgets it. If closing throws, the reader field is
 * left as it was.
 * @throws IOException if closing the reader fails
 */
private void closeReader() throws IOException {
  if (reader == null) {
    return;
  }
  reader.close();
  reader = null;
}
/**
 * Makes sure a reader is available: if we don't have a reader, tries to open one on the next
 * log in the queue.
 * @return true if a reader is available after the call
 * @throws IOException if opening the next log fails
 */
private boolean checkReader() throws IOException {
  return reader != null || openNextLog();
}
/**
 * Opens a reader on the log at the head of this wal group's queue, without removing it from
 * the queue.
 * @return true if the queue was non-empty and a reader is open afterwards
 * @throws IOException if opening the reader fails
 */
private boolean openNextLog() throws IOException {
  final PriorityBlockingQueue<Path> pending = logQueue.getQueue(walGroupId);
  final Path head = pending.peek();
  if (head == null) {
    return false;
  }
  openReader(head);
  // openReader may leave the reader unset (e.g. after a lease recovery attempt)
  return reader != null;
}
/**
 * Looks for a file with the same name as the given log inside the old-logs directory
 * ({@link HConstants#HREGION_OLDLOGDIR_NAME}) under the root directory.
 * @param path the WAL path that could not be found
 * @return the location in the old-logs directory if a file exists there, otherwise
 *         {@code path} itself
 * @throws IOException if the root directory or the file system cannot be queried
 */
Path getArchivedLog(Path path) throws IOException {
  final Path oldLogDir = new Path(FSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
  final Path candidate = new Path(oldLogDir, path.getName());
  if (!fs.exists(candidate)) {
    LOG.error("Couldn't locate log: " + path);
    return path;
  }
  LOG.info("Log " + path + " was moved to " + candidate);
  return candidate;
}
/**
 * Reacts to a missing WAL file: if the log was archived, continues reading from the archived
 * location; otherwise rethrows the original exception.
 * @param path the WAL path that was not found
 * @param fnfe the exception to rethrow when no archived copy exists
 * @throws IOException {@code fnfe} itself, or any failure while locating or opening the
 *           archived log
 */
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
  final Path archivedLog = getArchivedLog(path);
  if (path.equals(archivedLog)) {
    // getArchivedLog returned the input unchanged: there is no archived copy
    throw fnfe;
  }
  openReader(archivedLog);
}
/**
 * Opens a reader on the given WAL, or resets the existing reader when it is already open on
 * that same path so that newly written data becomes visible.
 * <p>
 * A missing file is retried at its archived location. A lease that is not yet recovered, or an
 * NPE while opening (HDFS-4380), leaves this stream without a reader so that a later call can
 * retry.
 * @param path the WAL to read
 * @throws IOException if the reader cannot be created, positioned or reset
 */
private void openReader(Path path) throws IOException {
  try {
    // Detect if this is a new file, if so get a new reader else
    // reset the current reader so that we see the new data
    if (reader == null || !getCurrentPath().equals(path)) {
      closeReader();
      reader = WALFactory.createReader(fs, path, conf);
      seek();
      setCurrentPath(path);
    } else {
      resetReader();
    }
  } catch (FileNotFoundException fnfe) {
    handleFileNotFound(path, fnfe);
  } catch (RemoteException re) {
    IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
    if (!(ioe instanceof FileNotFoundException)) {
      throw ioe;
    }
    handleFileNotFound(path, (FileNotFoundException) ioe);
  } catch (LeaseNotRecoveredException lnre) {
    // HBASE-15019 the WAL was not closed due to some hiccup.
    LOG.warn("Try to recover the WAL lease " + path, lnre);
    recoverLease(conf, path);
    abandonReader();
  } catch (NullPointerException npe) {
    // Workaround for race condition in HDFS-4380
    // which throws a NPE if we open a file before any data node has the most recent block
    // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
    // The NPE is logged with its stack trace because this catch is broad enough to also hide
    // unrelated null dereferences.
    LOG.warn("Got NPE opening reader, will retry.", npe);
    abandonReader();
  }
}

/**
 * Drops the current reader so a later call can open a fresh one. A reader that was already
 * created (for example when the failure happened while seeking) is closed best-effort first,
 * instead of being leaked by simply nulling the field.
 */
private void abandonReader() {
  final Reader stale = reader;
  reader = null;
  if (stale == null) {
    return;
  }
  try {
    stale.close();
  } catch (Exception e) {
    // Best effort only: the reader is being discarded after a failure and may itself be in a
    // broken state, so a failing close must not mask the retry.
    LOG.warn("Failed to close abandoned WAL reader", e);
  }
}
/**
 * For HBASE-15019: asks {@link FSUtils} to recover the file lease of the given WAL. A failure
 * is logged and otherwise ignored.
 * @param conf configuration used to obtain the file system and the {@link FSUtils} instance
 * @param path the WAL whose lease should be recovered
 */
private void recoverLease(final Configuration conf, final Path path) {
  try {
    final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
    final FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
    // progress callback: logs at debug level and always returns true
    final CancelableProgressable reporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        LOG.debug("recover WAL lease: " + path);
        return true;
      }
    };
    fsUtils.recoverFileLease(dfs, path, conf, reporter);
  } catch (IOException e) {
    LOG.warn("unable to recover lease for WAL: " + path, e);
  }
}
/**
 * Resets the open reader and seeks back to the tracked position. If the file has gone missing,
 * reading continues from its archived location when one exists.
 * @throws IOException if the reset or seek fails, if the file is missing and not archived, or
 *           wrapping an NPE raised while resetting (likely HDFS-4380)
 */
private void resetReader() throws IOException {
  try {
    reader.reset();
    seek();
  } catch (FileNotFoundException fnfe) {
    // If the log was archived, continue reading from there; otherwise fnfe is rethrown
    handleFileNotFound(currentPath, fnfe);
  } catch (NullPointerException npe) {
    throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
  }
}
/**
 * Moves the reader to the tracked position. Nothing is done when that position is 0.
 * @throws IOException if the reader fails to seek
 */
private void seek() throws IOException {
  if (currentPosition == 0) {
    return;
  }
  reader.seek(currentPosition);
}
/**
 * Returns the trailer size reported by the current reader.
 * @return the value of {@code trailerSize()} when the reader is a {@link ProtobufLogReader};
 *         -1 when there is no reader or it is of another type
 */
private long currentTrailerSize() {
  if (!(reader instanceof ProtobufLogReader)) {
    return -1L;
  }
  return ((ProtobufLogReader) reader).trailerSize();
}
/**
 * Unchecked wrapper for exceptions raised while advancing the stream, needed because
 * {@link Iterator#hasNext()} and {@link Iterator#next()} cannot throw checked exceptions.
 */
@InterfaceAudience.Private
public static class WALEntryStreamRuntimeException extends RuntimeException {
  private static final long serialVersionUID = -6298201811259982568L;

  /**
   * @param cause the exception that interrupted reading; available through {@code getCause()}
   */
  public WALEntryStreamRuntimeException(Exception cause) {
    super(cause);
  }
}
}