Skip to content

Commit

Permalink
ARROW-7491: [Java] Improve the performance of aligning
Browse files Browse the repository at this point in the history
Aligning is an important and frequent operation when writing IPC data. It writes at most 7 zero bytes to the output.
The current implementation creates a new byte array on every call, which adds performance overhead and increases GC pressure.

We improve it by using a shared byte array. Benchmark evaluation shows an 11% performance gain.

Closes #6120 from liyafan82/fly_1230_ali and squashes the following commits:

d337a2d <liyafan82>  Make buffer array members final
9bd08b9 <liyafan82>  Improve writeZeros method
3db4eaf <liyafan82>  Avoid creating byte arrays repeatedly when writing integers
cc241fb <liyafan82>  Improve the performance of aligning

Authored-by: liyafan82 <fan_li_ya@foxmail.com>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
liyafan82 authored and wesm committed Feb 21, 2020
1 parent acfcc5b commit 9d109f4
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.vector.ipc;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
 * JMH micro-benchmarks that measure {@link WriteChannel} operations.
 */
public class WriteChannelBenchmark {

  /**
   * State for the align benchmark. A fresh in-memory sink and a fresh channel are created
   * before every invocation and closed again after it.
   */
  @State(Scope.Benchmark)
  public static class AlignState {

    /** In-memory destination that backs the channel. */
    private ByteArrayOutputStream sink;

    /** Channel under test; read directly by the benchmark method. */
    private WriteChannel channel;

    /**
     * Benchmark parameter. Setup pre-writes {@code 8 - alignSize} bytes, presumably leaving
     * the channel {@code alignSize} bytes short of an 8-byte boundary.
     */
    @Param({"1", "2", "3", "4", "5", "6", "7"})
    public int alignSize;

    /**
     * Builds a new sink and channel, then writes the unaligned prefix.
     *
     * @throws IOException if the prefix write fails
     */
    @Setup(Level.Invocation)
    public void prepareInvoke() throws IOException {
      sink = new ByteArrayOutputStream(8);
      channel = new WriteChannel(Channels.newChannel(sink));

      final int prefixLength = 8 - alignSize;
      channel.write(new byte[prefixLength]);
    }

    /**
     * Closes the channel first and the backing stream second.
     *
     * @throws IOException if closing either resource fails
     */
    @TearDown(Level.Invocation)
    public void tearDownInvoke() throws IOException {
      channel.close();
      sink.close();
    }
  }

  /**
   * Measures the average time, in nanoseconds, of one {@code align()} call on the prepared
   * channel.
   *
   * @param state per-invocation state holding the channel to align
   * @throws IOException if the align call fails
   */
  @Benchmark
  @OutputTimeUnit(TimeUnit.NANOSECONDS)
  @BenchmarkMode(Mode.AverageTime)
  public void alignBenchmark(AlignState state) throws IOException {
    final WriteChannel channel = state.channel;
    channel.align();
  }

  /**
   * Runs the benchmarks of this class in a single fork.
   *
   * @param args ignored
   * @throws RunnerException if the JMH runner fails
   */
  public static void main(String[] args) throws RunnerException {
    final String benchmarkPattern = WriteChannelBenchmark.class.getSimpleName();
    final Options options = new OptionsBuilder()
        .include(benchmarkPattern)
        .forks(1)
        .build();

    final Runner runner = new Runner(options);
    runner.run();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,18 @@
* <p>All write methods in this class follow full write semantics, i.e., write calls
* only return after requested data has been fully written. Note this is different
* from java WritableByteChannel interface where partial write is allowed
* </p>
* <p>
* Please note that objects of this class are not thread-safe.
* </p>
*/
public class WriteChannel implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(WriteChannel.class);

private static final byte[] ZERO_BYTES = new byte[8];

private final byte[] intBuf = new byte[4];

private long currentPosition = 0;

private final WritableByteChannel out;
Expand All @@ -62,15 +70,22 @@ public long write(byte[] buffer) throws IOException {
return write(ByteBuffer.wrap(buffer));
}

long write(byte[] buffer, int offset, int length) throws IOException {
return write(ByteBuffer.wrap(buffer, offset, length));
}

/**
* Writes {@code zeroCount} zeros to the underlying channel.
*/
public long writeZeros(long zeroCount) throws IOException {
long bytesWritten = 0;
while (zeroCount > 0) {
int bytesToWrite = (int)Math.min(zeroCount, Integer.MAX_VALUE);
bytesWritten += write(new byte[bytesToWrite]);
zeroCount -= Integer.MAX_VALUE;
long wholeWordsEnd = zeroCount - 8;
while (bytesWritten <= wholeWordsEnd) {
bytesWritten += write(ZERO_BYTES);
}

if (bytesWritten < zeroCount) {
bytesWritten += write(ZERO_BYTES, 0, (int) (zeroCount - bytesWritten));
}
return bytesWritten;
}
Expand All @@ -79,8 +94,9 @@ public long writeZeros(long zeroCount) throws IOException {
* Writes enough bytes to align the channel to an 8-byte boundary.
*/
public long align() throws IOException {
if (currentPosition % 8 != 0) { // align on 8 byte boundaries
return writeZeros(8 - (int) (currentPosition % 8));
int trailingByteSize = (int) (currentPosition % 8);
if (trailingByteSize != 0) { // align on 8 byte boundaries
return writeZeros(8 - trailingByteSize);
}
return 0;
}
Expand All @@ -90,7 +106,9 @@ public long align() throws IOException {
*/
public long write(ByteBuffer buffer) throws IOException {
long length = buffer.remaining();
LOGGER.debug("Writing buffer with size: {}", length);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Writing buffer with size: {}", length);
}
while (buffer.hasRemaining()) {
out.write(buffer);
}
Expand All @@ -102,9 +120,8 @@ public long write(ByteBuffer buffer) throws IOException {
* Writes <code>v</code> in little-endian format to the underlying channel.
*/
public long writeIntLittleEndian(int v) throws IOException {
byte[] outBuffer = new byte[4];
MessageSerializer.intToBytes(v, outBuffer);
return write(outBuffer);
MessageSerializer.intToBytes(v, intBuf);
return write(intBuf);
}

/**
Expand Down

0 comments on commit 9d109f4

Please sign in to comment.