Skip to content

Commit

Permalink
Direct I/O entry log support
Browse files Browse the repository at this point in the history
The implementation uses JNI to do direct I/O to files via posix
syscalls. Fallocate is used if running on linux, otherwise this is
skipped (at the cost of more filesystem operations during writing).

There are two calls to write, writeAt and writeDelimited. I expect
writeAt to be used for the entrylog headers, while entries will go
through writeDelimited. In both cases, the calls may return before the
syscalls occur. #flush() needs to be called to ensure things are
actually written.

The entry log format isn't much changed from what is used by the
existing entrylogger. The biggest difference is the padding. Direct
I/O must write in aligned blocks. The size of the alignment varies by
machine configuration, but 4K is a safe bet on most. As it is unlikely
that entry data will land exactly on the alignment boundary, we need
to add padding to writes. The existing entry logger has been changed
to take this padding into account. When read as a signed int/long/byte
the padding will always parse to a negative value, which distinguishes
it from valid entry data (the entry size will always be positive) and
also from preallocated space (which is always 0).

Another difference in the format is that the header is now 4K rather
than 1K. Again, this is to allow aligned writes. No changes are
necessary to allow the existing entry logger to deal with the header
change, as we create a dummy entry in the extra header space that the
existing entry logger already knows to ignore.

To enable, set dbStorage_directIOEntryLogger=true in the configuration.
  • Loading branch information
Ivan Kelly authored and mauricebarnum committed Apr 2, 2022
1 parent 91a26fc commit 703a042
Show file tree
Hide file tree
Showing 30 changed files with 5,335 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ public abstract class AbstractLogCompactor {
protected final ServerConfiguration conf;
protected final Throttler throttler;

interface LogRemovalListener {
/**
* LogRemovalListener.
*/
public interface LogRemovalListener {
void removeEntryLog(long logToRemove);
}

Expand Down Expand Up @@ -71,7 +74,7 @@ public static class Throttler {
}

// acquire. if bybytes: bytes of this entry; if byentries: 1.
void acquire(int permits) {
public void acquire(int permits) {
rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ public long getEntry() {
public long getLocation() {
return location;
}

@Override
public String toString() {
return new StringBuilder().append("EntryLocation{")
.append("ledger=").append(ledger)
.append(",entry=").append(entry)
.append(",locationLog=").append(location >> 32 & 0xFFFFFFFF)
.append(",locationOffset=").append((int) (location & 0xFFFFFFFF))
.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -999,23 +999,24 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce
return;
}
long offset = pos;
pos += 4;

int entrySize = headerBuffer.readInt();
if (entrySize <= 0) { // hitting padding
pos++;
headerBuffer.clear();
continue;
}
long ledgerId = headerBuffer.readLong();
headerBuffer.clear();

pos += 4;
if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
// skip this entry
pos += entrySize;
continue;
}
// read the entry
data.clear();
if (entrySize <= 0) {
LOG.warn("bad read for ledger entry from entryLog {}@{} (entry size {})",
entryLogId, pos, entrySize);
return;
}
data.capacity(entrySize);
int rc = readFromLogChannel(entryLogId, bc, data, pos);
if (rc != entrySize) {
Expand Down Expand Up @@ -1080,7 +1081,9 @@ EntryLogMetadata extractEntryLogMetadataFromIndex(long entryLogId) throws IOExce
bc.read(sizeBuffer.get(), offset);

int ledgersMapSize = sizeBuffer.get().readInt();

if (ledgersMapSize <= 0) {
break;
}
// Read the index into a buffer
ledgersMap.clear();
bc.read(ledgersMap, offset + 4, ledgersMapSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie.storage;

import java.io.IOException;

/**
* Generate unique entry log ids.
*/
public interface EntryLogIds {
/**
* Get the next available entry log ID.
*/
int nextId() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie.storage;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.storage.directentrylogger.Events;
import org.apache.bookkeeper.slogger.Slogger;

/**
* EntryLogIdsImpl.
*/
public class EntryLogIdsImpl implements EntryLogIds {
public static final Pattern FILE_PATTERN = Pattern.compile("^([0-9a-fA-F]+)\\.log$");
public static final Pattern COMPACTED_FILE_PATTERN =
Pattern.compile("^([0-9a-fA-F]+)\\.log\\.([0-9a-fA-F]+)\\.compacted$");

private final LedgerDirsManager ledgerDirsManager;
private final Slogger slog;
private int nextId;
private int maxId;

public EntryLogIdsImpl(LedgerDirsManager ledgerDirsManager,
Slogger slog) throws IOException {
this.ledgerDirsManager = ledgerDirsManager;
this.slog = slog;
findLargestGap();
}

@Override
public int nextId() throws IOException {
while (true) {
synchronized (this) {
int current = nextId;
nextId++;
if (nextId == maxId) {
findLargestGap();
} else {
return current;
}
}
}
}

private void findLargestGap() throws IOException {
long start = System.nanoTime();
List<Integer> currentIds = new ArrayList<Integer>();

for (File ledgerDir : ledgerDirsManager.getAllLedgerDirs()) {
currentIds.addAll(logIdsInDirectory(ledgerDir));
currentIds.addAll(compactedLogIdsInDirectory(ledgerDir));
}

int[] gap = findLargestGap(currentIds);
nextId = gap[0];
maxId = gap[1];
slog.kv("dirs", ledgerDirsManager.getAllLedgerDirs())
.kv("nextId", nextId)
.kv("maxId", maxId)
.kv("durationMs", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start))
.info(Events.ENTRYLOG_IDS_CANDIDATES_SELECTED);
}

/**
* O(nlogn) algorithm to find largest contiguous gap between
* integers in a passed list. n should be relatively small.
* Entry logs should be about 1GB in size, so even if the node
* stores a PB, there should be only 1000000 entry logs.
*/
static int[] findLargestGap(List<Integer> currentIds) {
if (currentIds.isEmpty()) {
return new int[] { 0, Integer.MAX_VALUE };
}

Collections.sort(currentIds);

int nextIdCandidate = 0;
int maxIdCandidate = currentIds.get(0);
int maxGap = maxIdCandidate - nextIdCandidate;
for (int i = 0; i < currentIds.size(); i++) {
int gapStart = currentIds.get(i) + 1;
int j = i + 1;
int gapEnd = Integer.MAX_VALUE;
if (j < currentIds.size()) {
gapEnd = currentIds.get(j);
}
int gapSize = gapEnd - gapStart;
if (gapSize > maxGap) {
maxGap = gapSize;
nextIdCandidate = gapStart;
maxIdCandidate = gapEnd;
}
}
return new int[] { nextIdCandidate, maxIdCandidate };
}

public static List<Integer> logIdsInDirectory(File directory) {
List<Integer> ids = new ArrayList<>();
File[] files = directory.listFiles();
for (File f : files) {
Matcher m = FILE_PATTERN.matcher(f.getName());
if (m.matches()) {
int logId = Integer.parseUnsignedInt(m.group(1), 16);
ids.add(logId);
}
}
return ids;
}

private static List<Integer> compactedLogIdsInDirectory(File directory) {
List<Integer> ids = new ArrayList<>();
File[] files = directory.listFiles();
for (File f : files) {
Matcher m = COMPACTED_FILE_PATTERN.matcher(f.getName());
if (m.matches()) {
int logId = Integer.parseUnsignedInt(m.group(1), 16);
ids.add(logId);
}
}
return ids;
}
}
Loading

0 comments on commit 703a042

Please sign in to comment.