Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public Reader createReader(final FileSystem fs, final Path path,
reader.init(fs, path, conf, stream);
return reader;
}
} catch (IOException e) {
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching Exception is enough? No need to catch Throwable to avoid any possibility of still getting this leak?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching throwable is usually a smell as it also encompasses java.lang.Error's. We definitely don't want to catch Error's as we want to let them propagate as quickly as possible.

if (stream != null) {
try {
stream.close();
Expand All @@ -328,33 +328,39 @@ public Reader createReader(final FileSystem fs, final Path path,
LOG.debug("exception details", exception);
}
}
String msg = e.getMessage();
if (msg != null && (msg.contains("Cannot obtain block length")
|| msg.contains("Could not obtain the last block")
|| msg.matches("Blocklist for [^ ]* has changed.*"))) {
if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
}
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
} else {
try {
Thread.sleep(nbAttempt < 3 ? 500 : 1000);
continue; // retry
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
if (e instanceof IOException) {
String msg = e.getMessage();
if (msg != null && (msg.contains("Cannot obtain block length")
|| msg.contains("Could not obtain the last block")
|| msg.matches("Blocklist for [^ ]* has changed.*"))) {
if (++nbAttempt == 1) {
LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
}
if (reporter != null && !reporter.progress()) {
throw new InterruptedIOException("Operation is cancelled");
}
if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
LOG.error("Can't open after " + nbAttempt + " attempts and "
+ (EnvironmentEdgeManager.currentTime() - startWaiting)
+ "ms " + " for " + path);
} else {
try {
Thread.sleep(nbAttempt < 3 ? 500 : 1000);
continue; // retry
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
throw new LeaseNotRecoveredException(e);
} else {
throw e;
}
throw new LeaseNotRecoveredException(e);
} else {
throw e;
}

// Rethrow the original exception if we are not retrying due to HDFS-isms.
throw e;
}
}
} catch (IOException ie) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/**
* Create a non-abstract "proxy" for FileSystem because FileSystem is an
* abstract class and not an interface. Only interfaces can be used with the
* Java Proxy class to override functionality via an InvocationHandler.
*
*/
public class FileSystemProxy extends FileSystem {
private final FileSystem real;

public FileSystemProxy(FileSystem real) {
this.real = real;
}

@Override
public FSDataInputStream open(Path p) throws IOException {
return real.open(p);
}

@Override
public URI getUri() {
return real.getUri();
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return real.open(f, bufferSize);
}

@Override
public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
}

@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
return real.append(f, bufferSize, progress);
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
return real.rename(src, dst);
}

@Override
public boolean delete(Path f, boolean recursive) throws IOException {
return real.delete(f, recursive);
}

@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
return real.listStatus(f);
}

@Override
public void setWorkingDirectory(Path new_dir) {
real.setWorkingDirectory(new_dir);
}

@Override
public Path getWorkingDirectory() {
return real.getWorkingDirectory();
}

@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return real.mkdirs(f, permission);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
return real.getFileStatus(f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -48,13 +53,16 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
Expand Down Expand Up @@ -714,6 +722,135 @@ public void testReadLegacyLog() throws IOException {
}
}

@Test
public void testReaderClosedOnBadCodec() throws IOException {
// Create our own Configuration and WALFactory to avoid breaking other test methods
Configuration confWithCodec = new Configuration(conf);
confWithCodec.setClass(
WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
WALFactory customFactory = new WALFactory(confWithCodec, null, currentTest.getMethodName());

// Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
// the FileSystem and know if close() was called on those InputStreams.
final List<InputStreamProxy> openedReaders = new ArrayList<>();
FileSystemProxy proxyFs = new FileSystemProxy(fs) {
@Override
public FSDataInputStream open(Path p) throws IOException {
InputStreamProxy is = new InputStreamProxy(super.open(p));
openedReaders.add(is);
return is;
}

@Override
public FSDataInputStream open(Path p, int blockSize) throws IOException {
InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
openedReaders.add(is);
return is;
}
};

final HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf(currentTest.getMethodName()));
htd.addFamily(new HColumnDescriptor(Bytes.toBytes("column")));

HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);

NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (HColumnDescriptor colDesc : htd.getColumnFamilies()) {
scopes.put(colDesc.getName(), 0);
}
byte[] row = Bytes.toBytes("row");
WAL.Reader reader = null;
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
try {
// Write one column in one edit.
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
final WAL log = customFactory.getWAL(
hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
final long txid = log.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
mvcc),
cols, true);
// Sync the edit to the WAL
log.sync(txid);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.shutdown();

// Inject our failure, object is constructed via reflection.
BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);

// Now open a reader on the log which will throw an exception when
// we try to instantiate the custom Codec.
Path filename = DefaultWALProvider.getCurrentFileName(log);
try {
reader = customFactory.createReader(proxyFs, filename);
fail("Expected to see an exception when creating WAL reader");
} catch (Exception e) {
// Expected that we get an exception
}
// We should have exactly one reader
assertEquals(1, openedReaders.size());
// And that reader should be closed.
int numNotClosed = 0;
for (InputStreamProxy openedReader : openedReaders) {
if (!openedReader.isClosed.get()) {
numNotClosed++;
}
}
assertEquals("Should not find any open readers", 0, numNotClosed);
} finally {
if (reader != null) {
reader.close();
}
}
}

/**
* A proxy around FSDataInputStream which can report if close() was called.
*/
private static class InputStreamProxy extends FSDataInputStream {
private final InputStream real;
private final AtomicBoolean isClosed = new AtomicBoolean(false);

public InputStreamProxy(InputStream real) {
super(real);
this.real = real;
}

@Override
public void close() throws IOException {
isClosed.set(true);
real.close();
}
}

/**
* A custom WALCellCodec in which we can inject failure.
*/
public static class BrokenWALCellCodec extends WALCellCodec {
static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);

static void maybeInjectFailure() {
if (THROW_FAILURE_ON_INIT.get()) {
throw new RuntimeException("Injected instantiation exception");
}
}

public BrokenWALCellCodec() {
super();
maybeInjectFailure();
}

public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
super(conf, compression);
maybeInjectFailure();
}
}

static class DumbWALActionsListener extends WALActionsListener.Base {
int increments = 0;

Expand Down