From a03b0149f2291fcf455e894cec97a389af0c356f Mon Sep 17 00:00:00 2001
From: Ahad Rana
Date: Tue, 7 Jun 2011 05:05:48 -0700
Subject: [PATCH] added BinaryComparableWithOffset to deal with comparables
 that need offset information, added HBase BoundedRangeFileInputStream to
 utils, modified FlexBuffer to derive from BinaryComparableWithOffset,
 modified SimHash code to produce simhash from byte stream instead of char
 stream, extended TFileReader to have a ValueReader object, to allow for
 partial deserialization of thrift objects, modified TFileThriftObjectWriter
 to take replication factor as a parameter in constructor, added TFileUtils
 to allow for introspection of TFile metadata, modified TextBytes to derive
 from BinaryComparableWithOffset, modified URLUtils to strip www prefix by
 default during canonicalization

---
 .../commoncrawl/protocol/shared/protocol.jr  |  35 +++
 .../shared/BinaryComparableWithOffset.java   |  45 ++++
 .../shared/BoundedRangeFileInputStream.java  | 193 ++++++++++++++
 .../commoncrawl/util/shared/DateUtils.java   |   1 +
 .../commoncrawl/util/shared/FlexBuffer.java  |  24 +-
 src/org/commoncrawl/util/shared/SimHash.java | 125 ++++++---
 .../commoncrawl/util/shared/TFileReader.java |  97 +++++++
 .../util/shared/TFileThriftObjectWriter.java |   4 +-
 .../commoncrawl/util/shared/TFileUtils.java  | 246 ++++++++++++++++++
 .../commoncrawl/util/shared/TextBytes.java   |   2 +-
 src/org/commoncrawl/util/shared/URLUtils.java |   4 +-
 11 files changed, 733 insertions(+), 43 deletions(-)
 create mode 100644 src/org/commoncrawl/util/shared/BinaryComparableWithOffset.java
 create mode 100644 src/org/commoncrawl/util/shared/BoundedRangeFileInputStream.java
 create mode 100644 src/org/commoncrawl/util/shared/TFileReader.java
 create mode 100644 src/org/commoncrawl/util/shared/TFileUtils.java

diff --git a/src/org/commoncrawl/protocol/shared/protocol.jr b/src/org/commoncrawl/protocol/shared/protocol.jr
index b596932..23b327b 100644
--- a/src/org/commoncrawl/protocol/shared/protocol.jr
+++ b/src/org/commoncrawl/protocol/shared/protocol.jr
@@ -132,4 +132,39 @@ module org.commoncrawl.protocol.shared {
     [key] vlong urlHash = 2;
     vlong rootDomainHash = 3;
   }
+
+  // cc-cache data structure
+  class CacheItem {
+
+    // document url
+    ustring url = 1;
+    // url fingerprint
+    long urlFingerprint = 2;
+
+    enum Source {
+      WebRequest = 1;
+      S3Cache = 2;
+    }
+    // document source
+    byte source = 3;
+
+    // flags
+    enum Flags {
+      Flag_IsTemporaryRedirect = 1;
+      Flag_IsPermanentRedirect = 2;
+      Flag_IsCompressed = 4;
+      Flag_WasTruncatedDuringDownload = 8;
+      Flag_WasTruncatedDuringInflate = 16;
+    }
+    int flags = 4;
+
+    // if this was a redirect, the final url
+    ustring finalURL = 5;
+
+    // parsed header items ...
+    vector<ArcFileHeaderItem> headerItems = 6;
+    // content (if available)
+    buffer content = 7;
+  }
+
 }
