Skip to content
This repository has been archived by the owner on Apr 5, 2019. It is now read-only.

Commit

Permalink
Merge branch 'master' into BRISK-230
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Jun 27, 2011
2 parents fac71c7 + 36ad2a5 commit fb7f0a2
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 194 deletions.
88 changes: 40 additions & 48 deletions src/java/src/com/hadoop/compression/snappy/SnappyCompressor.java
Expand Up @@ -15,21 +15,21 @@ public class SnappyCompressor implements Compressor
{
private static final Log logger = LogFactory.getLog(SnappyCompressor.class.getName());


private boolean finish, finished;
private ByteBuffer outBuf;
private ByteBuffer compressedBuf;

private long bytesRead = 0L;
private long bytesWritten = 0L;


public SnappyCompressor(int bufferSize)
{
outBuf = ByteBuffer.allocateDirect(bufferSize);
compressedBuf = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(bufferSize));
reset();
outBuf = ByteBuffer.allocateDirect(bufferSize);
compressedBuf = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(bufferSize));

reset();
}

public synchronized void setInput(byte[] b, int off, int len)
Expand All @@ -43,24 +43,24 @@ public synchronized void setInput(byte[] b, int off, int len)
throw new ArrayIndexOutOfBoundsException();
}
finished = false;

outBuf.put(b, off, len);

bytesRead += len;
}
}

public synchronized void setDictionary(byte[] b, int off, int len)
{
// do nothing
}

public synchronized boolean needsInput()
{
{
// needs input if compressed data was consumed
if (compressedBuf.position() > 0 && compressedBuf.limit() > compressedBuf.position())
if (compressedBuf.position() > 0 && compressedBuf.limit() > compressedBuf.position())
return false;
return true;

return true;
}

public synchronized void finish()
Expand All @@ -70,13 +70,13 @@ public synchronized void finish()

public synchronized boolean finished()
{
// Check if all compressed data has been consumed
// Check if all compressed data has been consumed
return (finish && finished);
}

public synchronized int compress(byte[] b, int off, int len) throws IOException
{

if (b == null)
{
throw new NullPointerException();
Expand All @@ -85,58 +85,50 @@ public synchronized int compress(byte[] b, int off, int len) throws IOException
{
throw new ArrayIndexOutOfBoundsException();
}

if(finished || outBuf.position() == 0)
{
finished = true;
return 0;
}


//Only need todo this once
if(compressedBuf.position() == 0)
{
try
{
outBuf.limit(outBuf.position());
outBuf.rewind();

int lim = Snappy.compress(outBuf, compressedBuf);

compressedBuf.limit(lim);
compressedBuf.rewind();
}
catch (SnappyException e)
{
throw new IOException(e);
}
outBuf.limit(outBuf.position());
outBuf.rewind();

int lim = Snappy.compress(outBuf, compressedBuf);

compressedBuf.limit(lim);
compressedBuf.rewind();
}
int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len : (compressedBuf.limit() - compressedBuf.position());


int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len : (compressedBuf.limit() - compressedBuf.position());

if(n == 0)
{
finished = true;
return 0;
}

compressedBuf.get(b, off, n);

bytesWritten += n;
// Set 'finished' if snappy has consumed all user-data

// Set 'finished' if snappy has consumed all user-data
if (compressedBuf.position() == compressedBuf.limit())
{
finished = true;

outBuf.limit(outBuf.capacity());
outBuf.rewind();

compressedBuf.limit(compressedBuf.capacity());
compressedBuf.rewind();

}
}

return n;
}
Expand All @@ -145,13 +137,13 @@ public synchronized void reset()
{
finish = false;
finished = false;

outBuf.limit(outBuf.capacity());
outBuf.rewind();

compressedBuf.limit(compressedBuf.capacity());
compressedBuf.rewind();

bytesRead = bytesWritten = 0L;
}

Expand Down
91 changes: 42 additions & 49 deletions src/java/src/com/hadoop/compression/snappy/SnappyDecompressor.java
Expand Up @@ -14,24 +14,24 @@ public class SnappyDecompressor implements Decompressor
{

private static final Log logger = LogFactory.getLog(SnappyDecompressor.class.getName());

private boolean finished;
private ByteBuffer outBuf;
private ByteBuffer uncompressedBuf;

private long bytesRead = 0L;
private long bytesWritten = 0L;


public SnappyDecompressor(int bufferSize)
{
outBuf = ByteBuffer.allocateDirect(bufferSize);
uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);

reset();
}


public synchronized void setInput(byte[] b, int off, int len)
{
if (b == null)
Expand All @@ -42,11 +42,11 @@ public synchronized void setInput(byte[] b, int off, int len)
{
throw new ArrayIndexOutOfBoundsException();
}
finished = false;

finished = false;

outBuf.put(b, off, len);

bytesRead += len;
}

Expand All @@ -56,11 +56,11 @@ public synchronized void setDictionary(byte[] b, int off, int len)
}

