Skip to content

Commit

Permalink
added BinaryComparableWithOffset to deal with comparables that need of…
Browse files Browse the repository at this point in the history
…fset information; added HBase BoundedRangeFileInputStream to utils; modified FlexBuffer to derive from BinaryComparableWithOffset; modified SimHash code to produce simhash from byte stream instead of char stream; extended TFileReader to have a ValueReader object, to allow for partial deserialization of thrift objects; modified TFileThriftObjectWriter to take replication factor as a parameter in constructor; added TFileUtils to allow for introspection of TFile metadata; modified TextBytes to derive from BinaryComparableWithOffset; modified URLUtils to strip www prefix by default during canonicalization
  • Loading branch information
Ahad Rana authored and Ahad Rana committed Nov 14, 2011
1 parent 10d96b6 commit a03b014
Show file tree
Hide file tree
Showing 11 changed files with 733 additions and 43 deletions.
35 changes: 35 additions & 0 deletions src/org/commoncrawl/protocol/shared/protocol.jr
Expand Up @@ -132,4 +132,39 @@ module org.commoncrawl.protocol.shared {
[key] vlong urlHash = 2;
vlong rootDomainHash = 3;
}

// cc-cache data structure
class CacheItem {

  // document url
  ustring url = 1;
  // url fingerprint
  long urlFingerprint = 2;

  // where the document came from
  enum Source {
    WebRequest = 1;
    S3Cache = 2;
  }
  // document source — presumably holds one of the Source enum values; verify against writers
  byte source = 3;

  // flags — values are powers of two, so presumably a bitmask of the Flags values below
  enum Flags {
    Flag_IsTemporaryRedirect = 1;
    Flag_IsPermanentRedirect = 2;
    Flag_IsCompressed = 4;
    Flag_WasTruncatedDuringDownload = 8;
    Flag_WasTruncatedDuringInflate = 16;
  }
  int flags = 4;

  // if this was a redirect, the final url
  ustring finalURL = 5;

  // parsed header items ...
  vector<org.commoncrawl.protocol.shared.ArcFileHeaderItem> headerItems =6;
  // content (if available)
  buffer content =7;
}

}
45 changes: 45 additions & 0 deletions src/org/commoncrawl/util/shared/BinaryComparableWithOffset.java
@@ -0,0 +1,45 @@
package org.commoncrawl.util.shared;

import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* adds offset support to BinaryComparable
*
* @author rana
*
*/
/**
 * Adds offset support to {@link BinaryComparable}: subclasses expose a view
 * into a (possibly shared) backing byte array that starts at
 * {@link #getOffset()} rather than always at index 0.
 *
 * @author rana
 *
 */
public abstract class BinaryComparableWithOffset extends BinaryComparable {

  /**
   * get the offset into the underlying byte array
   * @return index of the first valid byte within {@link #getBytes()}
   */
  public abstract int getOffset();

  /**
   * Compare bytes from {@link #getBytes()}, honoring each side's offset when
   * available; a plain {@link BinaryComparable} argument is compared starting
   * at offset 0.
   * @see org.apache.hadoop.io.WritableComparator#compareBytes(byte[],int,int,byte[],int,int)
   */
  public int compareTo(BinaryComparable other) {
    if (this == other)
      return 0;
    if (other instanceof BinaryComparableWithOffset) {
      return WritableComparator.compareBytes(getBytes(), getOffset(), getLength(),
          other.getBytes(), ((BinaryComparableWithOffset)other).getOffset(), other.getLength());
    }
    else {
      return WritableComparator.compareBytes(getBytes(), getOffset(), getLength(),
          other.getBytes(), 0, other.getLength());
    }
  }