diff --git a/src/org/commoncrawl/util/shared/BinaryComparableWithOffset.java b/src/org/commoncrawl/util/shared/BinaryComparableWithOffset.java
new file mode 100644
index 0000000..7cf698a
--- /dev/null
+++ b/src/org/commoncrawl/util/shared/BinaryComparableWithOffset.java
@@ -0,0 +1,45 @@
+package org.commoncrawl.util.shared;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+/**
+ * adds offset support to BinaryComparable
+ *
+ * @author rana
+ *
+ */
+public abstract class BinaryComparableWithOffset extends BinaryComparable {
+
+  /**
+   * get the offset into the underlying byte array
+   * @return the offset into the underlying byte array
+   */
+  public abstract int getOffset();
+
+  /**
+   * Compare bytes from {@link #getBytes()}.
+   * @see org.apache.hadoop.io.WritableComparator#compareBytes(byte[],int,int,byte[],int,int)
+   */
+  public int compareTo(BinaryComparable other) {
+    if (this == other)
+      return 0;
+    if (other instanceof BinaryComparableWithOffset) {
+      return WritableComparator.compareBytes(getBytes(), getOffset(), getLength(),
+          other.getBytes(), ((BinaryComparableWithOffset)other).getOffset(), other.getLength());
+    }
+    else {
+      return WritableComparator.compareBytes(getBytes(), getOffset(), getLength(),
+          other.getBytes(), 0, other.getLength());
+    }
+  }
+
+  /**
+   * Compare bytes from {@link #getBytes()} to those provided.
+   */
+  public int compareTo(byte[] other, int off, int len) {
+    return WritableComparator.compareBytes(getBytes(), getOffset(), getLength(),
+        other, off, len);
+  }
+
+}
diff --git a/src/org/commoncrawl/util/shared/BoundedRangeFileInputStream.java b/src/org/commoncrawl/util/shared/BoundedRangeFileInputStream.java
new file mode 100644
index 0000000..4f7e16e
--- /dev/null
+++ b/src/org/commoncrawl/util/shared/BoundedRangeFileInputStream.java
@@ -0,0 +1,193 @@
+package org.commoncrawl.util.shared;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+
+/**
+ * BoundedRangeFileInputStream abstracts a contiguous region of a Hadoop
+ * FSDataInputStream as a regular input stream. One can create multiple
+ * BoundedRangeFileInputStream on top of the same FSDataInputStream and they
+ * would not interfere with each other.
+ * Copied from hadoop-3315 tfile.
+ */
+public class BoundedRangeFileInputStream extends InputStream implements Seekable, PositionedReadable {
+
+  static final Log LOG = LogFactory.getLog(BoundedRangeFileInputStream.class);
+
+  private FSDataInputStream in;
+  private long pos;
+  private long end;
+  private long mark;
+  private final byte[] oneByte = new byte[1];
+  private final boolean pread;
+
+  /**
+   * Constructor. Positional (pread) reads are always used against the
+   * underlying FSDataInputStream, rather than seek+read, so concurrent
+   * streams do not disturb each other's file position.
+   *
+   * @param in
+   *          The FSDataInputStream we connect to.
+   * @param offset
+   *          Beginning offset of the region.
+   * @param length
+   *          Length of the region.
+   *
+   *          The actual length of the region may be smaller if (offset +
+   *          length) goes beyond the end of the FS input stream.
+ */ + public BoundedRangeFileInputStream(FSDataInputStream in, long offset, + long length) { + if (offset < 0 || length < 0) { + throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + + "/" + length); + } + + this.in = in; + this.pos = offset; + this.end = offset + length; + this.mark = -1; + this.pread = true; + } + + @Override + public int available() throws IOException { + int avail = in.available(); + if (pos + avail > end) { + avail = (int) (end - pos); + } + + return avail; + } + + @Override + public int read() throws IOException { + int ret = read(oneByte); + if (ret == 1) return oneByte[0] & 0xff; + return -1; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } + + int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos))); + if (n == 0) return -1; + int ret = 0; + if (this.pread) { + LOG.info("PREAD Reading at Pos:" + pos + " Bytes:" + n); + ret = in.read(pos, b, off, n); + } else { + synchronized (in) { + LOG.info("NONPREAD Reading at Pos:" + pos + " Bytes:" + n); + in.seek(pos); + ret = in.read(b, off, n); + } + } + if (ret < 0) { + end = pos; + return -1; + } + pos += ret; + return ret; + } + + @Override + /* + * We may skip beyond the end of the file. + */ + public long skip(long n) throws IOException { + long len = Math.min(n, end - pos); + pos += len; + return len; + } + + @Override + public void mark(int readlimit) { + mark = pos; + } + + @Override + public void reset() throws IOException { + if (mark < 0) throw new IOException("Resetting to invalid mark"); + pos = mark; + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void close() { + // Invalidate the state of the stream. 
+ in = null; + pos = end; + mark = -1; + } + + @Override + public long getPos() throws IOException { + return pos; + } + + @Override + public void seek(long pos) throws IOException { + this.pos = pos; + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return in.seekToNewSource(targetPos); + } + + @Override + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + LOG.info("Reading at Pos:" + position + " Bytes:" + length); + return in.read(position, buffer, offset, length); + } + + @Override + public void readFully(long position, byte[] buffer) throws IOException { + LOG.info("Reading at Pos:" + position + " Bytes:" + buffer.length); + in.readFully(position, buffer); + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) + throws IOException { + LOG.info("Reading at Pos:" + position + " Bytes:" + length); + in.readFully(position, buffer,offset,length); + } +} \ No newline at end of file diff --git a/src/org/commoncrawl/util/shared/DateUtils.java b/src/org/commoncrawl/util/shared/DateUtils.java index f795e9c..1d30467 100644 --- a/src/org/commoncrawl/util/shared/DateUtils.java +++ b/src/org/commoncrawl/util/shared/DateUtils.java @@ -216,6 +216,7 @@ public static long parseHttpDate(String time_string) { public static void main(String[] args) { Assert.assertFalse(parseHttpDate("Sun, 22 Nov 2009 01:37:06GMT") == -1); Assert.assertFalse(parseHttpDate("Sun, 22 Nov 2009 01:37:06 GMT") == -1); + Assert.assertFalse(parseHttpDate("Thu, 26 May 2011 03:40:51 GMT") == -1); } } diff --git a/src/org/commoncrawl/util/shared/FlexBuffer.java b/src/org/commoncrawl/util/shared/FlexBuffer.java index fe2e235..2170ae8 100644 --- a/src/org/commoncrawl/util/shared/FlexBuffer.java +++ b/src/org/commoncrawl/util/shared/FlexBuffer.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; +import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.WritableComparable; @@ -37,7 +38,7 @@ * capacity. * */ -public final class FlexBuffer implements WritableComparable,RawComparator,RawComparable, Cloneable { +public final class FlexBuffer extends BinaryComparableWithOffset implements WritableComparable,RawComparator,RawComparable, Cloneable { /** Number of valid bytes in this.bytes. */ int count; /** Backing store for Buffer. 
*/ @@ -190,10 +191,6 @@ public void setCount(int count) { this.count = count; } - /** Get current offset **/ - public int getOffset() { - return offset; - } /** * Get the capacity, which is the maximum count that could handled without @@ -411,4 +408,21 @@ public int offset() { public int size() { return count; } + + @Override + public byte[] getBytes() { + return zbytes; + } + + @Override + public int getLength() { + return count; + } + + /** Get current offset **/ + @Override + public int getOffset() { + return offset; + } + } diff --git a/src/org/commoncrawl/util/shared/SimHash.java b/src/org/commoncrawl/util/shared/SimHash.java index 9c4562d..0e1654b 100644 --- a/src/org/commoncrawl/util/shared/SimHash.java +++ b/src/org/commoncrawl/util/shared/SimHash.java @@ -9,6 +9,8 @@ import java.nio.CharBuffer; import java.util.Set; +import org.mortbay.log.Log; + /** * a basic SimHash implementation * @@ -28,6 +30,10 @@ public class SimHash { * @return 64 bit simhash of input string */ + + // byte gram + private static final int FIXED_BGRAM_LENGTH = 8; + // character gram private static final int FIXED_CGRAM_LENGTH = 4; public static long computeOptimizedSimHashForString(String s) { @@ -39,8 +45,6 @@ public static long computeOptimizedSimHashForString(CharBuffer s) { LongSet shingles = new LongOpenHashSet(Math.min(s.length(), 100000)); int length = s.length(); - - long timeStart = System.currentTimeMillis(); for (int i = 0; i < length - FIXED_CGRAM_LENGTH + 1; i++) { // extract an ngram @@ -54,8 +58,62 @@ public static long computeOptimizedSimHashForString(CharBuffer s) { shingles.add(shingle); } - long timeEnd = System.currentTimeMillis(); - // System.out.println("NGram Production Took:" + (timeEnd-timeStart)); + + int v[] = new int[HASH_SIZE]; + byte longAsBytes[] = new byte[8]; + + for (long shingle : shingles) { + + longAsBytes[0] = (byte) (shingle >> 56); + longAsBytes[1] = (byte) (shingle >> 48); + longAsBytes[2] = (byte) (shingle >> 40); + longAsBytes[3] = (byte) (shingle >> 32); + longAsBytes[4] = (byte) (shingle >> 24); + longAsBytes[5] = (byte) (shingle >> 16); + longAsBytes[6] = (byte) (shingle >> 8); + longAsBytes[7] = (byte) (shingle); + + long longHash = FPGenerator.std64.fp(longAsBytes, 0, 8); + for (int i = 0; i < HASH_SIZE; ++i) { + boolean bitSet = ((longHash >> i) & 1L) == 1L; + v[i] += (bitSet) ? 
1 : -1;
+      }
+    }
+
+    long simhash = 0;
+    for (int i = 0; i < HASH_SIZE; ++i) {
+      if (v[i] > 0) {
+        simhash |= (1L << i);
+      }
+    }
+    return simhash;
+  }
+
+  public static long computeOptimizedSimHashForBytes(byte[] data,int offset,int length) {
+
+    LongSet shingles = new LongOpenHashSet(Math.min(length/FIXED_BGRAM_LENGTH, 100000));
+
+    for (int i = offset; i < offset + length - FIXED_BGRAM_LENGTH + 1; i++) {
+      int pos = i;
+      // extract an ngram, masking each byte to avoid sign extension
+      long shingle = data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos++] & 0xFF;
+      shingle <<= 8;
+      shingle |= data[pos] & 0xFF;
+
+      shingles.add(shingle);
+    }
 
     int v[] = new int[HASH_SIZE];
     byte longAsBytes[] = new byte[8];