public synchronized boolean needsInput()
{
{
//needs input if the uncompressed data was consumed
if (uncompressedBuf.position() > 0 && uncompressedBuf.limit() > uncompressedBuf.position())
return false;

return true;
}

Expand All @@ -70,14 +70,14 @@ public synchronized boolean needsDictionary()
}

public synchronized boolean finished()
{
{
return finished;
}

public synchronized int decompress(byte[] b, int off, int len) throws IOException
{


if (b == null)
{
throw new NullPointerException();
Expand All @@ -86,62 +86,55 @@ public synchronized int decompress(byte[] b, int off, int len) throws IOExceptio
{
throw new ArrayIndexOutOfBoundsException();
}

//nothing to decompress
if ((outBuf.position() == 0 && uncompressedBuf.position() == 0) || finished)
{
reset();
finished = true;

return 0;
}

//only needs to do this once per input
if(uncompressedBuf.position() == 0)
{
try
{
outBuf.limit(outBuf.position());
outBuf.rewind();

int neededLen = Snappy.uncompressedLength(outBuf);
outBuf.rewind();

if(neededLen > uncompressedBuf.capacity())
uncompressedBuf = ByteBuffer.allocateDirect(neededLen);

int lim = Snappy.uncompress(outBuf, uncompressedBuf);

uncompressedBuf.limit(lim);
uncompressedBuf.rewind();
}
catch (SnappyException e)
{
throw new IOException(e);
}
{
outBuf.limit(outBuf.position());
outBuf.rewind();

int neededLen = Snappy.uncompressedLength(outBuf);
outBuf.rewind();

if(neededLen > uncompressedBuf.capacity())
uncompressedBuf = ByteBuffer.allocateDirect(neededLen);

int lim = Snappy.uncompress(outBuf, uncompressedBuf);

uncompressedBuf.limit(lim);
uncompressedBuf.rewind();
}

int n = (uncompressedBuf.limit() - uncompressedBuf.position()) > len ? len : (uncompressedBuf.limit() - uncompressedBuf.position());

if(n == 0)
{
reset();
finished = true;
finished = true;
return 0;
}

uncompressedBuf.get(b, off, n);

bytesWritten += n;
// Set 'finished' if snappy has consumed all user-data

// Set 'finished' if snappy has consumed all user-data
if (uncompressedBuf.position() == uncompressedBuf.limit())
{
reset();
finished = true;
}
}

return n;
return n;
}

public synchronized int getRemaining()
Expand All @@ -153,14 +146,14 @@ public synchronized int getRemaining()
public synchronized void reset()
{
finished = false;

uncompressedBuf.limit(uncompressedBuf.capacity());
uncompressedBuf.rewind();

outBuf.limit(outBuf.capacity());
outBuf.rewind();
bytesRead = bytesWritten = 0L;

bytesRead = bytesWritten = 0L;
}

public synchronized void end()
Expand Down

0 comments on commit fb7f0a2

Please sign in to comment.