Skip to content
Permalink
Browse files
add BufferedFileDataOutput and BufferedFileDataInput (#15)
* add BufferedFileDataOutput and BufferedFileDataInput

* add BufferedInputStream and BufferedOutputStream
  • Loading branch information
houzhizhen committed Mar 11, 2021
1 parent bb462d3 commit 128ce85d0b02dec779792449ce923eb8a7e19df5
Showing 22 changed files with 2,208 additions and 129 deletions.
@@ -19,6 +19,8 @@

package com.baidu.hugegraph.computer.core.common;

import com.baidu.hugegraph.util.Bytes;

public final class Constants {

public static final byte[] EMPTY_BYTES = new byte[0];
@@ -36,4 +38,17 @@ public final class Constants {
public static final int UINT8_MAX = 0xff;
public static final int UINT16_MAX = 0xffff;
public static final long UINT32_MAX = 0xffffffffL;

/*
* Default capacity, in bytes, of the in-memory buffer used by the buffered
* input & output classes in package com.baidu.hugegraph.computer.core.io.
* NOTE(review): this is 8 KB assuming Bytes.KB == 1024 -- confirm.
*/
public static final int DEFAULT_BUFFER_SIZE = (int) Bytes.KB * 8;

// java.io.RandomAccessFile mode string: open the file for reading only
public static final String FILE_MODE_READ = "r";

/*
* java.io.RandomAccessFile mode string: open the file for reading and
* writing ("rw"); RandomAccessFile has no write-only mode
*/
public static final String FILE_MODE_WRITE = "rw";

}
@@ -0,0 +1,152 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.io;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.util.E;

/**
 * Buffered, seekable input over a {@link RandomAccessFile}.
 * A chunk of the file is kept in the byte array managed by the parent
 * class; reads are served from that array and the array is refilled
 * from the file when it runs short.
 * NOTE(review): the constructor assumes the file pointer of the given
 * RandomAccessFile is at offset 0 -- confirm against callers.
 */
public class BufferedFileInput extends UnsafeByteArrayInput {

    private final int bufferCapacity;
    private final RandomAccessFile file;
    /*
     * File offset just past the last byte that has been consumed from the
     * underlying file, either loaded into the buffer or read directly
     * by readFully().
     */
    private long fileOffset;

    /**
     * Open the file read-only with the default buffer size.
     * @throws IOException if the file can't be opened or read
     */
    public BufferedFileInput(File file) throws IOException {
        this(new RandomAccessFile(file, Constants.FILE_MODE_READ),
             Constants.DEFAULT_BUFFER_SIZE);
    }

    /**
     * @param file           the file to read from
     * @param bufferCapacity the size of the in-memory buffer, must be >= 8
     * @throws IOException   if the initial buffer fill fails
     */
    public BufferedFileInput(RandomAccessFile file, int bufferCapacity)
                             throws IOException {
        super(new byte[bufferCapacity], 0);
        E.checkArgument(bufferCapacity >= 8,
                        "The parameter bufferSize must be >= 8");
        this.file = file;
        this.bufferCapacity = bufferCapacity;
        this.fillBuffer();
    }

    /**
     * @return the logical read position in the file, that is the offset
     *         consumed from the file minus the bytes still unread in
     *         the buffer
     */
    @Override
    public long position() {
        return this.fileOffset - super.remaining();
    }

    @Override
    public void readFully(byte[] b) throws IOException {
        this.readFully(b, 0, b.length);
    }

    /**
     * Read exactly {@code len} bytes into {@code b} starting at
     * {@code off}. Requests larger than the buffer capacity drain the
     * buffer and then read the rest directly from the file.
     */
    @Override
    public void readFully(byte[] b, int off, int len) throws IOException {
        int remaining = super.remaining();
        if (len <= remaining) {
            super.readFully(b, off, len);
        } else if (len <= this.bufferCapacity) {
            this.shiftAndFillBuffer();
            super.readFully(b, off, len);
        } else {
            // Drain what is buffered, then bypass the buffer for the rest
            super.readFully(b, off, remaining);
            int fromFile = len - remaining;
            this.file.readFully(b, off + remaining, fromFile);
            /*
             * Only the bytes read directly from the file advance the file
             * offset, the buffered bytes were already counted when loaded.
             */
            this.fileOffset += fromFile;
        }
    }

    /**
     * Move the read position to {@code position}. Positions inside the
     * currently loaded buffer are served without touching the file.
     * @throws EOFException if position is at or beyond the end of file
     */
    @Override
    public void seek(long position) throws IOException {
        long bufferStart = this.fileOffset - this.limit();
        if (position >= bufferStart && position < this.fileOffset) {
            super.seek(position - bufferStart);
            return;
        }
        if (position >= this.file.length()) {
            throw new EOFException(String.format(
                                   "Can't seek to %s, reach the end of file",
                                   position));
        } else {
            // Drop the buffered content and reload from the new offset
            this.file.seek(position);
            super.seek(0L);
            this.limit(0);
            this.fileOffset = position;
            this.fillBuffer();
        }
    }

    /**
     * Skip {@code bytesToSkip} bytes forward.
     * @return the position before the skip (not the number of bytes
     *         skipped)
     */
    @Override
    public long skip(long bytesToSkip) throws IOException {
        E.checkArgument(bytesToSkip >= 0,
                        "The parameter bytesToSkip must be >=0, but got %s",
                        bytesToSkip);
        long positionBeforeSkip = this.position();
        if (this.remaining() >= bytesToSkip) {
            super.skip(bytesToSkip);
            return positionBeforeSkip;
        }
        // Target lies past the buffered data, let seek() reload the buffer
        bytesToSkip -= this.remaining();
        long position = this.fileOffset + bytesToSkip;
        this.seek(position);
        return positionBeforeSkip;
    }

    @Override
    public void close() throws IOException {
        this.file.close();
    }

    /**
     * Make sure at least {@code size} unread bytes are in the buffer,
     * refilling it from the file if needed.
     * @throws IOException if the bytes can't be supplied from the buffer
     */
    @Override
    protected void require(int size) throws IOException {
        if (this.remaining() >= size) {
            return;
        }
        if (this.bufferCapacity >= size) {
            this.shiftAndFillBuffer();
        }
        /*
         * The buffer capacity must be >= 8, so primitive data like int,
         * long, float and double can always be read from the buffer. Only
         * reading bytes may exceed the capacity, and reading bytes through
         * readFully is overridden in this class. In conclusion, the
         * required data can be supplied after shiftAndFillBuffer.
         */
        if (size > this.limit()) {
            throw new IOException(String.format(
                      "Read %s bytes from position %s overflows buffer %s",
                      size, this.position(), this.limit()));
        }
    }

    private void shiftAndFillBuffer() throws IOException {
        this.shiftBuffer();
        this.fillBuffer();
    }

    /**
     * Append bytes from the file after the current buffer limit, up to
     * the buffer capacity or the end of file, whichever comes first.
     */
    private void fillBuffer() throws IOException {
        long fileRemaining = this.file.length() - this.fileOffset;
        int bufferFree = this.bufferCapacity - this.limit();
        /*
         * Take the minimum as long before narrowing, casting the remaining
         * file length to int first overflows for more than 2GB left.
         */
        int readLen = (int) Math.min((long) bufferFree, fileRemaining);
        this.file.readFully(this.buffer(), this.limit(), readLen);
        // Advance the bookkeeping only after the read succeeded
        this.fileOffset += readLen;
        this.limit(this.limit() + readLen);
    }
}
@@ -0,0 +1,165 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.io;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.util.E;

/**
 * This class acts like new DataOutputStream(new BufferedOutputStream(new
 * FileOutputStream(file))) with random access added. It has two functions.
 * The first is to buffer the content until the buffer is full. The second
 * is unsafe data output.
 * This class is not thread safe.
 * NOTE(review): the constructor assumes the file pointer of the given
 * RandomAccessFile is at offset 0 -- confirm against callers.
 */
public class BufferedFileOutput extends UnsafeByteArrayOutput {

    private final int bufferCapacity;
    private final RandomAccessFile file;
    // The file offset that corresponds to index 0 of the in-memory buffer
    private long fileOffset;

    /**
     * Open the file in read-write mode with the default buffer size.
     * @throws FileNotFoundException if the file can't be opened
     */
    public BufferedFileOutput(File file) throws FileNotFoundException {
        this(new RandomAccessFile(file, Constants.FILE_MODE_WRITE),
             Constants.DEFAULT_BUFFER_SIZE);
    }

    /**
     * @param file           the file to write to
     * @param bufferCapacity the size of the in-memory buffer, must be >= 8
     */
    public BufferedFileOutput(RandomAccessFile file, int bufferCapacity) {
        super(bufferCapacity);
        E.checkArgument(bufferCapacity >= 8,
                        "The parameter bufferSize must be >= 8");
        this.bufferCapacity = bufferCapacity;
        this.file = file;
        this.fileOffset = 0L;
    }

    @Override
    public void write(byte[] b) throws IOException {
        this.write(b, 0, b.length);
    }

    /**
     * Write {@code len} bytes of {@code b} starting at {@code off}.
     * Requests larger than the buffer capacity are written to the file
     * directly after the pending buffer content has been flushed.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (len <= this.bufferAvailable()) {
            super.write(b, off, len);
            return;
        }
        this.flushBuffer();
        if (len <= this.bufferCapacity) {
            super.write(b, off, len);
        } else {
            // The len > buffer size, write out directly
            this.file.write(b, off, len);
            this.fileOffset += len;
        }
    }

    /**
     * Overwrite the int at the absolute {@code position} and leave the
     * current write position unchanged.
     */
    @Override
    public void writeInt(long position, int v) throws IOException {
        long bufferOffset = position - this.fileOffset;
        long bufferPosition = super.position();
        if (bufferOffset >= 0L &&
            bufferOffset + Integer.BYTES <= bufferPosition) {
            /*
             * The target lies entirely inside the pending buffer content,
             * patch it in place and restore the buffer position so that
             * no pending byte is dropped by a later flush.
             */
            super.seek(bufferOffset);
            super.writeInt(v);
            super.seek(bufferPosition);
            return;
        }
        long latestPosition = this.position();
        this.seek(position);
        super.writeInt(v);
        this.seek(latestPosition);
    }

    /**
     * @return the logical write position in the file, that is the file
     *         offset of the buffer start plus the position in the buffer
     */
    @Override
    public long position() {
        return this.fileOffset + super.position();
    }

    /**
     * Move the write position to {@code position}.
     * The pending buffer content is always flushed first. Moving only the
     * in-buffer position would either drop the pending bytes after the
     * new position on the next flush, or flush stale buffer bytes over
     * valid file content.
     */
    @Override
    public void seek(long position) throws IOException {
        if (position == this.position()) {
            return;
        }
        this.flushBuffer();
        this.file.seek(position);
        this.fileOffset = position;
    }

    /**
     * Skip {@code bytesToSkip} bytes forward without writing them.
     * @return the position before the skip (not the number of bytes
     *         skipped)
     */
    @Override
    public long skip(long bytesToSkip) throws IOException {
        E.checkArgument(bytesToSkip >= 0,
                        "The parameter bytesToSkip must be >= 0, but got %s",
                        bytesToSkip);
        long positionBeforeSkip = this.fileOffset + super.position();
        if (bytesToSkip <= this.bufferAvailable()) {
            super.skip(bytesToSkip);
            return positionBeforeSkip;
        }

        this.flushBuffer();
        if (bytesToSkip <= this.bufferCapacity) {
            super.skip(bytesToSkip);
        } else {
            // The gap is larger than the buffer, move the file pointer
            this.fileOffset += bytesToSkip;
            this.file.seek(this.fileOffset);
        }
        return positionBeforeSkip;
    }

    /**
     * Make sure the buffer has room for {@code size} more bytes, flushing
     * the pending content to the file if needed.
     */
    @Override
    protected void require(int size) throws IOException {
        E.checkArgument(size <= this.bufferCapacity,
                        "The parameter size must be <= %s",
                        this.bufferCapacity);
        if (size <= this.bufferAvailable()) {
            return;
        }
        this.flushBuffer();
        /*
         * The buffer capacity must be >= 8, so primitive data like int,
         * long, float and double can be written to the buffer after it is
         * flushed. Only writing bytes may exceed the capacity, and writing
         * bytes through write(byte[] b) is overridden in this class. In
         * conclusion, the required size can be supplied after flushBuffer.
         */
        if (size > this.bufferAvailable()) {
            throw new IOException(String.format(
                      "Write %s bytes to position %s overflows buffer %s",
                      size, this.position(), this.bufferCapacity));
        }
    }

    /**
     * Write the pending buffer content to the file and reset the buffer
     * position to 0.
     */
    private void flushBuffer() throws IOException {
        int bufferPosition = (int) super.position();
        if (bufferPosition == 0) {
            return;
        }
        this.file.write(this.buffer(), 0, bufferPosition);
        this.fileOffset += bufferPosition;
        super.seek(0);
    }

    /**
     * Flush the pending content and close the file. The file is closed
     * even if the flush fails, so that the file handle is not leaked.
     */
    @Override
    public void close() throws IOException {
        try {
            this.flushBuffer();
        } finally {
            this.file.close();
        }
    }

    // The number of bytes that still fit into the buffer
    private final int bufferAvailable() {
        return this.bufferCapacity - (int) super.position();
    }
}

0 comments on commit 128ce85

Please sign in to comment.