@@ -139,35 +197,36 @@ public static void main(String[] args) {
       stream2.read(data2);
       String string1 = new String(data1);
       String string2 = new String(data2);
-
-      long timeStart = System.currentTimeMillis();
-      long simhash1 = computeSimHashFromString(Shingle.shingles(string1));
-      long timeEnd = System.currentTimeMillis();
-      System.out.println("Old Calc for Document A Took:"
-          + (timeEnd - timeStart));
-      timeStart = System.currentTimeMillis();
-      long simhash2 = computeSimHashFromString(Shingle.shingles(string2));
-      timeEnd = System.currentTimeMillis();
-      System.out.println("Old Calc for Document B Took:"
-          + (timeEnd - timeStart));
-      timeStart = System.currentTimeMillis();
-      long simhash3 = computeOptimizedSimHashForString(string1);
-      timeEnd = System.currentTimeMillis();
-      System.out.println("New Calc for Document A Took:"
-          + (timeEnd - timeStart));
-      timeStart = System.currentTimeMillis();
-      long simhash4 = computeOptimizedSimHashForString(string2);
-      timeEnd = System.currentTimeMillis();
-      System.out.println("New Calc for Document B Took:"
-          + (timeEnd - timeStart));
-
-      int hammingDistance = hammingDistance(simhash1, simhash2);
-      int hammingDistance2 = hammingDistance(simhash3, simhash4);
-
-      System.out.println("hammingdistance Doc (A) to Doc(B) OldWay:"
-          + hammingDistance);
-      System.out.println("hammingdistance Doc (A) to Doc(B) NewWay:"
-          + hammingDistance2);
+      for (int i=0;i<100;++i) {
+        long timeStart = System.currentTimeMillis();
+        long simhash1 = computeSimHashFromString(Shingle.shingles(string1));
+        long timeEnd = System.currentTimeMillis();
+        System.out.println("Old Calc for Document A Took:"
+            + (timeEnd - timeStart));
+        timeStart = System.currentTimeMillis();
+        long simhash2 = computeSimHashFromString(Shingle.shingles(string2));
+        timeEnd = System.currentTimeMillis();
+        System.out.println("Old Calc for Document B Took:"
+            + (timeEnd - timeStart));
+        timeStart = System.currentTimeMillis();
+        long simhash3 = computeOptimizedSimHashForBytes(data1,0,data1.length);
+        timeEnd = System.currentTimeMillis();
+        System.out.println("New Calc for Document A Took:"
+            + (timeEnd - timeStart));
+        timeStart = System.currentTimeMillis();
+        long simhash4 = computeOptimizedSimHashForBytes(data2,0,data2.length);
+        timeEnd = System.currentTimeMillis();
+        System.out.println("New Calc for Document B Took:"
+            + (timeEnd - timeStart));
+
+        int hammingDistance = hammingDistance(simhash1, simhash2);
+        int hammingDistance2 = hammingDistance(simhash3, simhash4);
+
+        System.out.println("hammingdistance Doc (A) to Doc(B) OldWay:"
+            + hammingDistance);
+        System.out.println("hammingdistance Doc (A) to Doc(B) NewWay:"
+            + hammingDistance2);
+      }
     } catch (IOException e)
{
      e.printStackTrace();
    }
