Skip to content

Commit

Permalink
Merge #4782 #4832
Browse files Browse the repository at this point in the history
4782: chore(broker): handle out of disk space in broker r=deepthidevaki a=deepthidevaki

## Description

* expose configuration parameters for minimum free disk space
* When broker free disk space available is less than configured min free disk,
  - reject client requests
  - reject commands from other partitions
  - pause stream processor

## Related issues

closes #4441 

#

4832: feat(clients/java): add user-agent to java client requests r=MiguelPires a=MiguelPires

## Description

Add user-agent header with client and version information to gRPC and OAuth requests.

## Related issues

closes #4265 

#

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Miguel Pires <miguel.pires@camunda.com>
  • Loading branch information
3 people committed Jul 8, 2020
3 parents 13badd3 + 32a1434 + c3bcdfc commit b41b007
Show file tree
Hide file tree
Showing 42 changed files with 1,352 additions and 96 deletions.
Expand Up @@ -441,6 +441,17 @@ public Builder withMaxEntrySize(final MemorySize maxEntrySize) {
return this;
}

/**
* Set the minimum free disk space (in bytes) to leave when allocating a new segment
*
* @param freeDiskSpace free disk space in bytes
* @return the Raft partition group builder
*/
public Builder withFreeDiskSpace(final long freeDiskSpace) {
config.getStorageConfig().setFreeDiskSpace(freeDiskSpace);
return this;
}

/**
* Enables flush on commit.
*
Expand Down
Expand Up @@ -34,12 +34,14 @@ public class RaftStorageConfig {
private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
private static final PersistedSnapshotStoreFactory DEFAULT_SNAPSHOT_STORE_FACTORY =
new FileBasedSnapshotStoreFactory();
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024 * 1; // 1GB

private String directory;
private StorageLevel level = DEFAULT_STORAGE_LEVEL;
private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
private long segmentSize = DEFAULT_MAX_SEGMENT_SIZE;
private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
private long freeDiskSpace = DEFAULT_FREE_DISK_SPACE;

@Optional("SnapshotStoreFactory")
private PersistedSnapshotStoreFactory persistedSnapshotStoreFactory =
Expand Down Expand Up @@ -168,4 +170,24 @@ public RaftStorageConfig setPersistedSnapshotStoreFactory(
this.persistedSnapshotStoreFactory = persistedSnapshotStoreFactory;
return this;
}

/**
* Returns the minimum free disk space buffer to leave when allocating a new segment
*
* @return free disk buffer
*/
public long getFreeDiskSpace() {
return this.freeDiskSpace;
}

