Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-481. Encapsulate the RaftLog reading in its own class #12

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -71,10 +71,10 @@ public interface LogReader extends AutoCloseable {
* Each provided buffer must be capable of holding one complete record from the Log. If the provided buffer is
* too small, an exception will be thrown.
*
* @param buffers A non-empty list of non-null ByteBuffers.
* @param buffers A non-empty array of non-null ByteBuffers.
* @return The number of records returns, equivalent to the number of filled buffers.
*/
int readBulk(List<ByteBuffer> buffers) throws IOException;
int readBulk(ByteBuffer[] buffers) throws IOException;

/**
* Returns the current position of this Reader. The position is a {@code recordId}.
Expand Down
Expand Up @@ -40,9 +40,9 @@ public interface LogWriter extends AutoCloseable {
* to have been written.
*
* @param records Records to append
* @return The largest recordId assigned to the records written
* @return The recordIds assigned to the records written
*/
long write(List<ByteBuffer> records) throws IOException;
List<Long> write(List<ByteBuffer> records) throws IOException;

/**
* Guarantees that all previous data appended by {@link #write(ByteBuffer)} are persisted
Expand Down
Expand Up @@ -149,13 +149,13 @@ public List<ByteBuffer> readBulk(int numRecords) throws IOException {
}

@Override
public int readBulk(List<ByteBuffer> buffers) throws IOException {
public int readBulk(ByteBuffer[] buffers) throws IOException {
Preconditions.checkNotNull(buffers, "list of buffers is NULL" );
Preconditions.checkArgument(buffers.size() > 0, "list of buffers is empty");
Preconditions.checkArgument(buffers.length > 0, "list of buffers is empty");

try {
RaftClientReply reply = raftClient.sendReadOnly(Message.valueOf(LogServiceProtoUtil
.toReadLogRequestProto(parent.getName(), currentRecordId, buffers.size()).toByteString()));
.toReadLogRequestProto(parent.getName(), currentRecordId, buffers.length).toByteString()));
ReadLogReplyProto proto = ReadLogReplyProto.parseFrom(reply.getMessage().getContent());
if (proto.hasException()) {
LogServiceException e = proto.getException();
Expand All @@ -165,7 +165,7 @@ public int readBulk(List<ByteBuffer> buffers) throws IOException {
int n = proto.getLogRecordCount();
currentRecordId += n;
for (int i = 0; i < n; i++) {
buffers.get(i).put(proto.getLogRecord(i).toByteArray());
buffers[i] = ByteBuffer.wrap(proto.getLogRecord(i).toByteArray());
}
return n;
} catch (Exception e) {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -59,13 +60,12 @@ public LogWriterImpl(LogStream logStream) {

@Override
public long write(ByteBuffer data) throws IOException {
List<ByteBuffer> list = new ArrayList<ByteBuffer>();
list.add(data);
return write(list);
// One record to write is always one recordId returned
return write(Collections.singletonList(data)).get(0);
}

@Override
public long write(List<ByteBuffer> list) throws IOException {
public List<Long> write(List<ByteBuffer> list) throws IOException {
try {
RaftClientReply reply = raftClient.send(
Message.valueOf(LogServiceProtoUtil.toAppendBBEntryLogRequestProto(parent.getName(), list).toByteString()));
Expand All @@ -76,7 +76,7 @@ public long write(List<ByteBuffer> list) throws IOException {
}
List<Long> ids = proto.getRecordIdList();
// The above call Always returns one id (regardless of a batch size)
return ids.get(0);
return ids;
} catch (Exception e) {
throw new IOException(e);
}
Expand Down
@@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.logservice.server;

import static java.util.Objects.requireNonNull;

import java.util.NoSuchElementException;

import org.apache.ratis.logservice.proto.LogServiceProtos.AppendLogEntryRequestProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceRequestProto.RequestCase;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.server.storage.RaftLogIOException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A reader for the {@link RaftLog} which is accessed using LogService recordId's instead
* of Raft log indexes. Not thread-safe.
*/
public class LogServiceRaftLogReader {
private static final Logger LOG = LoggerFactory.getLogger(LogServiceRaftLogReader.class);
private final RaftLog raftLog;

private long currentRecordId = -1;
private long currentRaftIndex = -1;
private AppendLogEntryRequestProto currentLogEntry = null;
private int currentLogEntryOffset = -1;
private ByteString currentRecord = null;

public LogServiceRaftLogReader(RaftLog raftLog) {
this.raftLog = requireNonNull(raftLog);
}

/**
* Positions this reader just before the current recordId. Use {@link #next()} to get that
* element, but take care to check if a value is present using {@link #hasNext()} first.
*/
public void seek(long recordId) throws RaftLogIOException, InvalidProtocolBufferException {
LOG.debug("Seeking to recordId={}", recordId);
// RaftLog starting index
currentRaftIndex = raftLog.getStartIndex();
currentRecordId = 0;

currentLogEntry = null;
currentLogEntryOffset = -1;
currentRecord = null;

loadNext();
while (currentRecordId < recordId && hasNext()) {
next();
currentRecordId++;
}
}

/**
* Returns true if there is a log entry to read.
*/
public boolean hasNext() throws RaftLogIOException, InvalidProtocolBufferException {
return currentRecord != null;
}

/**
* Returns the next log entry. Ensure {@link #hasNext()} returns true before
* calling this method.
*/
public ByteString next() throws RaftLogIOException, InvalidProtocolBufferException {
if (currentRecord == null) {
throw new NoSuchElementException();
}
ByteString current = currentRecord;
currentRecord = null;
loadNext();
return current;
}

/**
* Finds the next record from the RaftLog and sets it as {@link #currentRecord}.
*/
private void loadNext() throws RaftLogIOException, InvalidProtocolBufferException {
// Clear the old "current" record
currentRecord = null;
LOG.debug("Loading next value: raftIndex={}, recordId={}, proto='{}', offset={}",
currentRaftIndex, currentRecordId,
currentLogEntry == null ? "null" : TextFormat.shortDebugString(currentLogEntry),
currentLogEntryOffset);
// Continue iterating over the current entry.
if (currentLogEntry != null) {
assert currentLogEntryOffset != -1;
currentLogEntryOffset++;

// We have an element to read from our current entry
if (currentLogEntryOffset < currentLogEntry.getDataCount()) {
currentRecord = currentLogEntry.getData(currentLogEntryOffset);
return;
}
// We don't have an element in our current entry so null it out.
currentLogEntry = null;
currentLogEntryOffset = -1;
// Also, increment to the next element in the RaftLog
currentRaftIndex++;
}

// Make sure we don't read off the end of the Raft log
for (; currentRaftIndex < raftLog.getLastCommittedIndex(); currentRaftIndex++) {
try {
LogEntryProto entry = raftLog.get(currentRaftIndex);
if (LOG.isTraceEnabled()) {
LOG.trace("Raft Index: {} Entry: {}", currentRaftIndex,
TextFormat.shortDebugString(entry));
}
if (entry == null || entry.hasConfigurationEntry()) {
continue;
}

LogServiceRequestProto logServiceProto =
LogServiceRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
// TODO is it possible to get LogService messages that aren't appends?
if (RequestCase.APPENDREQUEST != logServiceProto.getRequestCase()) {
continue;
}

currentLogEntry = logServiceProto.getAppendRequest();
currentLogEntryOffset = 0;
if (currentLogEntry.getDataCount() > 0) {
currentRecord = currentLogEntry.getData(currentLogEntryOffset);
return;
}
currentLogEntry = null;
currentLogEntryOffset = -1;
} catch (RaftLogIOException e) {
LOG.error("Caught exception reading from RaftLog", e);
throw e;
} catch (InvalidProtocolBufferException e) {
LOG.error("Caught exception reading LogService protobuf from RaftLog", e);
throw e;
}
}
// If we make it here, we've read off the end of the RaftLog.
}
}