  /**
   * Compare bytes from {@link #getBytes()} to those provided.
   * @param other buffer to compare against
   * @param off   offset of the first valid byte in {@code other}
   * @param len   number of valid bytes in {@code other}
   * @return negative, zero, or positive per the usual compareTo contract
   */
  public int compareTo(byte[] other, int off, int len) {
    return WritableComparator.compareBytes(getBytes(), getOffset(), getLength(),
        other, off, len);
  }

}
193 changes: 193 additions & 0 deletions src/org/commoncrawl/util/shared/BoundedRangeFileInputStream.java
@@ -0,0 +1,193 @@
package org.commoncrawl.util.shared;

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;


/**
* BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop
* FSDataInputStream as a regular input stream. One can create multiple
* BoundedRangeFileInputStream on top of the same FSDataInputStream and they
* would not interfere with each other.
* Copied from hadoop-335 tfile.
*/
/**
 * BoundedRangeFileInputStream abstracts a contiguous region of a Hadoop
 * FSDataInputStream as a regular input stream. One can create multiple
 * BoundedRangeFileInputStream instances on top of the same FSDataInputStream
 * and they would not interfere with each other.
 * Copied from hadoop-335 tfile.
 */
public class BoundedRangeFileInputStream extends InputStream implements Seekable, PositionedReadable {

  static final Log LOG = LogFactory.getLog(BoundedRangeFileInputStream.class);

  /** underlying stream; set to null by {@link #close()} to invalidate this wrapper */
  private FSDataInputStream in;
  /** current absolute position within the underlying stream */
  private long pos;
  /** absolute end offset (exclusive) of the bounded region */
  private long end;
  /** saved position for mark()/reset(); -1 when no mark is set */
  private long mark;
  private final byte[] oneByte = new byte[1];
  /** when true, use positional (pread) reads rather than seek+read */
  private final boolean pread;

  /**
   * Constructor
   *
   * @param in
   *          The FSDataInputStream we connect to.
   * @param offset
   *          Beginning offset of the region.
   * @param length
   *          Length of the region.
   *
   *          The actual length of the region may be smaller if (offset +
   *          length) goes beyond the end of the FS input stream. Positional
   *          (pread) access is always used, so multiple instances can safely
   *          share the same underlying stream.
   * @throws IndexOutOfBoundsException if offset or length is negative
   */
  public BoundedRangeFileInputStream(FSDataInputStream in, long offset,
      long length) {
    if (offset < 0 || length < 0) {
      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset
          + "/" + length);
    }

