Skip to content
Permalink
Browse files
add seqfile (#65)
* add ValueFileOutput/ValueFileInput
* add BitsFileReader/BitFileWriter
* add AbstractBufferedFileInput/AbstractBufferedFileOutput
  • Loading branch information
corgiboygsj committed Jul 5, 2021
1 parent 5492e83 commit 25b9a337850f77d20889ecfcaa0d3f68d9e19407
Showing 16 changed files with 1,577 additions and 198 deletions.
@@ -686,4 +686,12 @@ public static synchronized ComputerOptions instance() {
disallowEmpty(),
"/tmp/hgkv"
);

public static final ConfigOption<Long> VALUE_FILE_MAX_SEGMENT_SIZE =
new ConfigOption<>(
"valuefile.max_segment_size",
"The max number of bytes in each segment of value-file.",
positiveInt(),
Bytes.GB
);
}
@@ -0,0 +1,163 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.io;

import java.io.IOException;

import com.baidu.hugegraph.computer.core.util.BytesUtil;
import com.baidu.hugegraph.util.E;

public abstract class AbstractBufferedFileInput extends UnsafeBytesInput {

private final int bufferCapacity;
private final long fileLength;

protected long fileOffset;

public AbstractBufferedFileInput(int bufferCapacity, long fileLength) {
super(new byte[bufferCapacity], 0, 0);

this.bufferCapacity = bufferCapacity;
this.fileLength = fileLength;
}

@Override
public long position() {
return this.fileOffset - super.remaining();
}

@Override
public void readFully(byte[] b) throws IOException {
this.readFully(b, 0, b.length);
}

@Override
public long skip(long bytesToSkip) throws IOException {
E.checkArgument(bytesToSkip >= 0,
"The parameter bytesToSkip must be >= 0, but got %s",
bytesToSkip);
E.checkArgument(this.available() >= bytesToSkip,
"Failed to skip '%s' bytes, because don't have " +
"enough data");

long positionBeforeSkip = this.position();
this.seek(this.position() + bytesToSkip);
return positionBeforeSkip;
}

@Override
protected void require(int size) throws IOException {
if (this.remaining() >= size) {
return;
}
if (this.bufferCapacity >= size) {
this.shiftAndFillBuffer();
}
/*
* The buffer capacity must be >= 8, read primitive data like int,
* long, float, double can be read from buffer. Only read bytes may
* exceed the limit, and read bytes using readFully is overrode
* in this class. In conclusion, the required data can be
* supplied after shiftAndFillBuffer.
*/
if (size > this.limit()) {
throw new IOException(String.format(
"Reading %s bytes from position %s overflows " +
"buffer length %s",
size, this.position(), this.limit()));
}
}

@Override
public long available() throws IOException {
return this.fileLength - this.position();
}

protected void shiftAndFillBuffer() throws IOException {
this.shiftBuffer();
this.fillBuffer();
}

protected abstract void fillBuffer() throws IOException;

@Override
public int compare(long offset, long length, RandomAccessInput other,
long otherOffset, long otherLength) throws IOException {
byte[] bytes1, bytes2;
int compareOffset1, compareOffset2;

/*
* Set the compare information of the current object.
* Ture: The compare range of the current object is within the buffer.
*/
if (rangeInBuffer(this, offset, length)) {
bytes1 = this.buffer();
compareOffset1 = (int) (offset - bufferStartPosition(this));
} else {
long oldPosition = this.position();
this.seek(offset);
bytes1 = this.readBytes((int) length);
compareOffset1 = 0;
this.seek(oldPosition);
}

/*
* Set the compare information of the compared object.
* Ture: The compare range of the compared object is within the buffer
* and compared object instance of AbstractBufferedFileInput.
*/
AbstractBufferedFileInput otherInput;
if (other instanceof AbstractBufferedFileInput &&
rangeInBuffer(otherInput = (AbstractBufferedFileInput) other,
otherOffset, otherLength)) {
bytes2 = otherInput.buffer();
long otherBufferStart = bufferStartPosition(otherInput);
compareOffset2 = (int) (otherOffset - otherBufferStart);
} else {
long oldPosition = other.position();
other.seek(otherOffset);
bytes2 = other.readBytes((int) otherLength);
compareOffset2 = 0;
other.seek(oldPosition);
}

return BytesUtil.compare(bytes1, compareOffset1, (int) length,
bytes2, compareOffset2, (int) otherLength);
}

private static long bufferStartPosition(AbstractBufferedFileInput input) {
return input.fileOffset - input.limit();
}

private static boolean rangeInBuffer(AbstractBufferedFileInput input,
long offset, long length) {
long bufferStart = bufferStartPosition(input);
return bufferStart <= offset && offset <= input.fileOffset &&
input.limit() >= length;
}

protected int bufferCapacity() {
return this.bufferCapacity;
}

protected long fileLength() {
return this.fileLength;
}
}
@@ -0,0 +1,110 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.io;

import java.io.IOException;

import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.util.E;

public abstract class AbstractBufferedFileOutput extends UnsafeBytesOutput {

private final int bufferCapacity;

protected long fileOffset;

public AbstractBufferedFileOutput(int bufferCapacity) {
super(bufferCapacity);

this.bufferCapacity = bufferCapacity;
this.fileOffset = 0L;
}

@Override
public void write(byte[] b) throws IOException {
this.write(b, 0, b.length);
}

@Override
public void writeFixedInt(long position, int v) throws IOException {
if (this.fileOffset <= position &&
position <= this.position() - Constants.INT_LEN) {
super.writeFixedInt(position - this.fileOffset, v);
return;
}

long latestPosition = this.position();
this.seek(position);
super.writeInt(v);
this.seek(latestPosition);
}

@Override
public long position() {
return this.fileOffset + super.position();
}

@Override
public long skip(long bytesToSkip) throws IOException {
E.checkArgument(bytesToSkip >= 0,
"The parameter bytesToSkip must be >= 0, but got %s",
bytesToSkip);
long positionBeforeSkip = this.position();
this.seek(positionBeforeSkip + bytesToSkip);
return positionBeforeSkip;
}

@Override
protected void require(int size) throws IOException {
E.checkArgument(size <= this.bufferCapacity,
"The parameter size must be <= %s",
this.bufferCapacity);
if (size <= this.bufferAvailable()) {
return;
}
this.flushBuffer();
/*
* The buffer capacity must be >= 8, write primitive data like int,
* long, float, double can be write to buffer after flush buffer.
* Only write bytes may exceed the limit, and write bytes using
* write(byte[] b) is overrode in this class. In conclusion, the
* required size can be supplied after flushBuffer.
*/
if (size > this.bufferAvailable()) {
throw new IOException(String.format(
"Write %s bytes to position %s overflows buffer %s",
size, this.position(), this.bufferCapacity));
}
}

protected int bufferSize() {
return (int) super.position();
}

protected int bufferAvailable() {
return this.bufferCapacity - (int) super.position();
}

protected int bufferCapacity() {
return this.bufferCapacity;
}

protected abstract void flushBuffer() throws IOException;
}

0 comments on commit 25b9a33

Please sign in to comment.