Skip to content

Commit

Permalink
Optimize FileWriteOutBytes to avoid high system cpu usage (#9722) (#9770
Browse files Browse the repository at this point in the history
)

* optimize FileWriteOutBytes to avoid high sys cpu

* optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException

* optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size

* Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size"

This reverts commit 965f742

* Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException"

This reverts commit 149e08c

* optimize FileWriteOutBytes to avoid high sys cpu -- avoid IOEception never thrown check

* Fix size counting to handle IOE in FileWriteOutBytes + tests

* remove unused throws IOException in WriteOutBytes.size()

* Remove redundant throws IOExcpetion clauses

* Parameterize IndexMergeBenchmark

Co-authored-by: huanghui.bigrey <huanghui.bigrey@bytedance.com>
Co-authored-by: Suneet Saldanha <suneet.saldanha@imply.io>

Co-authored-by: BIGrey <huanghui0143@163.com>
Co-authored-by: huanghui.bigrey <huanghui.bigrey@bytedance.com>
  • Loading branch information
3 people committed Apr 24, 2020
1 parent e85d9d9 commit 29073c5
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -66,6 +68,7 @@
@Measurement(iterations = 25)
public class IndexMergeBenchmark
{

@Param({"5"})
private int numSegments;

Expand All @@ -78,9 +81,13 @@ public class IndexMergeBenchmark
@Param({"true", "false"})
private boolean rollup;

@Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"})
private SegmentWriteOutType factoryType;


private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;

private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;

Expand All @@ -91,6 +98,7 @@ public class IndexMergeBenchmark
private List<QueryableIndex> indexesToMerge;
private BenchmarkSchemaInfo schemaInfo;
private File tmpDir;
private IndexMergerV9 indexMergerV9;

static {
JSON_MAPPER = new DefaultObjectMapper();
Expand All @@ -99,23 +107,16 @@ public class IndexMergeBenchmark
JSON_MAPPER.setInjectableValues(injectableValues);
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
() -> 0
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}

@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());

log.info("SETUP CALLED AT " + System.currentTimeMillis());
indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType));
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());

indexesToMerge = new ArrayList<>();
Expand Down Expand Up @@ -143,7 +144,7 @@ public void setup() throws IOException
tmpDir = FileUtils.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());

File indexFile = INDEX_MERGER_V9.persist(
File indexFile = indexMergerV9.persist(
incIndex,
tmpDir,
new IndexSpec(),
Expand All @@ -155,26 +156,6 @@ public void setup() throws IOException
}
}

@TearDown
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}

private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand All @@ -186,7 +167,7 @@ public void mergeV9(Blackhole blackhole) throws Exception
try {
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());

File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
File mergedFile = indexMergerV9.mergeQueryableIndex(
indexesToMerge,
rollup,
schemaInfo.getAggsArray(),
Expand All @@ -199,8 +180,46 @@ public void mergeV9(Blackhole blackhole) throws Exception
}
finally {
tmpFile.delete();
}
}

@TearDown
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}

public enum SegmentWriteOutType
{
TMP_FILE,
OFF_HEAP,
ON_HEAP
}

private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type)
{
switch (type) {
case TMP_FILE:
return TmpFileSegmentWriteOutMediumFactory.instance();
case OFF_HEAP:
return OffHeapMemorySegmentWriteOutMediumFactory.instance();
case ON_HEAP:
return OnHeapMemorySegmentWriteOutMediumFactory.instance();
}
throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type);
}

private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void write(T objectToWrite) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return headerOut.size() + valueOut.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void add(double value) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void add(float value) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private long getOffset(int index) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
if (requireMultipleFiles) {
// for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file.
Expand Down Expand Up @@ -394,7 +394,7 @@ private void writeToMultiFiles(WritableByteChannel channel, FileSmoosher smooshe
*
* @throws IOException
*/
private int bagSizePower() throws IOException
private int bagSizePower()
{
long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten;

Expand All @@ -421,7 +421,7 @@ private int bagSizePower() throws IOException
*
* @throws IOException
*/
private boolean actuallyFits(int powerTwo) throws IOException
private boolean actuallyFits(int powerTwo)
{
long lastValueOffset = 0;
long currentValueOffset = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void serialize(ColumnValueSelector<? extends T> selector) throws IOExcept
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return writer.getSerializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void serialize(ColumnValueSelector<? extends T> selector) throws IOExcept
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return writer.getSerializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,18 @@ public int size(T x)

public interface FieldWriter<T>
{
void writeTo(ByteBuffer buffer, T x) throws IOException;
void writeTo(ByteBuffer buffer, T x);

int size(T x);
}

@FunctionalInterface
public interface IntFieldWriter<T> extends FieldWriter<T>
{
int getField(T x) throws IOException;
int getField(T x);

@Override
default void writeTo(ByteBuffer buffer, T x) throws IOException
default void writeTo(ByteBuffer buffer, T x)
{
buffer.putInt(getField(x));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
private final File file;
private final FileChannel ch;
private long writeOutBytes;

/** Purposely big-endian, for {@link #writeInt(int)} implementation */
private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
Expand All @@ -44,6 +45,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
this.file = file;
this.ch = ch;
this.writeOutBytes = 0L;
}

private void flushIfNeeded(int bytesNeeded) throws IOException
Expand All @@ -66,13 +68,15 @@ public void write(int b) throws IOException
{
flushIfNeeded(1);
buffer.put((byte) b);
writeOutBytes++;
}

@Override
public void writeInt(int v) throws IOException
{
flushIfNeeded(Integer.BYTES);
buffer.putInt(v);
writeOutBytes += Integer.BYTES;
}

@Override
Expand All @@ -85,14 +89,17 @@ public int write(ByteBuffer src) throws IOException
try {
src.limit(src.position() + buffer.capacity());
buffer.put(src);
writeOutBytes += buffer.capacity();
flush();
}
finally {
// IOException may occur in flush(), reset src limit to the original
src.limit(srcLimit);
}
}
int remaining = src.remaining();
buffer.put(src);
writeOutBytes += remaining;
return len;
}

Expand All @@ -103,10 +110,9 @@ public void write(byte[] b, int off, int len) throws IOException
}

@Override
public long size() throws IOException
public long size()
{
flush();
return ch.size();
return writeOutBytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte
/**
* Returns the number of bytes written to this WriteOutBytes so far.
*/
public abstract long size() throws IOException;
public abstract long size();

/**
* Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel.
Expand Down
Loading

0 comments on commit 29073c5

Please sign in to comment.