    this.in = in;
    this.pos = offset;
    this.end = offset + length;
    this.mark = -1;
    this.pread = true;
  }

  @Override
  public int available() throws IOException {
    // clamp the underlying stream's estimate to the bounded region
    int avail = in.available();
    if (pos + avail > end) {
      avail = (int) (end - pos);
    }

    return avail;
  }

  @Override
  public int read() throws IOException {
    int ret = read(oneByte);
    if (ret == 1) return oneByte[0] & 0xff;
    return -1;
  }

  @Override
  public int read(byte[] b) throws IOException {
    return read(b, 0, b.length);
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
      throw new IndexOutOfBoundsException();
    }

    // never read past the end of the bounded region
    int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
    if (n == 0) return -1;
    int ret = 0;
    if (this.pread) {
      // demoted from info to guarded debug: logging every read floods the logs
      if (LOG.isDebugEnabled()) {
        LOG.debug("PREAD Reading at Pos:" + pos + " Bytes:" + n);
      }
      ret = in.read(pos, b, off, n);
    } else {
      // seek+read must be serialized on the shared underlying stream
      synchronized (in) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("NONPREAD Reading at Pos:" + pos + " Bytes:" + n);
        }
        in.seek(pos);
        ret = in.read(b, off, n);
      }
    }
    if (ret < 0) {
      // underlying stream hit EOF early; shrink the region accordingly
      end = pos;
      return -1;
    }
    pos += ret;
    return ret;
  }

  @Override
  /*
   * We may skip beyond the end of the file.
   */
  public long skip(long n) throws IOException {
    long len = Math.min(n, end - pos);
    pos += len;
    return len;
  }

  @Override
  public void mark(int readlimit) {
    // readlimit is ignored: position is tracked locally, so the mark never expires
    mark = pos;
  }

  @Override
  public void reset() throws IOException {
    if (mark < 0) throw new IOException("Resetting to invalid mark");
    pos = mark;
  }

  @Override
  public boolean markSupported() {
    return true;
  }

  @Override
  public void close() {
    // Invalidate the state of the stream; the shared underlying stream is NOT
    // closed (other bounded ranges may still be using it). Subsequent reads
    // return EOF or fail on the null reference.
    in = null;
    pos = end;
    mark = -1;
  }

  @Override
  public long getPos() throws IOException {
    return pos;
  }

  @Override
  public void seek(long pos) throws IOException {
    // NOTE(review): no validation against the region bounds here — callers can
    // seek outside [offset, end); reads then clamp against end. Confirm callers
    // rely on this before tightening.
    this.pos = pos;
  }

  @Override
  public boolean seekToNewSource(long targetPos) throws IOException {
    return in.seekToNewSource(targetPos);
  }

  // NOTE(review): the PositionedReadable methods below delegate directly to the
  // underlying stream and do NOT enforce the region bounds — behavior inherited
  // from the tfile original.

  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reading at Pos:" + position + " Bytes:" + length);
    }
    return in.read(position, buffer, offset, length);
  }

  @Override
  public void readFully(long position, byte[] buffer) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reading at Pos:" + position + " Bytes:" + buffer.length);
    }
    in.readFully(position, buffer);
  }

  @Override
  public void readFully(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reading at Pos:" + position + " Bytes:" + length);
    }
    in.readFully(position, buffer, offset, length);
  }
}
1 change: 1 addition & 0 deletions src/org/commoncrawl/util/shared/DateUtils.java
Expand Up @@ -216,6 +216,7 @@ public static long parseHttpDate(String time_string) {
/** Smoke test: every sample HTTP date string must parse (i.e. not yield -1). */
public static void main(String[] args) {
  final String[] samples = {
      "Sun, 22 Nov 2009 01:37:06GMT",
      "Sun, 22 Nov 2009 01:37:06 GMT",
      "Thu, 26 May 2011 03:40:51 GMT" };
  for (String sample : samples) {
    Assert.assertFalse(parseHttpDate(sample) == -1);
  }
}
}
24 changes: 19 additions & 5 deletions src/org/commoncrawl/util/shared/FlexBuffer.java
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
Expand All @@ -37,7 +38,7 @@
* capacity.
*
*/
public final class FlexBuffer implements WritableComparable<FlexBuffer>,RawComparator<FlexBuffer>,RawComparable, Cloneable {
public final class FlexBuffer extends BinaryComparableWithOffset implements WritableComparable<BinaryComparable>,RawComparator<FlexBuffer>,RawComparable, Cloneable {
/** Number of valid bytes in this.bytes. */
int count;
/** Backing store for Buffer. */
Expand Down Expand Up @@ -190,10 +191,6 @@ public void setCount(int count) {
this.count = count;
}

/** Get current offset **/
public int getOffset() {
return offset;
}

/**
* Get the capacity, which is the maximum count that could handled without
Expand Down Expand Up @@ -411,4 +408,21 @@ public int offset() {
/** @return the number of valid bytes in the buffer (same value as {@link #getLength()}) */
public int size() {
  return count;
}

/** @return the backing byte array; valid data starts at {@link #getOffset()} and runs for {@link #getLength()} bytes */
@Override
public byte[] getBytes() {
  return zbytes;
}

/** @return the number of valid bytes in the buffer */
@Override
public int getLength() {
  return count;
}

/** Get current offset of the valid data within the backing array **/
@Override
public int getOffset() {
  return offset;
}

}

0 comments on commit a03b014

Please sign in to comment.