/**
* Sets the minimum free disk space buffer
*
* @param freeDiskSpace
* @return
*/
public RaftStorageConfig setFreeDiskSpace(final long freeDiskSpace) {
this.freeDiskSpace = freeDiskSpace;
return this;
}
}
Expand Up @@ -302,7 +302,7 @@ private RaftStorage createRaftStorage() {
.withMaxEntrySize((int) storageConfig.getMaxEntrySize().bytes())
.withFlushOnCommit(storageConfig.isFlushOnCommit())
.withDynamicCompaction(compactionConfig.isDynamic())
.withFreeDiskBuffer(compactionConfig.getFreeDiskBuffer())
.withFreeDiskSpace(storageConfig.getFreeDiskSpace())
.withFreeMemoryBuffer(compactionConfig.getFreeMemoryBuffer())
.withNamespace(RaftNamespaces.RAFT_STORAGE)
.withSnapshotStore(persistedSnapshotStore)
Expand Down
Expand Up @@ -68,7 +68,7 @@ public final class RaftStorage {
private final int maxEntrySize;
private final int maxEntriesPerSegment;
private final boolean dynamicCompaction;
private final double freeDiskBuffer;
private final long freeDiskSpace;
private final double freeMemoryBuffer;
private final boolean flushOnCommit;
private final boolean retainStaleSnapshots;
Expand All @@ -85,7 +85,7 @@ private RaftStorage(
final int maxEntrySize,
final int maxEntriesPerSegment,
final boolean dynamicCompaction,
final double freeDiskBuffer,
final long freeDiskSpace,
final double freeMemoryBuffer,
final boolean flushOnCommit,
final boolean retainStaleSnapshots,
Expand All @@ -100,7 +100,7 @@ private RaftStorage(
this.maxEntrySize = maxEntrySize;
this.maxEntriesPerSegment = maxEntriesPerSegment;
this.dynamicCompaction = dynamicCompaction;
this.freeDiskBuffer = freeDiskBuffer;
this.freeDiskSpace = freeDiskSpace;
this.freeMemoryBuffer = freeMemoryBuffer;
this.flushOnCommit = flushOnCommit;
this.retainStaleSnapshots = retainStaleSnapshots;
Expand Down Expand Up @@ -185,12 +185,12 @@ public boolean dynamicCompaction() {
}

/**
* Returns the percentage of disk space that must be available before log compaction is forced.
* Returns the amount of disk space that must be available before log compaction is forced.
*
* @return the percentage of disk space that must be available before log compaction is forced
* @return the amount of disk space that must be available before log compaction is forced
*/
public double freeDiskBuffer() {
return freeDiskBuffer;
public long freeDiskSpace() {
return freeDiskSpace;
}

/**
Expand Down Expand Up @@ -317,6 +317,7 @@ public RaftLog openLog() {
.withNamespace(namespace)
.withMaxSegmentSize(maxSegmentSize)
.withMaxEntrySize(maxEntrySize)
.withFreeDiskSpace(freeDiskSpace)
.withMaxEntriesPerSegment(maxEntriesPerSegment)
.withFlushOnCommit(flushOnCommit)
.withJournalIndexFactory(journalIndexFactory)
Expand Down Expand Up @@ -399,7 +400,7 @@ public static final class Builder implements io.atomix.utils.Builder<RaftStorage
private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024; // 1MB
private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
private static final boolean DEFAULT_DYNAMIC_COMPACTION = true;
private static final double DEFAULT_FREE_DISK_BUFFER = .2;
private static final long DEFAULT_FREE_DISK_SPACE = 1024L * 1024 * 1024; // 1GB
private static final double DEFAULT_FREE_MEMORY_BUFFER = .2;
private static final boolean DEFAULT_FLUSH_ON_COMMIT = true;
private static final boolean DEFAULT_RETAIN_STALE_SNAPSHOTS = false;
Expand All @@ -412,7 +413,7 @@ public static final class Builder implements io.atomix.utils.Builder<RaftStorage
private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
private boolean dynamicCompaction = DEFAULT_DYNAMIC_COMPACTION;
private double freeDiskBuffer = DEFAULT_FREE_DISK_BUFFER;
private long freeDiskSpace = DEFAULT_FREE_DISK_SPACE;
private double freeMemoryBuffer = DEFAULT_FREE_MEMORY_BUFFER;
private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
private boolean retainStaleSnapshots = DEFAULT_RETAIN_STALE_SNAPSHOTS;
Expand Down Expand Up @@ -582,13 +583,12 @@ public Builder withDynamicCompaction(final boolean dynamicCompaction) {
* Sets the percentage of free disk space that must be preserved before log compaction is
* forced.
*
* @param freeDiskBuffer the free disk percentage
* @param freeDiskSpace the free disk percentage
* @return the Raft log builder
*/
public Builder withFreeDiskBuffer(final double freeDiskBuffer) {
checkArgument(freeDiskBuffer > 0, "freeDiskBuffer must be positive");
checkArgument(freeDiskBuffer < 1, "freeDiskBuffer must be less than 1");
this.freeDiskBuffer = freeDiskBuffer;
public Builder withFreeDiskSpace(final long freeDiskSpace) {
checkArgument(freeDiskSpace > 0, "freeDiskSpace must be positive");
this.freeDiskSpace = freeDiskSpace;
return this;
}

Expand Down Expand Up @@ -711,7 +711,7 @@ public RaftStorage build() {
maxEntrySize,
maxEntriesPerSegment,
dynamicCompaction,
freeDiskBuffer,
freeDiskSpace,
freeMemoryBuffer,
flushOnCommit,
retainStaleSnapshots,
Expand Down
Expand Up @@ -215,6 +215,18 @@ public Builder withMaxEntrySize(final int maxEntrySize) {
return this;
}

/**
* Sets the minimum free disk space to leave when allocating a new segment
*
* @param freeDiskSpace free disk space in bytes
* @return the storage builder
* @throws IllegalArgumentException if the {@code freeDiskSpace} is not positive
*/
public Builder withFreeDiskSpace(final long freeDiskSpace) {
journalBuilder.withFreeDiskSpace(freeDiskSpace);
return this;
}

/**
* Sets the maximum number of allows entries per segment, returning the builder for method
* chaining.
Expand Down
55 changes: 28 additions & 27 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftRule.java
Expand Up @@ -469,6 +469,7 @@ private RaftStorage createStorage(
.withDirectory(memberDirectory)
.withMaxEntriesPerSegment(10)
.withMaxSegmentSize(1024 * 10)
.withFreeDiskSpace(100)
.withSnapshotStore(
new FileBasedSnapshotStoreFactory()
.createSnapshotStore(memberDirectory.toPath(), "1"))
Expand Down Expand Up @@ -560,33 +561,6 @@ public void triggerDataLossOnNode(final String node) throws IOException {
FileUtil.deleteFolder(memberDirectory.toPath());
}

private final class RaftSnapshotListener implements PersistedSnapshotListener {

private final MemberId memberId;

public RaftSnapshotListener(final MemberId memberId) {
this.memberId = memberId;
}

@Override
public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
final var raftServer = servers.get(memberId.id());
final var raftContext = raftServer.getContext();
final var serviceManager = raftContext.getServiceManager();
serviceManager.setCompactableIndex(persistedSnapshot.getIndex());

raftServer
.compact()
.whenComplete(
(v, t) -> {
final var latch = compactAwaiters.get(memberId.id()).get();
if (latch != null) {
latch.countDown();
}
});
}
}

private static final class CommitAwaiter {

private final long awaitedIndex;
Expand Down Expand Up @@ -635,4 +609,31 @@ public long awaitCommit() throws Exception {
return commitFuture.get(30, TimeUnit.SECONDS);
}
}

private final class RaftSnapshotListener implements PersistedSnapshotListener {

private final MemberId memberId;

public RaftSnapshotListener(final MemberId memberId) {
this.memberId = memberId;
}

@Override
public void onNewSnapshot(final PersistedSnapshot persistedSnapshot) {
final var raftServer = servers.get(memberId.id());
final var raftContext = raftServer.getContext();
final var serviceManager = raftContext.getServiceManager();
serviceManager.setCompactableIndex(persistedSnapshot.getIndex());

raftServer
.compact()
.whenComplete(
(v, t) -> {
final var latch = compactAwaiters.get(memberId.id()).get();
if (latch != null) {
latch.countDown();
}
});
}
}
}
8 changes: 4 additions & 4 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftTest.java
Expand Up @@ -771,13 +771,13 @@ public void onWriteError(final Throwable error) {
}

@Override
public void onCommitError(final Indexed<ZeebeEntry> indexed, final Throwable error) {
fail("Unexpected write error: " + error.getMessage());
public void onCommit(final Indexed<ZeebeEntry> indexed) {
commitFuture.complete(indexed.index());
}

@Override
public void onCommit(final Indexed<ZeebeEntry> indexed) {
commitFuture.complete(indexed.index());
public void onCommitError(final Indexed<ZeebeEntry> indexed, final Throwable error) {
fail("Unexpected write error: " + error.getMessage());
}

public long awaitCommit() throws Exception {
Expand Down
Expand Up @@ -45,7 +45,7 @@ public void testDefaultConfiguration() throws Exception {
assertEquals(1024 * 1024 * 32, storage.maxLogSegmentSize());
assertEquals(1024 * 1024, storage.maxLogEntriesPerSegment());
assertTrue(storage.dynamicCompaction());
assertEquals(.2, storage.freeDiskBuffer(), .01);
assertEquals(1024L * 1024 * 1024, storage.freeDiskSpace());
assertTrue(storage.isFlushOnCommit());
assertFalse(storage.isRetainStaleSnapshots());
assertTrue(storage.statistics().getFreeMemory() > 0);
Expand All @@ -60,7 +60,7 @@ public void testCustomConfiguration() throws Exception {
.withMaxSegmentSize(1024 * 1024)
.withMaxEntriesPerSegment(1024)
.withDynamicCompaction(false)
.withFreeDiskBuffer(.5)
.withFreeDiskSpace(100)
.withFlushOnCommit(false)
.withRetainStaleSnapshots()
.build();
Expand All @@ -69,7 +69,7 @@ public void testCustomConfiguration() throws Exception {
assertEquals(1024 * 1024, storage.maxLogSegmentSize());
assertEquals(1024, storage.maxLogEntriesPerSegment());
assertFalse(storage.dynamicCompaction());
assertEquals(.5, storage.freeDiskBuffer(), .01);
assertEquals(100, storage.freeDiskSpace());
assertFalse(storage.isFlushOnCommit());
assertTrue(storage.isRetainStaleSnapshots());
}
Expand Down
Expand Up @@ -65,6 +65,7 @@ public class SegmentedJournal<E> implements Journal<E> {
private final Collection<SegmentedJournalReader> readers = Sets.newConcurrentHashSet();
private volatile JournalSegment<E> currentSegment;
private volatile boolean open = true;
private final long minFreeDiskSpace;

public SegmentedJournal(
final String name,
Expand All @@ -75,7 +76,8 @@ public SegmentedJournal(
final int maxEntrySize,
final int maxEntriesPerSegment,
final boolean flushOnCommit,
final Supplier<JournalIndex> journalIndexFactory) {
final Supplier<JournalIndex> journalIndexFactory,
final long minFreeSpace) {
this.name = checkNotNull(name, "name cannot be null");
this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
this.directory = checkNotNull(directory, "directory cannot be null");
Expand All @@ -89,6 +91,7 @@ public SegmentedJournal(
journalIndexFactory == null
? () -> new SparseJournalIndex(DEFAULT_INDEX_DENSITY)
: journalIndexFactory;
this.minFreeDiskSpace = minFreeSpace;
open();
this.writer = openWriter();
}
Expand Down Expand Up @@ -293,7 +296,8 @@ private void assertOpen() {

/** Asserts that enough disk space is available to allocate a new segment. */
private void assertDiskSpace() {
if (directory().getUsableSpace() < maxSegmentSize() * SEGMENT_BUFFER_FACTOR) {
if (directory().getUsableSpace()
< Math.max((long) maxSegmentSize() * SEGMENT_BUFFER_FACTOR, minFreeDiskSpace)) {
throw new StorageException.OutOfDiskSpace(
"Not enough space to allocate a new journal segment");
}
Expand Down Expand Up @@ -698,7 +702,7 @@ public static class Builder<E> implements io.atomix.utils.Builder<SegmentedJourn
private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;

private static final long DEFAULT_MIN_FREE_DISK_SPACE = 1024L * 1024 * 1024 * 1;
protected String name = DEFAULT_NAME;
protected StorageLevel storageLevel = StorageLevel.DISK;
protected File directory = new File(DEFAULT_DIRECTORY);
Expand All @@ -709,6 +713,7 @@ public static class Builder<E> implements io.atomix.utils.Builder<SegmentedJourn

private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
private Supplier<JournalIndex> journalIndexFactory;
private long freeDiskSpace = DEFAULT_MIN_FREE_DISK_SPACE;

protected Builder() {}

Expand Down Expand Up @@ -809,6 +814,19 @@ public Builder<E> withMaxEntrySize(final int maxEntrySize) {
return this;
}

/**
* Sets the minimum free disk space to leave when allocating a new segment
*
* @param freeDiskSpace free disk space in bytes
* @return the storage builder
* @throws IllegalArgumentException if the {@code freeDiskSpace} is not positive
*/
public Builder<E> withFreeDiskSpace(final long freeDiskSpace) {
checkArgument(freeDiskSpace > 0, "minFreeDiskSpace must be positive");
this.freeDiskSpace = freeDiskSpace;
return this;
}

/**
* Sets the maximum number of allows entries per segment, returning the builder for method
* chaining.
Expand Down Expand Up @@ -881,7 +899,8 @@ public SegmentedJournal<E> build() {
maxEntrySize,
maxEntriesPerSegment,
flushOnCommit,
journalIndexFactory);
journalIndexFactory,
freeDiskSpace);
}
}
}

0 comments on commit b41b007

Please sign in to comment.