diff --git a/src/java/src/com/hadoop/compression/snappy/SnappyCompressor.java b/src/java/src/com/hadoop/compression/snappy/SnappyCompressor.java
index 8923d64..a193ca0 100644
--- a/src/java/src/com/hadoop/compression/snappy/SnappyCompressor.java
+++ b/src/java/src/com/hadoop/compression/snappy/SnappyCompressor.java
@@ -15,21 +15,21 @@ public class SnappyCompressor implements Compressor
 {
     private static final Log logger = LogFactory.getLog(SnappyCompressor.class.getName());
-    
+
     private boolean finish, finished;
     private ByteBuffer outBuf;
     private ByteBuffer compressedBuf;
-    
+
     private long bytesRead = 0L;
     private long bytesWritten = 0L;
-    
+
     public SnappyCompressor(int bufferSize)
     {
-        outBuf = ByteBuffer.allocateDirect(bufferSize);
-        compressedBuf = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(bufferSize));
-        
-        reset();
+        outBuf = ByteBuffer.allocateDirect(bufferSize);
+        compressedBuf = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(bufferSize));
+
+        reset();
     }
 
     public synchronized void setInput(byte[] b, int off, int len)
@@ -43,11 +43,11 @@ public synchronized void setInput(byte[] b, int off, int len)
             throw new ArrayIndexOutOfBoundsException();
         }
         finished = false;
-        
+
         outBuf.put(b, off, len);
-        
+
         bytesRead += len;
-    }    
+    }
 
     public synchronized void setDictionary(byte[] b, int off, int len)
     {
@@ -55,12 +55,12 @@ public synchronized void setDictionary(byte[] b, int off, int len)
     }
 
     public synchronized boolean needsInput()
-    {    
+    {
         // needs input if compressed data was consumed
-        if (compressedBuf.position() > 0 && compressedBuf.limit() > compressedBuf.position())    
+        if (compressedBuf.position() > 0 && compressedBuf.limit() > compressedBuf.position())
            return false;
-        
-        return true;    
+
+        return true;
     }
 
     public synchronized void finish()
@@ -70,13 +70,13 @@ public synchronized void finish()
 
     public synchronized boolean finished()
     {
-        // Check if all compressed data has been consumed    
+        // Check if all compressed data has been consumed
         return (finish && finished);
     }
 
     public synchronized int compress(byte[] b, int off, int len) throws IOException
     {
-        
+
         if (b == null)
         {
             throw new NullPointerException();
@@ -85,58 +85,50 @@ public synchronized int compress(byte[] b, int off, int len) throws IOException
         {
             throw new ArrayIndexOutOfBoundsException();
         }
-        
+
         if(finished || outBuf.position() == 0)
         {
            finished = true;
            return 0;
         }
-        
-        
+
+
         //Only need todo this once
         if(compressedBuf.position() == 0)
        {
-            try
-            {
-                outBuf.limit(outBuf.position());
-                outBuf.rewind();
-
-                int lim = Snappy.compress(outBuf, compressedBuf);
-
-                compressedBuf.limit(lim);
-                compressedBuf.rewind();
-            }
-            catch (SnappyException e)
-            {
-                throw new IOException(e);
-            }
+            outBuf.limit(outBuf.position());
+            outBuf.rewind();
+
+            int lim = Snappy.compress(outBuf, compressedBuf);
+
+            compressedBuf.limit(lim);
+            compressedBuf.rewind();
        }
-        
-        
-        int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len : (compressedBuf.limit() - compressedBuf.position());
-        
+
+
+        int n = (compressedBuf.limit() - compressedBuf.position()) > len ? len : (compressedBuf.limit() - compressedBuf.position());
+
         if(n == 0)
        {
            finished = true;
            return 0;
        }
-        
+
         compressedBuf.get(b, off, n);
-        
+
         bytesWritten += n;
-        
-        // Set 'finished' if snappy has consumed all user-data    
+
+        // Set 'finished' if snappy has consumed all user-data
         if (compressedBuf.position() == compressedBuf.limit())
         {
             finished = true;
-            
+
             outBuf.limit(outBuf.capacity());
             outBuf.rewind();
-            
+
             compressedBuf.limit(compressedBuf.capacity());
             compressedBuf.rewind();
-            
-        }    
+        }
 
         return n;
     }
@@ -145,13 +137,13 @@ public synchronized void reset()
     {
         finish = false;
         finished = false;
-        
+
         outBuf.limit(outBuf.capacity());
         outBuf.rewind();
-        
+
         compressedBuf.limit(compressedBuf.capacity());
         compressedBuf.rewind();
-        
+
         bytesRead = bytesWritten = 0L;
     }
 
diff --git a/src/java/src/com/hadoop/compression/snappy/SnappyDecompressor.java b/src/java/src/com/hadoop/compression/snappy/SnappyDecompressor.java
index f2babb3..247ec0a 100644
--- a/src/java/src/com/hadoop/compression/snappy/SnappyDecompressor.java
+++ b/src/java/src/com/hadoop/compression/snappy/SnappyDecompressor.java
@@ -14,24 +14,24 @@ public class SnappyDecompressor implements Decompressor
 {
     private static final Log logger = LogFactory.getLog(SnappyDecompressor.class.getName());
-    
+
     private boolean finished;
     private ByteBuffer outBuf;
     private ByteBuffer uncompressedBuf;
-    
+
     private long bytesRead = 0L;
     private long bytesWritten = 0L;
-    
+
     public SnappyDecompressor(int bufferSize)
     {
         outBuf = ByteBuffer.allocateDirect(bufferSize);
         uncompressedBuf = ByteBuffer.allocateDirect(bufferSize);
-        
+
         reset();
     }
-    
+
     public synchronized void setInput(byte[] b, int off, int len)
     {
         if (b == null)
@@ -42,11 +42,11 @@ public synchronized void setInput(byte[] b, int off, int len)
         {
             throw new ArrayIndexOutOfBoundsException();
         }
-        
-        finished = false;    
+
+        finished = false;
         outBuf.put(b, off, len);
-        
+
         bytesRead += len;
     }
@@ -56,11 +56,11 @@ public synchronized void setDictionary(byte[] b, int off, int len)
     }
 
     public synchronized boolean needsInput()
-    {    
+    {
         //needs input if the uncompressed data was consumed
         if (uncompressedBuf.position() > 0 && uncompressedBuf.limit() > uncompressedBuf.position())
             return false;
-        
+
         return true;
     }
@@ -70,14 +70,14 @@ public synchronized boolean needsDictionary()
     }
 
     public synchronized boolean finished()
-    {    
+    {
         return finished;
     }
 
     public synchronized int decompress(byte[] b, int off, int len) throws IOException
     {
-        
+
         if (b == null)
         {
             throw new NullPointerException();
@@ -86,62 +86,55 @@ public synchronized int decompress(byte[] b, int off, int len) throws IOExceptio
         {
             throw new ArrayIndexOutOfBoundsException();
         }
-        
+
         //nothing to decompress
         if ((outBuf.position() == 0 && uncompressedBuf.position() == 0) || finished)
         {
             reset();
             finished = true;
-            
+
             return 0;
         }
-        
+
         //only needs to do this once per input
         if(uncompressedBuf.position() == 0)
-        {    
-            try
-            {
-                outBuf.limit(outBuf.position());
-                outBuf.rewind();
-
-                int neededLen = Snappy.uncompressedLength(outBuf);
-                outBuf.rewind();
-
-                if(neededLen > uncompressedBuf.capacity())
-                    uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
-
-                int lim = Snappy.uncompress(outBuf, uncompressedBuf);
-
-                uncompressedBuf.limit(lim);
-                uncompressedBuf.rewind();
-            }
-            catch (SnappyException e)
-            {
-                throw new IOException(e);
-            }
+        {
+            outBuf.limit(outBuf.position());
+            outBuf.rewind();
+
+            int neededLen = Snappy.uncompressedLength(outBuf);
+            outBuf.rewind();
+
+            if(neededLen > uncompressedBuf.capacity())
+                uncompressedBuf = ByteBuffer.allocateDirect(neededLen);
+
+            int lim = Snappy.uncompress(outBuf, uncompressedBuf);
+
+            uncompressedBuf.limit(lim);
+            uncompressedBuf.rewind();
         }
-        
+
         int n = (uncompressedBuf.limit() - uncompressedBuf.position()) > len ? len : (uncompressedBuf.limit() - uncompressedBuf.position());
-        
+
         if(n == 0)
         {
             reset();
-            finished = true;    
+            finished = true;
             return 0;
         }
-        
+
         uncompressedBuf.get(b, off, n);
-        
+
         bytesWritten += n;
-        
-        // Set 'finished' if snappy has consumed all user-data    
+
+        // Set 'finished' if snappy has consumed all user-data
         if (uncompressedBuf.position() == uncompressedBuf.limit())
         {
             reset();
             finished = true;
-        }    
+        }
 
-        return n;    
+        return n;
     }
@@ -153,14 +146,14 @@ public synchronized int getRemaining()
 
     public synchronized void reset()
     {
         finished = false;
-        
+
         uncompressedBuf.limit(uncompressedBuf.capacity());
         uncompressedBuf.rewind();
-        
+
         outBuf.limit(outBuf.capacity());
         outBuf.rewind();
-        
-        bytesRead = bytesWritten = 0L;    
+
+        bytesRead = bytesWritten = 0L;
     }
 
     public synchronized void end()
diff --git a/src/java/src/org/apache/cassandra/hadoop/fs/CassandraFileSystemThriftStore.java b/src/java/src/org/apache/cassandra/hadoop/fs/CassandraFileSystemThriftStore.java
index 3edf055..ee52169 100644
--- a/src/java/src/org/apache/cassandra/hadoop/fs/CassandraFileSystemThriftStore.java
+++ b/src/java/src/org/apache/cassandra/hadoop/fs/CassandraFileSystemThriftStore.java
@@ -46,18 +46,18 @@ import org.xerial.snappy.SnappyException;
 
 /**
- * 
+ *
  * CFs schema:
- * 
+ *
  * Column Families:
  * - inode
  * - sblocks
- * 
+ *
  * -------------------
  * | inode |
  * -------------------
  * {key : [: < > ], [: < >], [ : < all blocks with its subBlocks serialized>] }
- * 
+ *
  * ------------------
  * | sblocks |
  * ------------------
@@ -68,11 +68,11 @@ public class CassandraFileSystemThriftStore implements CassandraFileSystemStore
 {
     private final static Logger logger = Logger.getLogger(CassandraFileSystemThriftStore.class);
 
     private static final String keySpace = "cfs";
-    
+
     // Cfs for normal use. They can be overridden if the archive mode is set.
     private static String inodeDefaultCf = "inode";
     private static String sblockDefaultCf = "sblocks";
-    
+
     // Cfs for archive kind of storage
     private static final String inodeArchiveCf = "inode_archive";
     private static final String sblockArchiveCf = "sblocks_archive";
@@ -97,10 +97,10 @@ public class CassandraFileSystemThriftStore implements CassandraFileSystemStore
     // This values can be overridden if the archive mode is set.
     private ColumnPath inodeDataPath = null;
     private ColumnPath sblockDataPath = null;
-    
+
     private ByteBuffer compressedData = null;
     private ByteBuffer uncompressedData = null;
-    
+
     private StorageType storageTypeInUse = StorageType.CFS_REGULAR;
 
     private static final SlicePredicate pathPredicate = new SlicePredicate().setColumn_names(Arrays.asList(pathCol));
@@ -158,30 +158,30 @@ public void initialize(URI uri, Configuration conf) throws IOException
     /**
      * Set to different set of Column Families is the archive location is selected.
      */
     private void initCFNames(URI uri) {
-        
+
         if (isArchive(uri)) { // cfs-archive:///
             inodeCfInUse = inodeArchiveCf;
             sblockCfInUse = sblockArchiveCf;
-            
+
             storageTypeInUse = StorageType.CFS_ARCHIVE;
         } else { // cfs:///
             inodeCfInUse = inodeDefaultCf;
             sblockCfInUse = sblockDefaultCf;
         }
-        
+
         // Create the remaining paths and parents base on the CfInUse.
-        
+
         sblockPath = new ColumnPath(sblockCfInUse);
         sblockParent = new ColumnParent(sblockCfInUse);
-        
+
         inodePath = new ColumnPath(inodeCfInUse);
         inodeParent = new ColumnParent(inodeCfInUse);
-        
+
         inodeDataPath = new ColumnPath(inodeCfInUse).setColumn(dataCol);
         sblockDataPath = new ColumnPath(sblockCfInUse).setColumn(dataCol);
-        
+
     }
 
     /**
@@ -193,7 +193,7 @@ private boolean isArchive(URI uri) {
 
     /**
      * Initialize the consistency levels for reads and writes.
-     * 
+     *
      * @param ks
      *            Keyspace definition
      */
@@ -261,15 +261,15 @@ public KsDef createKeySpace() throws IOException
             cf.setComment("Stores file meta data");
             cf.setKeyspace(keySpace);
 
-            // this is a workaround until    
+            // this is a workaround until
             // http://issues.apache.org/jira/browse/CASSANDRA-1278
             cf.setMemtable_flush_after_mins(1);
             cf.setMemtable_throughput_in_mb(128);
-            
+
             cf.setColumn_metadata(
                     Arrays.asList(new ColumnDef(pathCol, "BytesType").
                             setIndex_type(IndexType.KEYS).
-                            setIndex_name("path"),    
+                            setIndex_name("path"),
                             new ColumnDef(sentCol, "BytesType").
                             setIndex_type(IndexType.KEYS).
                             setIndex_name("sentinel"),
@@ -294,9 +294,9 @@ public KsDef createKeySpace() throws IOException
 
             cf.setMin_compaction_threshold(16);
             cf.setMax_compaction_threshold(64);
-            
+
             cfs.add(cf);
-            
+
             // CFs for archive
             cf = new CfDef();
             cf.setName(inodeArchiveCf);
@@ -307,15 +307,15 @@ public KsDef createKeySpace() throws IOException
             cf.setComment("Stores file meta data");
             cf.setKeyspace(keySpace);
 
-            // this is a workaround until    
+            // this is a workaround until
             // http://issues.apache.org/jira/browse/CASSANDRA-1278
             cf.setMemtable_flush_after_mins(1);
             cf.setMemtable_throughput_in_mb(128);
-            
+
             cf.setColumn_metadata(
                     Arrays.asList(new ColumnDef(pathCol, "BytesType").
                             setIndex_type(IndexType.KEYS).
-                            setIndex_name("path"),    
+                            setIndex_name("path"),
                             new ColumnDef(sentCol, "BytesType").
                             setIndex_type(IndexType.KEYS).
                             setIndex_name("sentinel"),
@@ -340,9 +340,9 @@ public KsDef createKeySpace() throws IOException
 
             // Disable compaction for archive.
             cf.setMin_compaction_threshold(0);
             cf.setMax_compaction_threshold(0);
-            
+
             cfs.add(cf);
-            
+
             Map<String, String> stratOpts = new HashMap<String, String>();
             stratOpts.put(BriskSimpleSnitch.BRISK_DC, System.getProperty("cfs.replication","1"));
             stratOpts.put(BriskSimpleSnitch.CASSANDRA_DC, "0");
@@ -370,7 +370,7 @@ public InputStream retrieveBlock(Block block, long byteRangeStart) throws IOExce
     {
         return new CassandraSubBlockInputStream(this, block, byteRangeStart);
     }
-    
+
     public InputStream retrieveSubBlock(Block block, SubBlock subBlock, long byteRangeStart) throws IOException
     {
         ByteBuffer blockId = uuidToByteBuffer(block.id);
@@ -380,7 +380,7 @@ public InputStream retrieveSubBlock(Block block, SubBlock subBlock, long byteRan
         try
         {
-            blockData = ((Brisk.Iface) client).get_cfs_sblock(FBUtilities.getLocalAddress().getHostName(),    
+            blockData = ((Brisk.Iface) client).get_cfs_sblock(FBUtilities.getLocalAddress().getHostName(),
                     blockId, subBlockId, (int) 0, storageTypeInUse);
         }
         catch (Exception e)
@@ -396,59 +396,52 @@ public InputStream retrieveSubBlock(Block block, SubBlock subBlock, long byteRan
             is = getInputStream(blockData.remote_block);
         else
             is = readLocalBlock(blockData.getLocal_block());
-        
+
         if(byteRangeStart > 0)
             is.skip(byteRangeStart);
-        
+
         return is;
     }
 
     private synchronized InputStream getInputStream(ByteBuffer bb) throws IOException
-    {    
-        
+    {
+
         ByteBuffer output = null;
-        
-        try
-        {
-            if(compressedData == null || compressedData.capacity() < bb.remaining())
-                compressedData = ByteBuffer.allocateDirect(bb.remaining());
-            
-            compressedData.limit(compressedData.capacity());
-            compressedData.rewind();
-            compressedData.put(bb.duplicate());
-            compressedData.limit(compressedData.position());
-            compressedData.rewind();
-            
-            if(Snappy.isValidCompressedBuffer(compressedData))
-            {
-                
-                int uncompressedLength = Snappy.uncompressedLength(compressedData);
-                
-                if(uncompressedData == null || uncompressedData.capacity() < uncompressedLength)
-                {
-                    uncompressedData = ByteBuffer.allocateDirect(uncompressedLength);
-                }
-                
-                int len = Snappy.uncompress(compressedData, uncompressedData);
-                
-                uncompressedData.limit(len);
-                uncompressedData.rewind();
-                
-                output = uncompressedData;
-            }
-            else
+
+        if(compressedData == null || compressedData.capacity() < bb.remaining())
+            compressedData = ByteBuffer.allocateDirect(bb.remaining());
+
+        compressedData.limit(compressedData.capacity());
+        compressedData.rewind();
+        compressedData.put(bb.duplicate());
+        compressedData.limit(compressedData.position());
+        compressedData.rewind();
+
+        if(Snappy.isValidCompressedBuffer(compressedData))
+        {
+
+            int uncompressedLength = Snappy.uncompressedLength(compressedData);
+
+            if(uncompressedData == null || uncompressedData.capacity() < uncompressedLength)
             {
-                output = compressedData;
+                uncompressedData = ByteBuffer.allocateDirect(uncompressedLength);
             }
+
+            int len = Snappy.uncompress(compressedData, uncompressedData);
+
+            uncompressedData.limit(len);
+            uncompressedData.rewind();
+
+            output = uncompressedData;
         }
-        catch (SnappyException e)
+        else
         {
-            throw new IOException(e);
+            output = compressedData;
        }
-        
+
         return ByteBufferUtil.inputStream(output);
     }
-    
+
     private InputStream readLocalBlock(LocalBlock blockInfo) throws IOException
     {
@@ -468,9 +461,9 @@ private InputStream readLocalBlock(LocalBlock blockInfo) throws IOException
 
             MappedByteBuffer bb = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, blockInfo.offset,
                     blockInfo.length);
-            
+
             return getInputStream(bb);
-            
+
         }
         catch (FileNotFoundException e)
         {
@@ -494,11 +487,11 @@ public INode retrieveINode(Path path) throws IOException
         ColumnOrSuperColumn pathInfo;
 
         pathInfo = performGet(pathKey, inodeDataPath, consistencyLevelRead);
-        
+
         // If not found and I already tried with CL= ONE, retry with higher CL.
         if (pathInfo == null && consistencyLevelRead.equals(ConsistencyLevel.ONE))
         {
-            pathInfo = performGet(pathKey, inodeDataPath, ConsistencyLevel.QUORUM);    
+            pathInfo = performGet(pathKey, inodeDataPath, ConsistencyLevel.QUORUM);
         }
 
         if (pathInfo == null)
@@ -506,13 +499,13 @@ public INode retrieveINode(Path path) throws IOException
             // Now give up and return null.
             return null;
         }
-        
+
         return INode.deserialize(ByteBufferUtil.inputStream(pathInfo.column.value), pathInfo.column.getTimestamp());
     }
 
     private ColumnOrSuperColumn performGet(ByteBuffer key, ColumnPath cp, ConsistencyLevel cl) throws IOException
     {
         ColumnOrSuperColumn result;
-        try    
+        try
         {
             result = client.get(key, cp, cl);
         }
@@ -524,7 +517,7 @@ private ColumnOrSuperColumn performGet(ByteBuffer key, ColumnPath cp, Consistenc
         {
             throw new IOException(e);
         }
-        
+
         return result;
     }
@@ -534,7 +527,7 @@ private ColumnOrSuperColumn performGet(ByteBuffer key, ColumnPath cp, Consistenc
 
     public synchronized void storeSubBlock(UUID parentBlockUUID, SubBlock sblock, ByteBuffer data) throws IOException
     {
         assert parentBlockUUID != null;
-        
+
         // Row key is the Block id to which this SubBLock belongs to.
         ByteBuffer parentBlockId = uuidToByteBuffer(parentBlockUUID);
@@ -544,26 +537,19 @@ public synchronized void storeSubBlock(UUID parentBlockUUID, SubBlock sblock, By
         {
             compressedData = ByteBuffer.allocateDirect(maxCapacity);
         }
-        
+
         compressedData.limit(compressedData.capacity());
         compressedData.rewind();
-        
+
         //compress
-        try
-        {
-            int len = Snappy.compress(data, compressedData);
-            compressedData.limit(len);
-            compressedData.rewind();
-        }
-        catch (SnappyException e1)
-        {
-            throw new IOException(e1);
-        }
-        
+        int len = Snappy.compress(data, compressedData);
+        compressedData.limit(len);
+        compressedData.rewind();
+
         if (logger.isDebugEnabled()) {
             logger.debug("Storing " + sblock);
         }
-        
+
         // Row Key: UUID of SubBLock Block parent
         // Column name: Sub Block UUID
         // Column value: Sub Block Data.
@@ -571,9 +557,9 @@ public synchronized void storeSubBlock(UUID parentBlockUUID, SubBlock sblock, By
         try
         {
             client.insert(
-                parentBlockId,    
-                sblockParent,    
-                new Column().setName(uuidToByteBuffer(sblock.id)).setValue(compressedData).setTimestamp(System.currentTimeMillis()),    
+                parentBlockId,
+                sblockParent,
+                new Column().setName(uuidToByteBuffer(sblock.id)).setValue(compressedData).setTimestamp(System.currentTimeMillis()),
                 consistencyLevelWrite);
         }
         catch (Exception e)
@@ -584,7 +570,7 @@ public synchronized void storeSubBlock(UUID parentBlockUUID, SubBlock sblock, By
 
     public void storeINode(Path path, INode inode) throws IOException
     {
-        
+
         if (logger.isDebugEnabled() && inode.getBlocks() != null) {
             logger.debug("Writing inode to: " + path);
             printBlocksDebug(inode.getBlocks());
         }
@@ -629,16 +615,16 @@ public void storeINode(Path path, INode inode) throws IOException
     /**
      * @param path a Path
-     * @return the parent to the path or null if the path represents the root.    
+     * @return the parent to the path or null if the path represents the root.
      */
     private String getParentForIndex(Path path) {
         Path parent = path.getParent();
-        
+
         if (parent == null)
         {
             return "null";
         }
-        
+
         return parent.toUri().getPath();
     }
@@ -700,7 +686,7 @@ public void deleteSubBlocks(INode inode) throws IOException
             throw new IOException(e);
         }
     }
-    
+
     /**
     * Retrieves a list of UUIDs
     * @param blocks list of blocks
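
Note on the Snappy API change: the patch assumes a snappy-java release where the ByteBuffer
calls (Snappy.compress, Snappy.uncompress, Snappy.uncompressedLength,
Snappy.isValidCompressedBuffer) declare java.io.IOException directly rather than the old
checked SnappyException, which is why the catch blocks are deleted outright instead of being
relocated. Below is a minimal round-trip sketch of that API, mirroring the limit()/rewind()
discipline used by SnappyCompressor and SnappyDecompressor above; the class name
SnappyRoundTrip and the sample payload are illustrative only, not part of the patch.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.xerial.snappy.Snappy;

    public class SnappyRoundTrip
    {
        public static void main(String[] args) throws IOException
        {
            byte[] input = "cfs sub-block payload".getBytes("UTF-8");

            // The ByteBuffer API works on direct buffers, matching the
            // allocateDirect() calls in the compressor and decompressor.
            ByteBuffer raw = ByteBuffer.allocateDirect(input.length);
            raw.put(input);
            raw.rewind();

            ByteBuffer compressed = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(input.length));

            // Same pattern as SnappyCompressor.compress(): the return value is
            // the compressed length, used to set the limit before reading.
            int clen = Snappy.compress(raw, compressed);
            compressed.limit(clen);
            compressed.rewind();

            // Same sizing step as SnappyDecompressor.decompress(), with a
            // defensive rewind after querying the uncompressed length, as the
            // patched code does for outBuf.
            int ulen = Snappy.uncompressedLength(compressed);
            compressed.rewind();
            ByteBuffer uncompressed = ByteBuffer.allocateDirect(ulen);

            int lim = Snappy.uncompress(compressed, uncompressed);
            uncompressed.limit(lim);
            uncompressed.rewind();

            byte[] output = new byte[lim];
            uncompressed.get(output);
            System.out.println(new String(output, "UTF-8")); // prints the original payload
        }
    }

If the deployed snappy-java still throws the checked SnappyException, this sketch and the
patched classes will not compile, so the change is tied to the library upgrade.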