diff --git a/src/org/commoncrawl/util/shared/TFileReader.java b/src/org/commoncrawl/util/shared/TFileReader.java
new file mode 100644
index 0000000..30f215f
--- /dev/null
+++ b/src/org/commoncrawl/util/shared/TFileReader.java
@@ -0,0 +1,97 @@
+package org.commoncrawl.util.shared;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+
+/**
+ * encapsulate all the elements necessary to read a TFile
+ *
+ * @author rana
+ *
+ */
+public class TFileReader<KeyType extends Writable, ValueType extends Writable> {
+
+  static final Log LOG = LogFactory.getLog(TFileReader.class);
+
+  private FSDataInputStream _inputStream;
+  private TFile.Reader _reader;
+  private TFile.Reader.Scanner _scanner;
+
+  public TFileReader(FileSystem fs,Configuration conf,Path path) throws IOException {
+    FileStatus fileStatus = fs.getFileStatus(path);
+    if (fileStatus == null) {
+      throw new IOException("Invalid File:" + path);
+    }
+
+    _inputStream = fs.open(path);
+    _reader = new TFile.Reader(_inputStream,fileStatus.getLen(),conf);
+    _scanner = _reader.createScanner();
+  }
+
+  public TFile.Reader getReader() {
+    return _reader;
+  }
+
+  public TFile.Reader.Scanner getScanner() {
+    return _scanner;
+  }
+
+  public boolean next(KeyType keyType,ValueType valueType) throws IOException {
+    if (_scanner != null && !_scanner.atEnd()) {
+      keyType.readFields(_scanner.entry().getKeyStream());
+      valueType.readFields(_scanner.entry().getValueStream());
+
+      _scanner.advance();
+
+      return true;
+    }
+    return false;
+  }
+
+  public boolean hasMoreData() {
+    return (_scanner != null && !_scanner.atEnd());
+  }
+
+  public void close() {
+    if (_scanner != null)
+      try {
+        _scanner.close();
+      } catch (IOException e) {
+        LOG.error(CCStringUtils.stringifyException(e));
+      }
+      finally {
+        _scanner = null;
+      }
+
+    if (_reader != null)
+      try {
+        _reader.close();
+      } catch (IOException e) {
+        LOG.error(CCStringUtils.stringifyException(e));
+      }
+      finally {
+        _reader = null;
+      }
+
+    if (_inputStream != null)
+      try {
+        _inputStream.close();
+      } catch (IOException e) {
+        LOG.error(CCStringUtils.stringifyException(e));
+      }
+      finally {
+        _inputStream = null;
+      }
+
+  }
+
+}
diff --git a/src/org/commoncrawl/util/shared/TFileThriftObjectWriter.java b/src/org/commoncrawl/util/shared/TFileThriftObjectWriter.java
index 8026550..e48c8ec 100644
--- a/src/org/commoncrawl/util/shared/TFileThriftObjectWriter.java
+++ b/src/org/commoncrawl/util/shared/TFileThriftObjectWriter.java
@@ -37,8 +37,8 @@ public class TFileThriftObjectWriter
-  public TFileThriftObjectWriter(FileSystem fs,Configuration conf,Path path,String compressionScheme,int minCompressedBlockSize,Class< ? extends RawComparator> comparatorClass) throws IOException {
-    _stream = fs.create(path);
+  public TFileThriftObjectWriter(FileSystem fs,Configuration conf,Path path,short replicationFactor,String compressionScheme,int minCompressedBlockSize,Class< ?
extends RawComparator> comparatorClass) throws IOException { + _stream = fs.create(path,replicationFactor); _writer = new TFile.Writer(_stream,minCompressedBlockSize,compressionScheme,TFile.COMPARATOR_JCLASS + comparatorClass.getName(), conf); } diff --git a/src/org/commoncrawl/util/shared/TFileUtils.java b/src/org/commoncrawl/util/shared/TFileUtils.java new file mode 100644 index 0000000..d50039f --- /dev/null +++ b/src/org/commoncrawl/util/shared/TFileUtils.java @@ -0,0 +1,246 @@ +package org.commoncrawl.util.shared; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.file.tfile.Utils; +import org.apache.hadoop.io.file.tfile.Utils.Version; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + + +/** TFile helpers + * + * @author rana + * + */ +public class TFileUtils { + + + static final Log LOG = LogFactory.getLog(TFileUtils.class); + + /** + * HACKS necessary to get at BCFile Metadata Information :-( + */ + static final Version BCFILE_API_VERSION = new Version((short) 1, (short) 0); + final static String TFILE_META_BLOCK_NAME = "TFile.meta"; + final static String TFILE_BLOCK_NAME = "TFile.index"; + final static String BCFILE_META_BLOCK_NAME = "BCFile.index"; + + + + /** + * Magic number uniquely identifying a BCFile in the header/footer. + */ + static final class BCFileMagic { + private final static byte[] AB_MAGIC_BCFILE = + { + // ... 
total of 16 bytes
+        (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91,
+        (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf,
+        (byte) 0x41, (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1,
+        (byte) 0x50 };
+
+    public static void readAndVerify(DataInput in) throws IOException {
+      byte[] abMagic = new byte[size()];
+      in.readFully(abMagic);
+
+      // check against AB_MAGIC_BCFILE, if not matching, throw an
+      // Exception
+      if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
+        throw new IOException("Not a valid BCFile.");
+      }
+    }
+
+    public static void write(DataOutput out) throws IOException {
+      out.write(AB_MAGIC_BCFILE);
+    }
+
+    public static int size() {
+      return AB_MAGIC_BCFILE.length;
+    }
+  }
+
+  public static class BCFileMetaReader {
+    public Version version = null;
+    public BCFileMetaIndex metaIndex = null;
+    public long fileLength;
+
+    public BCFileMetaReader(FSDataInputStream fin, long fileLength,
+        Configuration conf) throws IOException {
+
+      this.fileLength = fileLength;
+      // move the cursor to the beginning of the tail, containing: offset to the
+      // meta block index, version and magic
+      fin.seek(fileLength - BCFileMagic.size() - Version.size() - Long.SIZE
+          / Byte.SIZE);
+      long offsetIndexMeta = fin.readLong();
+      version = new Version(fin);
+      BCFileMagic.readAndVerify(fin);
+
+      if (!version.compatibleWith(BCFILE_API_VERSION)) {
+        throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+      }
+
+      // read meta index
+      fin.seek(offsetIndexMeta);
+      metaIndex = new BCFileMetaIndex(fin);
+    }
+
+
+    public long getEarliestMetadataOffset() {
+      long earliestIndexPos = fileLength;
+      for (Map.Entry<String, BCFileMetaIndexEntry> entry : metaIndex.index.entrySet()) {
+        earliestIndexPos = Math.min(entry.getValue().getRegion().getOffset(),earliestIndexPos);
+      }
+      return earliestIndexPos;
+    }
+  }
+
+  /**
+   * Index for all Meta blocks.
+   */
+  static class BCFileMetaIndex {
+    // use a tree map, for getting a meta block entry by name
+    final Map<String, BCFileMetaIndexEntry> index;
+
+    // for write
+    public BCFileMetaIndex() {
+      index = new TreeMap<String, BCFileMetaIndexEntry>();
+    }
+
+    // for read, construct the map from the file
+    public BCFileMetaIndex(DataInput in) throws IOException {
+      int count = Utils.readVInt(in);
+      index = new TreeMap<String, BCFileMetaIndexEntry>();
+
+      for (int nx = 0; nx < count; nx++) {
+        BCFileMetaIndexEntry indexEntry = new BCFileMetaIndexEntry(in);
+        index.put(indexEntry.getMetaName(), indexEntry);
+      }
+    }
+  }
+
+  /**
+   * An entry describes a meta block in the MetaIndex.
+   */
+  static final class BCFileMetaIndexEntry {
+
+    private final String metaName;
+    private final String compressionAlgorithm;
+    private final static String defaultPrefix = "data:";
+
+    private final BCFileBlockRegion region;
+
+    public BCFileMetaIndexEntry(DataInput in) throws IOException {
+      String fullMetaName = Utils.readString(in);
+      if (fullMetaName.startsWith(defaultPrefix)) {
+        metaName =
+            fullMetaName.substring(defaultPrefix.length(), fullMetaName
+                .length());
+      } else {
+        throw new IOException("Corrupted Meta region Index");
+      }
+
+      compressionAlgorithm = Utils.readString(in);
+      region = new BCFileBlockRegion(in);
+    }
+
+    public String getMetaName() {
+      return metaName;
+    }
+
+    public String getCompressionAlgorithm() {
+      return compressionAlgorithm;
+    }
+
+    public BCFileBlockRegion getRegion() {
+      return region;
+    }
+
+  }
+
+  /**
+   * Block region.
+ */ + static final class BCFileBlockRegion { + private final long offset; + private final long compressedSize; + private final long rawSize; + + public BCFileBlockRegion(DataInput in) throws IOException { + offset = Utils.readVLong(in); + compressedSize = Utils.readVLong(in); + rawSize = Utils.readVLong(in); + } + + public long getOffset() { + return offset; + } + + public long getCompressedSize() { + return compressedSize; + } + + public long getRawSize() { + return rawSize; + } + } + + + public static void main(String[] args) { + // setup the basics ... + logger = Logger.getLogger("org.commoncrawl"); + logger.setLevel(Level.INFO); + BasicConfigurator.configure(); + + Configuration conf = new Configuration(); + + conf.addResource("core-site.xml"); + conf.addResource("hdfs-site.xml"); + + try { + // get the file system object ... + FileSystem fs = FileSystem.get(conf); + + // load the specified TFIle + LOG.info("Loading TFile at:" + args[0]); + + Path path = new Path(args[0]); + + FileStatus status = fs.getFileStatus(path); + + FSDataInputStream in = fs.open(path); + try { + LOG.info("Initializing Metadata Reader"); + BCFileMetaReader metaReader = new BCFileMetaReader(in,status.getLen(),conf); + LOG.info("Earliest Index Offset:" + metaReader.getEarliestMetadataOffset()); + + BCFileMetaIndex index = metaReader.metaIndex; + for (Map.Entry entry : index.index.entrySet()) { + LOG.info("Entry:" + entry.getKey() + + " Pos:" + entry.getValue().getRegion().getOffset() + + " Len:" + entry.getValue().getRegion().getRawSize()); + } + } + finally { + in.close(); + } + + } + catch (IOException e) { + LOG.error(CCStringUtils.stringifyException(e)); + } + } +} diff --git a/src/org/commoncrawl/util/shared/TextBytes.java b/src/org/commoncrawl/util/shared/TextBytes.java index 0e3cfe7..d96086a 100644 --- a/src/org/commoncrawl/util/shared/TextBytes.java +++ b/src/org/commoncrawl/util/shared/TextBytes.java @@ -47,7 +47,7 @@ * @author rana * */ -public final class TextBytes extends BinaryComparable implements +public final class TextBytes extends BinaryComparableWithOffset implements WritableComparable,Cloneable { private static final Log LOG = LogFactory diff --git a/src/org/commoncrawl/util/shared/URLUtils.java b/src/org/commoncrawl/util/shared/URLUtils.java index 3d1cf53..ab76ee3 100644 --- a/src/org/commoncrawl/util/shared/URLUtils.java +++ b/src/org/commoncrawl/util/shared/URLUtils.java @@ -268,7 +268,7 @@ public static URLFPV2 getURLFPV2FromURL(String urlString) { try { // canonicalize the incoming url ... - String canonicalURL = URLUtils.canonicalizeURL(urlString, false); + String canonicalURL = URLUtils.canonicalizeURL(urlString, true); if (canonicalURL != null) { return getURLFPV2FromCanonicalURL(canonicalURL); @@ -288,7 +288,7 @@ public static URLFPV2 getURLFPV2FromURL(String urlString) { public static URLFPV2 getURLFPV2FromURLObject(GoogleURL urlObject) { try { // canonicalize the incoming url ... - String canonicalURL = URLUtils.canonicalizeURL(urlObject, false); + String canonicalURL = URLUtils.canonicalizeURL(urlObject, true); if (canonicalURL != null) { return getURLFPV2FromCanonicalURL(canonicalURL);