
HBASE-16891 Try copying to the Netty ByteBuf directly from the WALEdit
Apache9 committed Oct 29, 2016
1 parent ad0e862 commit 6127753
Showing 5 changed files with 171 additions and 109 deletions.
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -50,28 +49,28 @@ public ByteArrayOutputStream(int capacity) {
   }
 
   @Override
-  public void write(ByteBuffer b, int off, int len) throws IOException {
+  public void write(ByteBuffer b, int off, int len) {
     checkSizeAndGrow(len);
     ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len);
     this.pos += len;
   }
 
   @Override
-  public void writeInt(int i) throws IOException {
+  public void writeInt(int i) {
     checkSizeAndGrow(Bytes.SIZEOF_INT);
     Bytes.putInt(this.buf, this.pos, i);
     this.pos += Bytes.SIZEOF_INT;
   }
 
   @Override
-  public void write(int b) throws IOException {
+  public void write(int b) {
     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
     buf[this.pos] = (byte) b;
     this.pos++;
   }
 
   @Override
-  public void write(byte[] b, int off, int len) throws IOException {
+  public void write(byte[] b, int off, int len) {
     checkSizeAndGrow(len);
     System.arraycopy(b, off, this.buf, this.pos, len);
     this.pos += len;
@@ -109,7 +108,7 @@ public void reset() {
    * Copies the content of this Stream into a new byte array.
    * @return the contents of this output stream, as new byte array.
    */
-  public byte toByteArray()[] {
+  public byte[] toByteArray() {
     return Arrays.copyOf(buf, pos);
   }
 
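The ByteArrayOutputStream hunks above drop the throws IOException clauses (the stream only grows an in-memory byte[], so nothing can actually throw) and fix the legacy public byte toByteArray()[] declaration. A minimal usage sketch, assuming the class's existing getBuffer() and size() accessors that the helper change further down relies on (fsOut.write(out.getBuffer(), 0, out.size())):

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

public class ByteArrayOutputStreamSketch {
  public static void main(String[] args) {
    // Grows its internal byte[] as needed; no IOException involved.
    ByteArrayOutputStream out = new ByteArrayOutputStream(16);
    out.writeInt(42);
    out.write(ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 }), 0, 4);

    // Zero-copy view of the backing array, as used by the helper below.
    byte[] backing = out.getBuffer();
    int len = out.size();

    // Or a defensive copy via the fixed declaration: public byte[] toByteArray()
    byte[] copy = out.toByteArray();
    System.out.println(len + " bytes buffered (backing length " + backing.length
        + "), copy length " + copy.length);
  }
}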
@@ -19,6 +19,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -43,6 +44,16 @@ public interface AsyncFSOutput extends Closeable {
    */
   void write(byte[] b, int off, int len);
 
+  /**
+   * Write an int to the buffer.
+   */
+  void writeInt(int i);
+
+  /**
+   * Copy the data in the given {@code bb} into the buffer.
+   */
+  void write(ByteBuffer bb);
+
   /**
    * Return the current size of buffered data.
    */
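The two methods added to AsyncFSOutput let a WAL writer push an already serialized edit into the output without first staging it in a temporary byte[]. A hypothetical caller sketch (the append method and the length-prefix framing are illustrative, not part of this commit; the AsyncFSOutput import is omitted since its package is not shown in this hunk):

import java.nio.ByteBuffer;

// AsyncFSOutput import omitted; use whichever package the interface lives in.
class WalAppendSketch {
  // Push one serialized edit into the output without an intermediate byte[] copy.
  static void append(AsyncFSOutput output, ByteBuffer serializedEdit) {
    output.writeInt(serializedEdit.remaining()); // illustrative length prefix
    output.write(serializedEdit);                // bulk copy into the output's buffer
  }
}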
@@ -22,11 +22,10 @@
 
 import io.netty.channel.EventLoop;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,6 +35,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,68 +103,45 @@ public DatanodeInfo[] getPipeline() {
         return new DatanodeInfo[0];
       }
 
-      @Override
-      public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
-          final boolean sync) {
-        flushExecutor.execute(new Runnable() {
-
-          @Override
-          public void run() {
-            try {
-              synchronized (out) {
-                out.writeTo(fsOut);
-                out.reset();
-              }
-            } catch (final IOException e) {
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.failed(e, attachment);
-                }
-              });
-              return;
-            }
-            try {
-              if (sync) {
-                fsOut.hsync();
-              } else {
-                fsOut.hflush();
-              }
-              final long pos = fsOut.getPos();
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.completed(pos, attachment);
-                }
-              });
-            } catch (final IOException e) {
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.failed(e, attachment);
-                }
-              });
-            }
-          }
-        });
+      private <A> void flush0(A attachment, CompletionHandler<Long, ? super A> handler,
+          boolean sync) {
+        try {
+          synchronized (out) {
+            fsOut.write(out.getBuffer(), 0, out.size());
+            out.reset();
+          }
+        } catch (IOException e) {
+          eventLoop.execute(() -> handler.failed(e, attachment));
+          return;
+        }
+        try {
+          if (sync) {
+            fsOut.hsync();
+          } else {
+            fsOut.hflush();
+          }
+          final long pos = fsOut.getPos();
+          eventLoop.execute(() -> handler.completed(pos, attachment));
+        } catch (final IOException e) {
+          eventLoop.execute(() -> handler.failed(e, attachment));
+        }
+      }
+
+      @Override
+      public <A> void flush(A attachment, CompletionHandler<Long, ? super A> handler,
+          boolean sync) {
+        flushExecutor.execute(() -> flush0(attachment, handler, sync));
       }
 
       @Override
       public void close() throws IOException {
         try {
-          flushExecutor.submit(new Callable<Void>() {
-
-            @Override
-            public Void call() throws Exception {
-              synchronized (out) {
-                out.writeTo(fsOut);
-                out.reset();
-              }
-              return null;
-            }
+          flushExecutor.submit(() -> {
+            synchronized (out) {
+              fsOut.write(out.getBuffer(), 0, out.size());
+              out.reset();
+            }
+            return null;
           }).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException();
Expand All @@ -181,6 +158,16 @@ public Void call() throws Exception {
public int buffered() { public int buffered() {
return out.size(); return out.size();
} }

@Override
public void writeInt(int i) {
out.writeInt(i);
}

@Override
public void write(ByteBuffer bb) {
out.write(bb, bb.position(), bb.remaining());
}
}; };
} }
} }
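In the fallback implementation above, flush0 drains the in-memory buffer into the wrapped FSDataOutputStream, hflush()es or hsync()s it, and reports the resulting file position (or the IOException) back on the event loop through the CompletionHandler. A hedged caller sketch, with a Void attachment chosen purely for illustration (AsyncFSOutput import again omitted):

import java.nio.channels.CompletionHandler;

class FlushSketch {
  static void flushAndLog(AsyncFSOutput output) {
    output.flush(null, new CompletionHandler<Long, Void>() {
      @Override
      public void completed(Long position, Void attachment) {
        // Invoked on the event loop with fsOut.getPos() after hflush()/hsync().
        System.out.println("flushed up to " + position);
      }

      @Override
      public void failed(Throwable exc, Void attachment) {
        // Invoked on the event loop if copying the buffer or flushing threw an IOException.
        exc.printStackTrace();
      }
    }, false); // false -> hflush(); true -> hsync()
  }
}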
@@ -54,7 +54,6 @@
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -349,6 +348,47 @@ private void setupReceiver(int timeoutMs) {
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
 
+  private void writeInt0(int i) {
+    buf.ensureWritable(4);
+    if (cryptoCodec == null) {
+      buf.writeInt(i);
+    } else {
+      ByteBuffer inBuffer = ByteBuffer.allocate(4);
+      inBuffer.putInt(0, i);
+      cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4));
+      buf.writerIndex(buf.writerIndex() + 4);
+    }
+  }
+
+  @Override
+  public void writeInt(int i) {
+    if (eventLoop.inEventLoop()) {
+      writeInt0(i);
+    } else {
+      eventLoop.submit(() -> writeInt0(i));
+    }
+  }
+
+  private void write0(ByteBuffer bb) {
+    int len = bb.remaining();
+    buf.ensureWritable(len);
+    if (cryptoCodec == null) {
+      buf.writeBytes(bb);
+    } else {
+      cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len));
+      buf.writerIndex(buf.writerIndex() + len);
+    }
+  }
+
+  @Override
+  public void write(ByteBuffer bb) {
+    if (eventLoop.inEventLoop()) {
+      write0(bb);
+    } else {
+      eventLoop.submit(() -> write0(bb));
+    }
+  }
+
   @Override
   public void write(byte[] b) {
     write(b, 0, b.length);
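The encrypted branch in writeInt0/write0 above encrypts into a ByteBuffer view obtained from buf.nioBuffer(writerIndex, len); such a view shares memory with the ByteBuf but does not move its writer index, which is why the code advances it explicitly afterwards. A standalone sketch of that pattern, with a trivial copy standing in for cryptoCodec.encrypt:

import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class NioBufferViewSketch {
  // Stand-in for cryptoCodec.encrypt(src, dst): here we just copy the bytes across.
  static void fakeEncrypt(ByteBuffer src, ByteBuffer dst) {
    dst.put(src);
  }

  public static void main(String[] args) {
    ByteBuf buf = Unpooled.buffer(16);
    ByteBuffer payload = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 });

    int len = payload.remaining();
    buf.ensureWritable(len);
    // The view shares content with buf but keeps its own position and limit.
    fakeEncrypt(payload, buf.nioBuffer(buf.writerIndex(), len));
    // Writing through the view did not touch buf's writerIndex, so advance it by hand.
    buf.writerIndex(buf.writerIndex() + len);

    System.out.println(buf.readableBytes()); // 4
  }
}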
@@ -370,13 +410,7 @@ public void write(final byte[] b, final int off, final int len) {
     if (eventLoop.inEventLoop()) {
       write0(b, off, len);
     } else {
-      eventLoop.submit(new Runnable() {
-
-        @Override
-        public void run() {
-          write0(b, off, len);
-        }
-      }).syncUninterruptibly();
+      eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly();
     }
   }
 
@@ -385,13 +419,7 @@ public int buffered() {
     if (eventLoop.inEventLoop()) {
       return buf.readableBytes();
     } else {
-      return eventLoop.submit(new Callable<Integer>() {
-
-        @Override
-        public Integer call() throws Exception {
-          return buf.readableBytes();
-        }
-      }).syncUninterruptibly().getNow().intValue();
+      return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue();
     }
   }
 
