HBASE-19513 Fix the wrapped AsyncFSOutput implementation

Apache9 committed Dec 15, 2017
1 parent 6ab8ce9 commit 661491b

Showing 9 changed files with 163 additions and 147 deletions.

hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java

@@ -61,6 +61,11 @@ public interface AsyncFSOutput extends Closeable {
    */
   int buffered();
 
+  /**
+   * Whether the stream is broken.
+   */
+  boolean isBroken();
+
   /**
    * Return current pipeline. Empty array if no pipeline.
    */
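
The new isBroken() method gives callers an explicit failure signal: a broken output can make no further progress, so the only recovery is to abandon it and open a fresh one. A minimal caller sketch, assuming nothing beyond the interface above (the class and method names here are illustrative, not HBase API):

import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

// Hypothetical monitor illustrating the intended use of isBroken().
class OutputHealthCheck {

  /** Returns true when the caller should discard this output and open a new one. */
  static boolean needsReplacement(AsyncFSOutput output) {
    // A broken stream can no longer be flushed or closed normally; callers
    // typically abandon it (e.g. via recoverAndClose) and start over.
    return output != null && output.isBroken();
  }
}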

hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java

@@ -18,26 +18,15 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
 
@@ -62,114 +51,23 @@ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrit
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
         overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
     }
-    final FSDataOutputStream fsOut;
+    final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     if (createParent) {
-      fsOut = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
+      out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
     } else {
-      fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
+      out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
     }
     // After we create the stream but before we attempt to use it at all
     // ensure that we can provide the level of data safety we're configured
     // to provide.
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
-        !(CommonFSUtils.hasCapability(fsOut, "hflush") &&
-          CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+        !(CommonFSUtils.hasCapability(out, "hflush") &&
+          CommonFSUtils.hasCapability(out, "hsync"))) {
+      out.close();
       throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
     }
-    final ExecutorService flushExecutor =
-      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
-    return new AsyncFSOutput() {
-
-      private final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-      @Override
-      public void write(byte[] b, int off, int len) {
-        out.write(b, off, len);
-      }
-
-      @Override
-      public void write(byte[] b) {
-        write(b, 0, b.length);
-      }
-
-      @Override
-      public void writeInt(int i) {
-        out.writeInt(i);
-      }
-
-      @Override
-      public void write(ByteBuffer bb) {
-        out.write(bb, bb.position(), bb.remaining());
-      }
-
-      @Override
-      public void recoverAndClose(CancelableProgressable reporter) throws IOException {
-        fsOut.close();
-      }
-
-      @Override
-      public DatanodeInfo[] getPipeline() {
-        return new DatanodeInfo[0];
-      }
-
-      private void flush0(CompletableFuture<Long> future, boolean sync) {
-        try {
-          synchronized (out) {
-            fsOut.write(out.getBuffer(), 0, out.size());
-            out.reset();
-          }
-        } catch (IOException e) {
-          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
-          return;
-        }
-        try {
-          if (sync) {
-            fsOut.hsync();
-          } else {
-            fsOut.hflush();
-          }
-          long pos = fsOut.getPos();
-          eventLoopGroup.next().execute(() -> future.complete(pos));
-        } catch (IOException e) {
-          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
-        }
-      }
-
-      @Override
-      public CompletableFuture<Long> flush(boolean sync) {
-        CompletableFuture<Long> future = new CompletableFuture<>();
-        flushExecutor.execute(() -> flush0(future, sync));
-        return future;
-      }
-
-      @Override
-      public void close() throws IOException {
-        try {
-          flushExecutor.submit(() -> {
-            synchronized (out) {
-              fsOut.write(out.getBuffer(), 0, out.size());
-              out.reset();
-            }
-            return null;
-          }).get();
-        } catch (InterruptedException e) {
-          throw new InterruptedIOException();
-        } catch (ExecutionException e) {
-          Throwables.propagateIfPossible(e.getCause(), IOException.class);
-          throw new IOException(e.getCause());
-        } finally {
-          flushExecutor.shutdown();
-        }
-        fsOut.close();
-      }
-
-      @Override
-      public int buffered() {
-        return out.size();
-      }
-    };
+    return new WrapperAsyncFSOutput(f, out);
   }
 }
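
This is the heart of the fix: the ad-hoc anonymous AsyncFSOutput, which shared one ByteArrayOutputStream between the caller and the flusher thread and bounced future completion through the event loop, is replaced by the dedicated WrapperAsyncFSOutput class added below. createOutput() is now a thin dispatcher: DistributedFileSystem gets the fan-out implementation, everything else gets the wrapper. A usage sketch under the signature shown above (the configuration values are illustrative, and the shaded netty classes are an assumption consistent with the imports in this file):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;

// Illustrative caller: on HDFS this returns FanOutOneBlockAsyncDFSOutput,
// on any other FileSystem it returns the new WrapperAsyncFSOutput.
class CreateOutputExample {

  static AsyncFSOutput open(FileSystem fs, Path f) throws Exception {
    return AsyncFSOutputHelper.createOutput(fs, f, true /* overwrite */,
      false /* createParent */, fs.getDefaultReplication(f),
      fs.getDefaultBlockSize(f), new NioEventLoopGroup(),
      NioSocketChannel.class);
  }
}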

hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

@@ -380,8 +380,7 @@ public int buffered() {
 
   @Override
   public DatanodeInfo[] getPipeline() {
-    State state = this.state;
-    return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
+    return locations;
   }
 
   private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
@@ -569,4 +568,9 @@ public void close() throws IOException {
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
+
+  @Override
+  public boolean isBroken() {
+    return state == State.BROKEN;
+  }
 }
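
With pipeline reporting and failure reporting now separated, getPipeline() always returns the located datanodes and isBroken() exposes the internal state machine directly. A compact sketch of the distinction, using only the State constants visible in this diff (STREAMING, CLOSING, BROKEN are shown above; CLOSED is an assumption, and the real class has its own synchronization around `state`):

// Sketch only, not the real FanOutOneBlockAsyncDFSOutput.
class StateSketch {

  enum State { STREAMING, CLOSING, BROKEN, CLOSED } // CLOSED is assumed

  private volatile State state = State.STREAMING;

  boolean isBroken() {
    // BROKEN is a terminal error state, distinct from an orderly shutdown,
    // which is why the WAL can use it as an unambiguous roll trigger.
    return state == State.BROKEN;
  }
}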

hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java (new file)

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}.
+ */
+@InterfaceAudience.Private
+public class WrapperAsyncFSOutput implements AsyncFSOutput {
+
+  private final FSDataOutputStream out;
+
+  private ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final ExecutorService executor;
+
+  public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
+    this.out = out;
+    this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+      .setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build());
+  }
+
+  @Override
+  public void write(byte[] b) {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) {
+    buffer.write(b, off, len);
+  }
+
+  @Override
+  public void writeInt(int i) {
+    buffer.writeInt(i);
+  }
+
+  @Override
+  public void write(ByteBuffer bb) {
+    buffer.write(bb, bb.position(), bb.remaining());
+  }
+
+  @Override
+  public int buffered() {
+    return buffer.size();
+  }
+
+  @Override
+  public DatanodeInfo[] getPipeline() {
+    return new DatanodeInfo[0];
+  }
+
+  private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) {
+    try {
+      if (buffer.size() > 0) {
+        out.write(buffer.getBuffer(), 0, buffer.size());
+        if (sync) {
+          out.hsync();
+        } else {
+          out.hflush();
+        }
+      }
+      future.complete(out.getPos());
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return;
+    }
+  }
+
+  @Override
+  public CompletableFuture<Long> flush(boolean sync) {
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    ByteArrayOutputStream buffer = this.buffer;
+    this.buffer = new ByteArrayOutputStream();
+    executor.execute(() -> flush0(future, buffer, sync));
+    return future;
+  }
+
+  @Override
+  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
+    executor.shutdown();
+    out.close();
+  }
+
+  @Override
+  public void close() throws IOException {
+    Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close");
+    executor.shutdown();
+    out.close();
+  }
+
+  @Override
+  public boolean isBroken() {
+    return false;
+  }
+}
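
The buffer swap in flush(boolean) is the core of the rewrite: the caller's thread retires the current ByteArrayOutputStream and installs a fresh one, so the single flusher thread becomes the exclusive owner of the retired buffer. The old anonymous class instead shared one buffer under a lock and completed its futures on the event loop. A generic sketch of the swap idiom, assuming a single producer thread (which the WAL provides); the class is illustrative and uses java.io.ByteArrayOutputStream rather than the HBase one:

import java.io.ByteArrayOutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic sketch of the buffer-swap idiom: the producer retires the current
// buffer and installs a fresh one, so the flusher owns the retired buffer
// exclusively and no lock is needed between write() and the flush.
class BufferSwapSketch {

  private final ExecutorService flusher = Executors.newSingleThreadExecutor();

  private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

  void write(byte[] b) {
    buffer.write(b, 0, b.length); // producer thread only
  }

  CompletableFuture<Integer> flush() {
    ByteArrayOutputStream retired = this.buffer; // swap on the producer thread
    this.buffer = new ByteArrayOutputStream();
    CompletableFuture<Integer> future = new CompletableFuture<>();
    // The flusher is now the sole owner of `retired`; here it just reports
    // the flushed size where the real class would write to the stream.
    flusher.execute(() -> future.complete(retired.size()));
    return future;
  }
}

Two consequences are visible above: close() insists on a preceding flush (the Preconditions check), and isBroken() can simply return false, since a plain FSDataOutputStream reports failure by throwing rather than by entering a broken state.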

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

@@ -751,6 +751,6 @@ protected boolean doCheckLogLowReplication() {
     // Unlike FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
     // typically there is no 'low replication' state, only a 'broken' state.
     AsyncFSOutput output = this.fsOut;
-    return output != null && output.getPipeline().length == 0;
+    return output != null && output.isBroken();
   }
 }
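
This one-line change is what the rest of the commit enables. The old check inferred failure from an empty pipeline, but after the getPipeline() change above an empty array no longer implies failure, and WrapperAsyncFSOutput always reports an empty pipeline, which would have made every non-HDFS WAL look permanently broken. A side-by-side sketch (method bodies only; AsyncFSWAL's surrounding fields and locking are elided):

import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

// Sketch comparing the two checks; `output` is the WAL's current stream.
class LowReplicationCheckSketch {

  static boolean oldCheck(AsyncFSOutput output) {
    // Conflates "no datanodes located" and "non-HDFS wrapper" with failure.
    return output != null && output.getPipeline().length == 0;
  }

  static boolean newCheck(AsyncFSOutput output) {
    // Asks the stream directly; only a genuinely failed pipeline rolls the log.
    return output != null && output.isBroken();
  }
}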

hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java

@@ -39,6 +39,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -102,8 +103,8 @@ private void ensureAllDatanodeAlive() throws InterruptedException {
     for (;;) {
       try {
         FanOutOneBlockAsyncDFSOutput out =
-          FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
-            true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
+            FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
+              true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
         out.close();
         break;
       } catch (IOException e) {
@@ -112,8 +113,7 @@ private void ensureAllDatanodeAlive() throws InterruptedException {
     }
   }
 
-  static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
-      final FanOutOneBlockAsyncDFSOutput out)
+  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     List<CompletableFuture<Long>> futures = new ArrayList<>();
     byte[] b = new byte[10];
@@ -130,10 +130,10 @@ static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path
       assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
     }
     out.close();
-    assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
+    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];
     rand.setSeed(12345);
-    try (FSDataInputStream in = dfs.open(f)) {
+    try (FSDataInputStream in = fs.open(f)) {
       for (int i = 0; i < 10; i++) {
         in.readFully(actual);
         rand.nextBytes(b);
@@ -149,7 +149,7 @@ public void test() throws IOException, InterruptedException, ExecutionException
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
       false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    writeAndVerify(eventLoop, FS, f, out);
+    writeAndVerify(FS, f, out);
   }
 
   @Test
@@ -193,7 +193,7 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionE
       false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still be alive.
-    writeAndVerify(eventLoop, FS, f, out);
+    writeAndVerify(FS, f, out);
   }
 
   /**
@@ -220,7 +220,7 @@ public void testConnectToDatanodeFailed()
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
     Class<?> xceiverServerClass =
-      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
    numPeersMethod.setAccessible(true);
     // make one datanode broken
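
The test harness change mirrors the refactor: writeAndVerify() now takes the generic (FileSystem, Path, AsyncFSOutput) triple instead of HDFS-specific types, so the same write/flush/re-read check can exercise any AsyncFSOutput implementation. A hypothetical reuse for the new wrapper, written as if inside this test class (the local-filesystem setup and the TEST_UTIL field are assumptions, and on filesystems without hflush/hsync support this would also require disabling CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE):

// Hypothetical test, not part of this commit: run the shared harness against
// WrapperAsyncFSOutput by handing createOutput a non-HDFS FileSystem.
@Test
public void testWrapperImplementation() throws Exception {
  FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
  Path f = new Path("/test-wrapper-output");
  AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, true, false,
    fs.getDefaultReplication(f), fs.getDefaultBlockSize(f),
    EVENT_LOOP_GROUP, CHANNEL_CLASS);
  writeAndVerify(fs, f, out); // same harness, different implementation
}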