Fix regression with compressed reader performance due to no pooling and excessive mapping/unmapping

patch by benedict; reviewed by tjake for CASSANDRA-9240
belliottsmith committed May 7, 2015
1 parent ea2ee37 · commit aedce5f
Showing 10 changed files with 152 additions and 87 deletions.
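In outline (a hedged reading of the diff below, not text from the patch itself): the regression came from each CompressedRandomAccessReader building its own mmap segments over the compressed file in its constructor and tearing them down again in deallocate(), so every reader open/close cycle repeated the mapping and unmapping work and nothing was pooled. The commit inverts the ownership: the ICompressedFile that creates the readers, such as CompressedPoolingSegmentedFile, maps the file once, hands the same TreeMap<Long, MappedByteBuffer> to every reader through chunkSegments(), and only unmaps when the file itself is closed. A minimal self-contained sketch of that ownership model follows; the class name, constructor parameters and fixed segment size are illustrative, not Cassandra API.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.TreeMap;

// Minimal sketch of the ownership model this commit moves to: the file owns the
// mappings, readers merely borrow them.
public final class SharedMappingFile implements AutoCloseable
{
    // Mapped once when the file is opened, keyed by the file offset each segment starts at.
    private final TreeMap<Long, MappedByteBuffer> chunkSegments = new TreeMap<>();

    public SharedMappingFile(String path, int segmentSize) throws IOException
    {
        try (RandomAccessFile raf = new RandomAccessFile(path, "r");
             FileChannel channel = raf.getChannel())
        {
            // Map the whole file once, in segments no larger than segmentSize bytes.
            for (long offset = 0; offset < channel.size(); offset += segmentSize)
            {
                long length = Math.min(segmentSize, channel.size() - offset);
                chunkSegments.put(offset, channel.map(FileChannel.MapMode.READ_ONLY, offset, length));
            }
        }
    }

    // What each reader borrows instead of mapping the file itself (compare
    // file.chunkSegments() in the constructor below): opening or closing a reader
    // no longer maps or unmaps anything.
    public TreeMap<Long, MappedByteBuffer> chunkSegments()
    {
        return chunkSegments;
    }

    @Override
    public void close()
    {
        // Cassandra unmaps eagerly here with FileUtils.clean(segment); plain Java has no
        // portable explicit unmap, so this sketch just drops the references.
        chunkSegments.clear();
    }
}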
src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -42,27 +42,31 @@
*/
public class CompressedRandomAccessReader extends RandomAccessReader
{
private static final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap;

public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata)
{
return open(channel, metadata, null);
try
{
return new CompressedRandomAccessReader(channel, metadata, null);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
}
public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata, CompressedPoolingSegmentedFile owner)

public static CompressedRandomAccessReader open(ICompressedFile file)
{
try
{
return new CompressedRandomAccessReader(channel, metadata, owner);
return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
}


private TreeMap<Long, MappedByteBuffer> chunkSegments;
private int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
private final TreeMap<Long, MappedByteBuffer> chunkSegments;

private final CompressionMetadata metadata;

@@ -75,61 +79,24 @@ public static CompressedRandomAccessReader open(ChannelProxy channel, Compressio
// raw checksum bytes
private ByteBuffer checksumBytes;

protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file) throws FileNotFoundException
{
super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), owner);
super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().useDirectOutputByteBuffers(), file instanceof PoolingSegmentedFile ? (PoolingSegmentedFile) file : null);
this.metadata = metadata;
checksum = new Adler32();

if (!useMmap)
chunkSegments = file == null ? null : file.chunkSegments();
if (chunkSegments == null)
{
compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
compressed = super.allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().useDirectOutputByteBuffers());
checksumBytes = ByteBuffer.wrap(new byte[4]);
}
else
{
try
{
createMappedSegments();
}
catch (IOException e)
{
throw new IOError(e);
}
}
}

private void createMappedSegments() throws IOException
{
chunkSegments = new TreeMap<>();
long offset = 0;
long lastSegmentOffset = 0;
long segmentSize = 0;

while (offset < metadata.dataLength)
{
CompressionMetadata.Chunk chunk = metadata.chunkFor(offset);

//Reached a new mmap boundary
if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE)
{
chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
lastSegmentOffset += segmentSize;
segmentSize = 0;
}

segmentSize += chunk.length + 4; //checksum
offset += metadata.chunkLength();
}

if (segmentSize > 0)
chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
}

protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
{
assert Integer.bitCount(bufferSize) == 1;
return useMmap && useDirect
return useDirect
? ByteBuffer.allocateDirect(bufferSize)
: ByteBuffer.allocate(bufferSize);
}
@@ -138,16 +105,9 @@ protected ByteBuffer allocateBuffer(int bufferSize, boolean useDirect)
public void deallocate()
{
super.deallocate();

if (chunkSegments != null)
{
for (Map.Entry<Long, MappedByteBuffer> entry : chunkSegments.entrySet())
{
FileUtils.clean(entry.getValue());
}
}

chunkSegments = null;
if (compressed != null)
FileUtils.clean(compressed);
compressed = null;
}

private void reBufferStandard()
@@ -175,7 +135,7 @@ private void reBufferStandard()
int decompressedBytes;
try
{
decompressedBytes = metadata.compressor().uncompress(compressed.array(), 0, chunk.length, buffer.array(), 0);
decompressedBytes = metadata.compressor().uncompress(compressed, buffer);
buffer.limit(decompressedBytes);
}
catch (IOException e)
@@ -186,8 +146,8 @@ private void reBufferStandard()

if (metadata.parameters.getCrcCheckChance() > ThreadLocalRandom.current().nextDouble())
{

checksum.update(compressed.array(), 0, chunk.length);
compressed.position(0);
FBUtilities.directCheckSum(checksum, compressed);

if (checksum(chunk) != (int) checksum.getValue())
throw new CorruptBlockException(getPath(), chunk);
@@ -226,7 +186,7 @@ private void reBufferMmap()
Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
long segmentOffset = entry.getKey();
int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
MappedByteBuffer compressedChunk = entry.getValue();
ByteBuffer compressedChunk = entry.getValue().duplicate();

compressedChunk.position(chunkOffset);
compressedChunk.limit(chunkOffset + chunk.length);
@@ -284,7 +244,7 @@ private void reBufferMmap()
@Override
protected void reBuffer()
{
if (useMmap)
if (chunkSegments != null)
{
reBufferMmap();
}
@@ -305,7 +265,7 @@ private int checksum(CompressionMetadata.Chunk chunk) throws IOException

public int getTotalBufferSize()
{
return super.getTotalBufferSize() + (useMmap ? 0 : compressed.capacity());
return super.getTotalBufferSize() + (chunkSegments != null ? 0 : compressed.capacity());
}

@Override
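Because those mapped segments are now shared by every reader of the file, a reader must not move the shared buffer's position or limit; the mmap path above instead takes a duplicate() of the segment and slices the chunk out of that. A small self-contained sketch of that lookup (the standalone class and the helper name chunkView are illustrative; in the patch this logic sits inline in reBufferMmap()):

import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the shared-mapping chunk lookup used by the mmap re-buffer path.
final class ChunkViews
{
    static ByteBuffer chunkView(TreeMap<Long, MappedByteBuffer> chunkSegments, long chunkOffset, int chunkLength)
    {
        // Find the mapped segment whose start offset is at or before this chunk.
        Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunkOffset);
        int offsetInSegment = Math.toIntExact(chunkOffset - entry.getKey());

        // duplicate() is what keeps the sharing safe: each reader gets its own
        // position and limit, so concurrent readers never disturb the one shared
        // MappedByteBuffer they are all slicing from.
        ByteBuffer view = entry.getValue().duplicate();
        view.position(offsetInSegment);
        view.limit(offsetInSegment + chunkLength);
        return view;
    }
}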
src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -26,14 +26,15 @@
import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.ICompressedFile;

public class CompressedThrottledReader extends CompressedRandomAccessReader
{
private final RateLimiter limiter;

public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter) throws FileNotFoundException
public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter) throws FileNotFoundException
{
super(channel, metadata, null);
super(channel, metadata, file);
this.limiter = limiter;
}

@@ -43,11 +44,11 @@ protected void reBuffer()
super.reBuffer();
}

public static CompressedThrottledReader open(ChannelProxy channel, CompressionMetadata metadata, RateLimiter limiter)
public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter)
{
try
{
return new CompressedThrottledReader(channel, metadata, limiter);
return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
}
catch (FileNotFoundException e)
{
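The throttled reader is now opened from the ICompressedFile as well, so it shares the file's mappings like any other reader; its extra behaviour is pacing each buffer refill through the Guava RateLimiter it is given. A rough sketch of that idea (the acquire() call and the bytes-per-permit unit are assumptions, since the body of reBuffer() is hidden in this hunk):

import com.google.common.util.concurrent.RateLimiter;

// Sketch only: pace every buffer refill through a shared RateLimiter.
abstract class ThrottledReaderSketch
{
    private final RateLimiter limiter;
    private final int bufferSize;

    ThrottledReaderSketch(RateLimiter limiter, int bufferSize)
    {
        this.limiter = limiter;
        this.bufferSize = bufferSize;
    }

    protected void reBuffer()
    {
        limiter.acquire(bufferSize); // block until this many permits (here: bytes) are available
        reBufferUnthrottled();
    }

    protected abstract void reBufferUnthrottled();
}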
src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -122,13 +122,14 @@ public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] out
}
}

public int uncompress(ByteBuffer input_, ByteBuffer output) throws IOException
public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
if (!output.hasArray())
throw new IllegalArgumentException("DeflateCompressor doesn't work with direct byte buffers");

byte[] input = ByteBufferUtil.getArray(input_);
return uncompress(input, 0, input.length, output.array(), output.arrayOffset() + output.position());
if (input.hasArray())
return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
return uncompress(ByteBufferUtil.getArray(input), 0, input.remaining(), output.array(), output.arrayOffset() + output.position());
}

public boolean useDirectOutputByteBuffers()
src/java/org/apache/cassandra/io/compress/LZ4Compressor.java (5 additions & 1 deletion)
@@ -83,6 +83,7 @@ public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] out
| ((input[inputOffset + 1] & 0xFF) << 8)
| ((input[inputOffset + 2] & 0xFF) << 16)
| ((input[inputOffset + 3] & 0xFF) << 24);

final int compressedLength;
try
{
@@ -104,6 +105,9 @@ public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] out

public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
if (input.hasArray() && output.hasArray())
return uncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());

int pos = input.position();
final int decompressedLength = (input.get(pos) & 0xFF)
| ((input.get(pos + 1) & 0xFF) << 8)
@@ -132,7 +136,7 @@ public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
@Override
public boolean useDirectOutputByteBuffers()
{
return false;
return true;
}

public Set<String> supportedOptions()
src/java/org/apache/cassandra/io/compress/SnappyCompressor.java
@@ -96,6 +96,8 @@ public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] out

public int uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
if (input.hasArray() && output.hasArray())
return Snappy.rawUncompress(input.array(), input.arrayOffset() + input.position(), input.remaining(), output.array(), output.arrayOffset() + output.position());
return Snappy.uncompress(input, output);
}

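The three compressors above (DeflateCompressor, LZ4Compressor, SnappyCompressor) all gain the same fast path in their ByteBuffer overloads: when both buffers are heap buffers, delegate straight to the existing byte[] implementation using array(), arrayOffset() and position(), instead of copying through ByteBufferUtil.getArray(). A hedged, self-contained sketch of that dispatch follows; transform() is a stand-in for a real decompressor, and the fallback branch is only one possible handling of direct buffers (in the patch, DeflateCompressor rejects direct output buffers, LZ4Compressor reads the length bytes straight from the ByteBuffer, and SnappyCompressor calls Snappy's own ByteBuffer API).

import java.nio.ByteBuffer;

// Sketch of the heap-buffer fast path; only the dispatch logic mirrors the patch.
final class ArrayFastPath
{
    static int uncompress(ByteBuffer input, ByteBuffer output)
    {
        if (input.hasArray() && output.hasArray())
        {
            // Zero-copy: operate directly on the backing arrays, honouring each
            // buffer's arrayOffset() and position().
            return transform(input.array(), input.arrayOffset() + input.position(), input.remaining(),
                             output.array(), output.arrayOffset() + output.position());
        }

        // One possible fallback for direct buffers: copy through heap arrays.
        byte[] in = new byte[input.remaining()];
        input.duplicate().get(in);
        byte[] out = new byte[output.remaining()];
        int written = transform(in, 0, in.length, out, 0);
        output.duplicate().put(out, 0, written);
        return written;
    }

    // Stand-in "decompressor": copies bytes through unchanged, so the dispatch above runs as-is.
    private static int transform(byte[] in, int inOff, int inLen, byte[] out, int outOff)
    {
        System.arraycopy(in, inOff, out, outOff, inLen);
        return inLen;
    }
}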
src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -17,6 +17,10 @@
*/
package org.apache.cassandra.io.util;

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.TreeMap;

import com.google.common.util.concurrent.RateLimiter;

import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
@@ -27,31 +31,56 @@
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
{
public final CompressionMetadata metadata;
private final TreeMap<Long, MappedByteBuffer> chunkSegments;

public CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata)
{
super(new Cleanup(channel, metadata), channel, metadata.dataLength, metadata.compressedFileLength);
this(channel, metadata, CompressedSegmentedFile.createMappedSegments(channel, metadata));
}

private CompressedPoolingSegmentedFile(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
{
super(new Cleanup(channel, metadata, chunkSegments), channel, metadata.dataLength, metadata.compressedFileLength);
this.metadata = metadata;
this.chunkSegments = chunkSegments;
}

private CompressedPoolingSegmentedFile(CompressedPoolingSegmentedFile copy)
{
super(copy);
this.metadata = copy.metadata;
this.chunkSegments = copy.chunkSegments;
}

public ChannelProxy channel()
{
return channel;
}

public TreeMap<Long, MappedByteBuffer> chunkSegments()
{
return chunkSegments;
}

protected static final class Cleanup extends PoolingSegmentedFile.Cleanup
{
final CompressionMetadata metadata;
protected Cleanup(ChannelProxy channel, CompressionMetadata metadata)
final TreeMap<Long, MappedByteBuffer> chunkSegments;
protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap<Long, MappedByteBuffer> chunkSegments)
{
super(channel);
this.metadata = metadata;
this.chunkSegments = chunkSegments;
}
public void tidy()
{
super.tidy();
metadata.close();
if (chunkSegments != null)
{
for (MappedByteBuffer segment : chunkSegments.values())
FileUtils.clean(segment);
}
}
}

@@ -82,17 +111,17 @@ public void dropPageCache(long before)

public RandomAccessReader createReader()
{
return CompressedRandomAccessReader.open(channel, metadata, null);
return CompressedRandomAccessReader.open(this);
}

public RandomAccessReader createThrottledReader(RateLimiter limiter)
{
return CompressedThrottledReader.open(channel, metadata, limiter);
return CompressedThrottledReader.open(this, limiter);
}

protected RandomAccessReader createPooledReader()
{
return CompressedRandomAccessReader.open(channel, metadata, this);
return CompressedRandomAccessReader.open(this);
}

public CompressionMetadata getMetadata()
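The mapping work removed from the reader now happens once, via CompressedSegmentedFile.createMappedSegments(channel, metadata) in the constructor above, and the resulting segments are released exactly once in Cleanup.tidy() when the file itself is closed. That helper is not shown in this diff, but judging from the per-reader code it replaces, it has to walk the compressed chunks and cut a new mmap segment whenever the next chunk plus its 4-byte checksum would push the current segment past Integer.MAX_VALUE. A hedged sketch of that loop, taking the chunk lengths as a plain array so it stays self-contained:

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.TreeMap;

// Sketch of the segment-building loop; chunk sizes are passed as a long[]
// instead of CompressionMetadata purely to keep the example self-contained.
final class MappedSegmentsSketch
{
    static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE; // a single mmap call cannot exceed 2 GiB

    static TreeMap<Long, MappedByteBuffer> createMappedSegments(FileChannel channel, long[] compressedChunkLengths) throws IOException
    {
        TreeMap<Long, MappedByteBuffer> segments = new TreeMap<>();
        long lastSegmentOffset = 0;
        long segmentSize = 0;

        for (long chunkLength : compressedChunkLengths)
        {
            long sizeWithChecksum = chunkLength + 4; // each on-disk chunk is followed by a 4-byte checksum

            // Cut the current segment on a whole-chunk boundary before it would overflow an int.
            if (segmentSize + sizeWithChecksum > MAX_SEGMENT_SIZE)
            {
                segments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
                lastSegmentOffset += segmentSize;
                segmentSize = 0;
            }
            segmentSize += sizeWithChecksum;
        }

        if (segmentSize > 0)
            segments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize));
        return segments;
    }
}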
(diffs for the remaining 4 changed files not shown)
