Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,10 @@ private void recoverWALHeader() throws IOException {
for (int i = 0; i < WAL_HEADER_COUNT; i++) {
try {
final ByteBuffer byteBuffer = ByteBuffer.allocate(WAL_HEADER_SIZE);
walChannel.read(byteBuffer, i * WAL_HEADER_CAPACITY);
int read = walChannel.read(byteBuffer, i * WAL_HEADER_CAPACITY);
if (read != WAL_HEADER_SIZE) {
continue;
}
WALHeaderCoreData walHeaderCoreData = WALHeaderCoreData.unmarshal(byteBuffer.position(0).limit(WAL_HEADER_SIZE));
if (walHeaderCoreDataAvailable == null || walHeaderCoreDataAvailable.lastWriteTimestamp3 < walHeaderCoreData.lastWriteTimestamp3) {
walHeaderCoreDataAvailable = walHeaderCoreData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void write(ByteBuffer src, long position) throws IOException {
int writen = 0;
do {
ByteBuffer slice = byteBufferWrite.slice().position(writen).limit(remaining);
// FIXME: make sure the position is aligned
int write = randomAccessFile.write(slice, position + writen);
if (write == -1) {
throw new IOException("write -1");
Expand All @@ -127,20 +128,28 @@ public void write(ByteBuffer src, long position) throws IOException {

@Override
public int read(ByteBuffer dst, long position) throws IOException {
// FIXME: a small dst will lead to a zero size read
int bufferDirectIOAlignedSize = (int) WALUtil.alignSmallByBlockSize(dst.capacity());

makeThreadLocalBytebufferMatchDirectIO(bufferDirectIOAlignedSize);

ByteBuffer tlDirectBuffer = threadLocalByteBuffer.get();
tlDirectBuffer.position(0).limit(bufferDirectIOAlignedSize);
int count = randomAccessFile.read(tlDirectBuffer, position);
if (count == -1) {
throw new IOException("read -1");

int bytesRead = 0;
while (tlDirectBuffer.hasRemaining()) {
// FIXME: make sure the position is aligned
int read = randomAccessFile.read(tlDirectBuffer, position + bytesRead);
if (read == -1) {
break;
}
bytesRead += read;
}

tlDirectBuffer.position(0).limit(count);
tlDirectBuffer.position(0).limit(bytesRead);
// FIXME: newPosition is wrong
dst.position(0).put(tlDirectBuffer);
dst.position(0).limit(count);
return count;
dst.position(0).limit(bytesRead);
return bytesRead;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface WALChannel {
*/
void write(ByteBuffer src, long position) throws IOException;

/**
* Read bytes into the given buffer from the given position until the buffer is full or the end of the file.
*/
int read(ByteBuffer dst, long position) throws IOException;

class WALChannelBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ public void write(ByteBuffer src, long position) throws IOException {

@Override
public int read(ByteBuffer dst, long position) throws IOException {
return fileChannel.read(dst, position);
int bytesRead = 0;
while (dst.hasRemaining()) {
int read = fileChannel.read(dst, position + bytesRead);
if (read == -1) {
break;
}
bytesRead += read;
}
return bytesRead;
}
}