RATIS-523 RATIS-524 RATIS-525 RATIS-526 RATIS-527 RATIS-533 Lots of cleanup on the LogService #18

Closed
wants to merge 6 commits into from
15 changes: 12 additions & 3 deletions ratis-logservice/README.md
@@ -73,13 +73,22 @@ $ mvn exec:java -Dexec.mainClass=org.apache.ratis.logservice.shell.LogServiceShe
Similarly, a full quorum can be started by building a Docker container and then starting the docker-compose cluster:
```bash
$ cd ratis-logservice && mvn package assembly:single -DskipTests
$ docker build -t ratis-logservice --build-arg BINARY=target/ratis-logservice-0.4.0-SNAPSHOT-bin.tar.gz --build-arg VERSION=0.4.0-SNAPSHOT .
$ ./build-docker.sh
$ docker-compose up
```

Then, a client container can be launched to connect to the cluster:
```bash
$ docker run --rm --network ratis-logservice_default -it ratis-logservice java -cp "/opt/ratis-logservice/current/conf:/opt/ratis-logservice/current/lib/*" org.apache.ratis.logservice.shell.LogServiceShell -q master1.logservice.ratis.org:9999,master2.logservice.ratis.org:9999,master3.logservice.ratis.org:9999
$ ./client-env.sh
$ ./bin/shell -q master1.logservice.ratis.org:9999,master2.logservice.ratis.org:9999,master3.logservice.ratis.org:9999
```

Take care that the correct network is provided to the LogServiceShell command.
Or, you can launch the verification tool to generate load on the cluster:
```bash
$ ./client-env.sh
$ ./bin/load-test -q master1.logservice.ratis.org:9999,master2.logservice.ratis.org:9999,master3.logservice.ratis.org:9999
```

`client-env.sh` launches a Docker container that can communicate with the LogService cluster running from
`docker-compose`. You can do this by hand, but take care that the correct network is provided when launching your
container.
42 changes: 42 additions & 0 deletions ratis-logservice/TUNING.md
@@ -0,0 +1,42 @@
# Tuning for the Log Service

This is a list of Ratis configuration properties which have been
found to be relevant/important to control how Ratis operates for
the purposes of the LogService.

## RAFT Log

The default RAFT log implementation uses "segments" on disk to avoid
a single file growing to be very large. By default, each segment is limited to
`8MB`; this limit can be changed via `RaftServerConfigKeys.Log.setSegmentSizeMax()`.
When a new segment is created, Ratis will "preallocate" that segment by writing
data into the file to reduce the risk of latency when we first try to append
entries to the RAFT log. By default, the segment is preallocated with `4MB`;
this can be changed via `RaftServerConfigKeys.Log.setPreallocatedSize()`.
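
As a rough illustration of how the segment size affects the number of files on disk, the sketch below does the arithmetic in plain Java. `SegmentMath` is a hypothetical helper, not part of Ratis, and it assumes entries pack perfectly into segments, which ignores headers and entry boundaries.

```java
/** Hypothetical helper (not part of Ratis) for reasoning about segment sizing. */
final class SegmentMath {
  static final long MB = 1024L * 1024L;

  /** Number of segment files needed to hold totalBytes, rounding up. */
  static long segmentsNeeded(long totalBytes, long segmentSizeMax) {
    if (totalBytes < 0 || segmentSizeMax <= 0) {
      throw new IllegalArgumentException("sizes must be non-negative and segmentSizeMax positive");
    }
    return (totalBytes + segmentSizeMax - 1) / segmentSizeMax;
  }

  /** Bytes written up front when a new segment is created. */
  static long preallocatedBytes(long segmentSizeMax, long preallocatedSize) {
    // Assumption for this sketch: preallocation never exceeds the segment size.
    return Math.min(segmentSizeMax, preallocatedSize);
  }
}
```

Under these assumptions, 100MB of entries spans 13 segments at the `8MB` default and 4 segments at `32MB`.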

Up to 2 log segments are cached in memory (including the segment actively being
written to). This is controlled by `RaftServerConfigKeys.Log.setMaxCachedSegmentNum()`.
Increasing this configuration would use more memory but should reduce the latency
of reading entries from the RAFT log.

Writes to the RAFT log are buffered using a Java Direct ByteBuffer (offheap). By default,
this buffer is `64KB` in size and can be changed via `RaftServerConfigKeys.Log.setWriteBufferSize()`.
Beware that when one LogServer is hosting multiple RAFT groups (multiple "LogService Logs"), each
group will have its own buffer. Thus, high concurrency will result in multiple buffers.
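
To get a feel for the off-heap footprint, the following sketch allocates one direct buffer per RAFT group and sums the capacities. It assumes one write buffer per group, as the paragraph above suggests; `WriteBufferFootprint` is a hypothetical name and this is not how Ratis manages its buffers internally.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: estimates off-heap memory used by per-group write buffers. */
final class WriteBufferFootprint {
  /** Allocates one direct (off-heap) buffer per RAFT group. */
  static List<ByteBuffer> allocatePerGroup(int raftGroups, int bufferBytes) {
    List<ByteBuffer> buffers = new ArrayList<>();
    for (int i = 0; i < raftGroups; i++) {
      buffers.add(ByteBuffer.allocateDirect(bufferBytes));
    }
    return buffers;
  }

  /** Sums the capacity of all buffers. */
  static long totalBytes(List<ByteBuffer> buffers) {
    long total = 0;
    for (ByteBuffer buffer : buffers) {
      total += buffer.capacity();
    }
    return total;
  }
}
```

For example, ten groups at the `64KB` default would hold 655,360 bytes of direct memory in total under this assumption.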

## RAFT Server

Every RAFT server maintains a queue of I/O actions that it needs to execute. As with
much of Ratis, these actions are executed asynchronously and the client can block on
completion of these tasks as necessary. To prevent saturating memory, this queue of
items can be limited in size by both number of entries and size of the elements in the queue.
The former defaults to 4096 elements and is controlled by `RaftServerConfigKeys.Log.setElementLimit()`,
while the latter defaults to `64MB` and is controlled by `RaftServerConfigKeys.Log.setByteLimit()`.
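
The idea of limiting a queue by both element count and total bytes can be sketched as follows. `BoundedByteQueue` is a hypothetical class written for illustration; the real Ratis queue may block or behave differently when a limit is reached, whereas this sketch simply rejects the item.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of a queue bounded by element count and by total bytes. */
final class BoundedByteQueue {
  private final int elementLimit;
  private final long byteLimit;
  private final Queue<byte[]> queue = new ArrayDeque<>();
  private long queuedBytes = 0;

  BoundedByteQueue(int elementLimit, long byteLimit) {
    this.elementLimit = elementLimit;
    this.byteLimit = byteLimit;
  }

  /** Returns false (and does not enqueue) if either limit would be exceeded. */
  synchronized boolean offer(byte[] item) {
    if (queue.size() >= elementLimit || queuedBytes + item.length > byteLimit) {
      return false;
    }
    queue.add(item);
    queuedBytes += item.length;
    return true;
  }

  /** Removes the oldest item, releasing its bytes from the budget. */
  synchronized byte[] poll() {
    byte[] item = queue.poll();
    if (item != null) {
      queuedBytes -= item.length;
    }
    return item;
  }

  synchronized int size() {
    return queue.size();
  }

  synchronized long queuedBytes() {
    return queuedBytes;
  }
}
```

Whichever limit is hit first stops further enqueues, so many small items are capped by the element limit and a few large items by the byte limit.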

## Do Not Set

Running a snapshot indicates that we can truncate part of the RAFT log, as the expectation is that
a snapshot is an equivalent representation of all of the updates from the log. However, the LogService
is written to expect that we maintain these records. As such, we must not allow snapshots to automatically
happen as we may lose records from the RAFT log. `RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled()`
defaults to `false` and should not be set to `true`.
Contributor

This is a good tuning guide.

But the `Snapshot.setAutoTriggerEnabled()` config doesn't look like a tuning, since it impacts the log service. Shouldn't we throw an exception if it is set to true, or explicitly set it to false in code (just to avoid human errors)?

Member Author

> shouldn't we throw an exception if it is set to true or explicitly set to false in code(just to avoid human errors)

Very good suggestion. Let me change that.
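
The fail-fast check discussed here could look roughly like the sketch below. It uses `java.util.Properties` as a stand-in for `RaftProperties` so that it runs without Ratis on the classpath, and the key string is an assumption about how the setting is named; `SnapshotGuard` is a hypothetical class.

```java
import java.util.Properties;

/** Hypothetical sketch of rejecting a configuration that conflicts with the LogService. */
final class SnapshotGuard {
  // Assumed key name; java.util.Properties stands in for RaftProperties here.
  static final String AUTO_TRIGGER_KEY = "raft.server.snapshot.auto.trigger.enabled";

  /** Throws if automatic snapshots are enabled; an unset key is treated as false. */
  static void validate(Properties properties) {
    boolean enabled = Boolean.parseBoolean(properties.getProperty(AUTO_TRIGGER_KEY, "false"));
    if (enabled) {
      throw new IllegalStateException("Auto triggering snapshots is disallowed by the LogService");
    }
  }
}
```

Throwing at startup, rather than silently overriding the value, makes the misconfiguration visible to whoever set it.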

31 changes: 31 additions & 0 deletions ratis-logservice/build-docker.sh
@@ -0,0 +1,31 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

LOGSERVICE="$(dirname "$0")"
LOGSERVICE="$(cd "$LOGSERVICE">/dev/null; pwd)"

# Get the version of the project
VERSION="$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)"

# Validate that the tarball is there
if [[ ! -f "target/ratis-logservice-${VERSION}-bin.tar.gz" ]]; then
echo "LogService assembly tarball missing, run 'mvn package assembly:single' first!"
exit 1
fi

docker build -t ratis-logservice --build-arg BINARY=target/ratis-logservice-$VERSION-bin.tar.gz --build-arg VERSION=$VERSION .
19 changes: 19 additions & 0 deletions ratis-logservice/client-env.sh
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

docker run --rm --network ratis-logservice_default -it ratis-logservice
7 changes: 7 additions & 0 deletions ratis-logservice/src/assembly/assembly.xml
@@ -42,5 +42,12 @@
<include>log4j.properties</include>
</includes>
</fileSet>
<fileSet>
<directory>src/assembly/bin</directory>
<includes>
<include>*</include>
</includes>
<outputDirectory>bin</outputDirectory>
</fileSet>
</fileSets>
</assembly>
23 changes: 23 additions & 0 deletions ratis-logservice/src/assembly/bin/load-test
@@ -0,0 +1,23 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

LOGSERVICE="$(dirname "$0")"
LOGSERVICE="$(cd "$LOGSERVICE/..">/dev/null; pwd)"

export CLASSPATH="${LOGSERVICE}/conf:${LOGSERVICE}/lib/*"
exec java -XX:OnOutOfMemoryError="kill -9 %p" $LOGSERVICE_OPTS org.apache.ratis.logservice.tool.VerificationTool "$@"
23 changes: 23 additions & 0 deletions ratis-logservice/src/assembly/bin/shell
@@ -0,0 +1,23 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

LOGSERVICE="$(dirname "$0")"
LOGSERVICE="$(cd "$LOGSERVICE/..">/dev/null; pwd)"

export CLASSPATH="${LOGSERVICE}/conf:${LOGSERVICE}/lib/*"
exec java -XX:OnOutOfMemoryError="kill -9 %p" $LOGSERVICE_OPTS org.apache.ratis.logservice.shell.LogServiceShell "$@"
@@ -20,9 +20,15 @@
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.logservice.util.LogServiceUtils;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;

/**
* A base class to encapsulate functionality around a long-lived Java process which runs a state machine.
@@ -39,6 +45,32 @@ public ServerOpts getServerOpts() {
return opts;
}

/**
* Sets common Ratis server properties for both the log and metadata state machines.
*/
void setRaftProperties(RaftProperties properties) {
// Set the ports for the server
GrpcConfigKeys.Server.setPort(properties, opts.getPort());
NettyConfigKeys.Server.setPort(properties, opts.getPort());

// Ozone sets the leader election timeout (min) to 1 second.
TimeDuration leaderElectionTimeoutMin = TimeDuration.valueOf(1, TimeUnit.SECONDS);
RaftServerConfigKeys.Rpc.setTimeoutMin(properties, leaderElectionTimeoutMin);
TimeDuration leaderElectionMaxTimeout = TimeDuration.valueOf(
leaderElectionTimeoutMin.toLong(TimeUnit.MILLISECONDS) + 200,
TimeUnit.MILLISECONDS);
RaftServerConfigKeys.Rpc.setTimeoutMax(properties, leaderElectionMaxTimeout);
Contributor

Same, why are we hardcoding these values?

Member Author

This was a copy-paste out of Ozone. I don't have a good understanding of why they chose these.

Was going for just calling out things we want to set in Ratis, but leaving the actual configuration to the work Vlad is going to get to.
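
For context on what the min/max pair means: in Raft, each server picks a randomized election timeout within a range so that followers are unlikely to time out simultaneously. The sketch below shows that selection with the 1000 ms minimum and +200 ms spread from this change. `ElectionTimeout` is a hypothetical class, and treating the maximum as inclusive is an assumption, not a statement about Ratis internals.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of picking a randomized election timeout in [min, max]. */
final class ElectionTimeout {
  /** Returns a timeout in milliseconds, uniformly chosen between min and max inclusive. */
  static long randomTimeoutMillis(long minMillis, long maxMillis) {
    if (minMillis < 0 || maxMillis < minMillis) {
      throw new IllegalArgumentException("require 0 <= min <= max");
    }
    // nextLong's upper bound is exclusive, so add one to include maxMillis.
    return ThreadLocalRandom.current().nextLong(minMillis, maxMillis + 1);
  }
}
```

A narrow spread such as 200 ms keeps failover latency predictable, at the cost of a somewhat higher chance that two followers time out close together.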

}

/**
* Validates that there are no properties set which are in conflict with the LogService.
*/
void validateRaftProperties(RaftProperties properties) {
if (RaftServerConfigKeys.Snapshot.autoTriggerEnabled(properties)) {
throw new IllegalStateException("Auto triggering snapshots is disallowed by the LogService");
}
}

static ServerOpts buildOpts(String hostname, String metaQuorum, int port, String workingDir) {
ServerOpts opts = new ServerOpts();
opts.setHost(hostname);
@@ -23,13 +23,13 @@
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.logservice.util.LogServiceUtils;
import org.apache.ratis.logservice.util.MetaServiceProtoUtil;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
@@ -38,6 +38,8 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -51,6 +53,7 @@ public class LogServer extends BaseServer {

public LogServer(ServerOpts opts) {
super(opts);
LOG.debug("Log Server options: {}", opts);
}

public RaftServer getServer() {
@@ -61,13 +64,31 @@ public static Builder newBuilder() {
return new Builder();
}

@Override
void setRaftProperties(RaftProperties properties) {
super.setRaftProperties(properties);

// Increase the client timeout
RaftClientConfigKeys.Rpc.setRequestTimeout(properties, TimeDuration.valueOf(100, TimeUnit.SECONDS));

// Increase the segment size to avoid rolling so quickly
SizeInBytes segmentSize = SizeInBytes.valueOf("32MB");
Contributor

Why hardcode this? Can't we do it through some config? Or is it just a default that will be overridden in a later step?

Member Author

@joshelser joshelser May 2, 2019

This one was weird. I was playing with making it configurable, but changes to it had some really harmful implications (things were just hanging when I increased this, for example).

Edit: was thinking about the write-buffer size. I don't think I played around with this.

I left it here as something for us to come back to later, but not something I wanted to have people tweak. Maybe it's OK to still let this be configurable? (with a big-fat-warning :P)

RaftServerConfigKeys.Log.setSegmentSizeMax(properties, segmentSize);
RaftServerConfigKeys.Log.setPreallocatedSize(properties, segmentSize);

// TODO this seems to cause errors, not sure if pushing Ratis too hard?
// SizeInBytes writeBufferSize = SizeInBytes.valueOf("128KB");
// RaftServerConfigKeys.Log.setWriteBufferSize(properties, writeBufferSize);
}

public void start() throws IOException {
final ServerOpts opts = getServerOpts();
Set<RaftPeer> peers = LogServiceUtils.getPeersFromQuorum(opts.getMetaQuorum());
RaftProperties properties = new RaftProperties();
properties.set("raft.client.rpc.request.timeout", "100000");
GrpcConfigKeys.Server.setPort(properties, opts.getPort());
NettyConfigKeys.Server.setPort(properties, opts.getPort());

// Set properties for the log server state machine
setRaftProperties(properties);

InetSocketAddress addr = new InetSocketAddress(opts.getHost(), opts.getPort());
if(opts.getWorkingDir() != null) {
RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(opts.getWorkingDir())));
@@ -77,6 +98,10 @@ public void start() throws IOException {
final RaftGroupId logServerGroupId = RaftGroupId.valueOf(opts.getLogServerGroupId());
RaftGroup all = RaftGroup.valueOf(logServerGroupId, peer);
RaftGroup meta = RaftGroup.valueOf(RaftGroupId.valueOf(opts.getMetaGroupId()), peers);

// Make sure that we aren't setting any invalid/harmful properties
validateRaftProperties(properties);

raftServer = RaftServer.newBuilder()
.setStateMachineRegistry(new StateMachine.Registry() {
@Override
@@ -56,7 +56,7 @@ public LogServiceRaftLogReader(RaftLog raftLog) {
* element, but take care to check if a value is present using {@link #hasNext()} first.
*/
public void seek(long recordId) throws RaftLogIOException, InvalidProtocolBufferException {
LOG.debug("Seeking to recordId={}", recordId);
LOG.trace("Seeking to recordId={}", recordId);
// RaftLog starting index
currentRaftIndex = raftLog.getStartIndex();
currentRecordId = 0;
@@ -99,10 +99,12 @@ public ByteString next() throws RaftLogIOException, InvalidProtocolBufferExcepti
private void loadNext() throws RaftLogIOException, InvalidProtocolBufferException {
// Clear the old "current" record
currentRecord = null;
LOG.debug("Loading next value: raftIndex={}, recordId={}, proto='{}', offset={}",
currentRaftIndex, currentRecordId,
currentLogEntry == null ? "null" : TextFormat.shortDebugString(currentLogEntry),
currentLogEntryOffset);
if (LOG.isTraceEnabled()) {
LOG.trace("Loading next value: raftIndex={}, recordId={}, proto='{}', offset={}",
currentRaftIndex, currentRecordId,
currentLogEntry == null ? "null" : TextFormat.shortDebugString(currentLogEntry),
currentLogEntryOffset);
}
// Continue iterating over the current entry.
if (currentLogEntry != null) {
assert currentLogEntryOffset != -1;
@@ -27,9 +27,6 @@
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -271,15 +268,15 @@ public CompletableFuture<Message> query(Message request) {
private CompletableFuture<Message> processGetSizeRequest(LogServiceRequestProto proto) {
GetLogSizeRequestProto msgProto = proto.getSizeRequest();
Throwable t = verifyState(State.OPEN);
LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.dataRecordsSize);
LOG.trace("Size query: {}, Result: {}", msgProto, this.dataRecordsSize);
return CompletableFuture.completedFuture(Message
.valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.dataRecordsSize, t).toByteString()));
}

private CompletableFuture<Message> processGetLengthRequest(LogServiceRequestProto proto) {
GetLogLengthRequestProto msgProto = proto.getLengthQuery();
Throwable t = verifyState(State.OPEN);
LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.length);
LOG.trace("Length query: {}, Result: {}", msgProto, this.length);
return CompletableFuture.completedFuture(Message
.valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(this.length, t).toByteString()));
}
@@ -359,9 +356,9 @@ private CompletableFuture<Message> processAppendRequest(TransactionContext trx,
CompletableFuture.completedFuture(
Message.valueOf(LogServiceProtoUtil.toAppendLogReplyProto(ids, t).toByteString()));
final RaftProtos.RaftPeerRole role = trx.getServerRole();
LOG.debug("{}:{}-{}: {} new length {}", role, getId(), index, proto, dataRecordsSize);
if (LOG.isTraceEnabled()) {
LOG.trace("{}-{}: variables={}", getId(), index, dataRecordsSize);
LOG.trace("{}:{}-{}: {} new length {}", role, getId(), index,
TextFormat.shortDebugString(proto), dataRecordsSize);
}
return f;
}
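
The `LOG.isTraceEnabled()` guards in this change matter because arguments such as `TextFormat.shortDebugString(...)` are evaluated before the logger checks its level, even with parameterized messages. The sketch below demonstrates the effect using `java.util.logging` as a stand-in for SLF4J so that it runs without extra dependencies; `GuardedLogging` and its counter are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch showing why expensive log arguments should be guarded. */
final class GuardedLogging {
  static final Logger LOG = Logger.getLogger("guarded-logging-sketch");
  static final AtomicInteger FORMAT_CALLS = new AtomicInteger();

  /** Stand-in for an expensive call such as rendering a protobuf to text. */
  static String expensiveDebugString(Object proto) {
    FORMAT_CALLS.incrementAndGet();
    return String.valueOf(proto);
  }

  /** The argument is computed even when the FINEST level is disabled. */
  static void logUnguarded(Object proto) {
    LOG.log(Level.FINEST, "Loading next value: proto=''{0}''", expensiveDebugString(proto));
  }

  /** The argument is computed only when the FINEST level is enabled. */
  static void logGuarded(Object proto) {
    if (LOG.isLoggable(Level.FINEST)) {
      LOG.log(Level.FINEST, "Loading next value: proto=''{0}''", expensiveDebugString(proto));
    }
  }
}
```

With the logger set to `INFO`, the unguarded call still pays for the string conversion while the guarded call skips it entirely.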