Skip to content

Commit

Permalink
Merge pull request #225 from aiven/ivanyu/segment-metadata-cache
Browse files Browse the repository at this point in the history
Add segment manifest cache
  • Loading branch information
jeqo committed May 17, 2023
2 parents 94cefb2 + b50c442 commit 061eda8
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 5 deletions.
1 change: 1 addition & 0 deletions commons/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ dependencies {
implementation "commons-io:commons-io:2.11.0"
// S3
implementation 'com.amazonaws:aws-java-sdk-s3:1.12.418'
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.4'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.commons;

import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

import io.aiven.kafka.tieredstorage.commons.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.commons.storage.FileFetcher;
import io.aiven.kafka.tieredstorage.commons.storage.StorageBackEndException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
 * Provides {@link SegmentManifest}s for remote log segments, keeping fetched manifests
 * in an asynchronous loading cache keyed by the manifest's object key.
 *
 * <p>On a cache miss the manifest is fetched through the {@link FileFetcher} and
 * deserialized with the supplied {@link ObjectMapper} on the supplied executor.
 */
class SegmentManifestProvider {
    /** Maximum time, in seconds, {@link #get} waits for a manifest to become available. */
    private static final long GET_TIMEOUT_SEC = 10;

    private final ObjectKey objectKey;
    private final AsyncLoadingCache<String, SegmentManifest> cache;

    /**
     * @param objectKey      builds the object key of the manifest for a given segment.
     * @param maxCacheSize   the max cache size (in items) or empty if the cache is unbounded.
     * @param cacheRetention the retention time of items in the cache or empty if infinite retention.
     * @param fileFetcher    fetches the manifest content by key.
     * @param mapper         deserializes the fetched content into a {@link SegmentManifest}.
     * @param executor       the executor handed to the cache builder.
     */
    SegmentManifestProvider(final ObjectKey objectKey,
                            final Optional<Long> maxCacheSize,
                            final Optional<Duration> cacheRetention,
                            final FileFetcher fileFetcher,
                            final ObjectMapper mapper,
                            final Executor executor) {
        this.objectKey = objectKey;
        final var cacheBuilder = Caffeine.newBuilder()
            .executor(executor);
        maxCacheSize.ifPresent(cacheBuilder::maximumSize);
        cacheRetention.ifPresent(cacheBuilder::expireAfterWrite);
        this.cache = cacheBuilder.buildAsync(new Loader(fileFetcher, mapper));
    }

    /**
     * Cache loader that fetches a manifest by key and parses it.
     */
    private static class Loader implements AsyncCacheLoader<String, SegmentManifest> {
        private final FileFetcher fileFetcher;
        private final ObjectMapper mapper;

        private Loader(final FileFetcher fileFetcher, final ObjectMapper mapper) {
            this.fileFetcher = fileFetcher;
            this.mapper = mapper;
        }

        @Override
        public CompletableFuture<? extends SegmentManifest> asyncLoad(
            final String key, final Executor executor) throws Exception {
            final Supplier<SegmentManifest> fetcher = () -> {
                try (final InputStream is = fileFetcher.fetch(key)) {
                    return mapper.readValue(is, SegmentManifest.class);
                } catch (final StorageBackEndException | IOException e) {
                    // A Supplier cannot throw checked exceptions: wrap them here,
                    // they are unwrapped again in get().
                    throw new CompletionException(e);
                }
            };
            return CompletableFuture.supplyAsync(fetcher, executor);
        }
    }

    /**
     * Returns the manifest of the given segment, loading it through the cache.
     *
     * @param remoteLogSegmentMetadata the segment whose manifest is requested.
     * @return the segment manifest.
     * @throws StorageBackEndException if loading failed with this exception.
     * @throws IOException             if loading failed with this exception.
     * @throws RuntimeException        if loading failed for any other reason, did not finish within
     *                                 {@link #GET_TIMEOUT_SEC} seconds, or the calling thread was
     *                                 interrupted (the interrupt flag is restored in that case).
     */
    SegmentManifest get(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
        throws StorageBackEndException, IOException {
        final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
        try {
            return cache.get(key).get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
        } catch (final ExecutionException e) {
            // Unwrap previously wrapped exceptions if possible.
            // instanceof is null-safe, so a missing cause falls through to the generic wrap below.
            final Throwable cause = e.getCause();
            if (cause instanceof StorageBackEndException) {
                throw (StorageBackEndException) cause;
            }
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new RuntimeException(e);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (final TimeoutException e) {
            throw new RuntimeException(e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
Expand Down Expand Up @@ -77,6 +79,8 @@ public class UniversalRemoteStorageManager implements RemoteStorageManager {
private final Metrics metrics;
private final Sensor segmentCopyPerSec;

private final Executor executor = new ForkJoinPool();

private UniversalRemoteStorageManagerConfig config;

private ObjectStorageFactory objectStorageFactory;
Expand All @@ -89,6 +93,8 @@ public class UniversalRemoteStorageManager implements RemoteStorageManager {
private ChunkManager chunkManager;
private ObjectKey objectKey;

private SegmentManifestProvider segmentManifestProvider;

UniversalRemoteStorageManager() {
this(Time.SYSTEM);
}
Expand Down Expand Up @@ -130,6 +136,14 @@ public void configure(final Map<String, ?> configs) {
compression = config.compressionEnabled();

mapper = getObjectMapper();

segmentManifestProvider = new SegmentManifestProvider(
objectKey,
config.segmentManifestCacheSize(),
config.segmentManifestCacheRetention(),
objectStorageFactory.fileFetcher(),
mapper,
executor);
}

private ObjectMapper getObjectMapper() {
Expand Down Expand Up @@ -227,14 +241,11 @@ public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegme
final int startPosition,
final int endPosition) throws RemoteStorageException {
try {
final InputStream manifest = objectStorageFactory.fileFetcher()
.fetch(objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST));

final SegmentManifestV1 segmentManifestV1 = mapper.readValue(manifest, SegmentManifestV1.class);
final SegmentManifest segmentManifest = segmentManifestProvider.get(remoteLogSegmentMetadata);
final FetchChunkEnumeration fetchChunkEnumeration = new FetchChunkEnumeration(
chunkManager,
remoteLogSegmentMetadata,
segmentManifestV1,
segmentManifest,
startPosition,
endPosition);
return new SequenceInputStream(fetchChunkEnumeration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.aiven.kafka.tieredstorage.commons;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -37,6 +39,20 @@ public class UniversalRemoteStorageManagerConfig extends AbstractConfig {
private static final String OBJECT_STORAGE_KEY_PREFIX_CONFIG = "key.prefix";
private static final String OBJECT_STORAGE_KEY_PREFIX_DOC = "The object storage path prefix";

// Common prefix shared by all segment manifest cache settings.
private static final String SEGMENT_MANIFEST_CACHE_PREFIX = "segment.manifest.cache.";
// Cache capacity setting, counted in items; -1 is documented as "unbounded".
private static final String SEGMENT_MANIFEST_CACHE_SIZE_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX + "size";
private static final Long SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT = 1000L; // TODO consider a better default
private static final String SEGMENT_MANIFEST_CACHE_SIZE_DOC =
"The size in items of the segment manifest cache. "
+ "Use -1 for \"unbounded\". The default is 1000.";

// Cache retention setting, in milliseconds; -1 is documented as "forever".
public static final String SEGMENT_MANIFEST_CACHE_RETENTION_MS_CONFIG = SEGMENT_MANIFEST_CACHE_PREFIX
+ "retention.ms";
public static final long SEGMENT_MANIFEST_CACHE_RETENTION_MS_DEFAULT = 3_600_000; // 1 hour
private static final String SEGMENT_MANIFEST_CACHE_RETENTION_MS_DOC =
"The retention time for the segment manifest cache. "
+ "Use -1 for \"forever\". The default is 3_600_000 (1 hour).";

private static final String CHUNK_SIZE_CONFIG = "chunk.size";
private static final String CHUNK_SIZE_DOC = "The chunk size of log files";

Expand Down Expand Up @@ -79,6 +95,23 @@ public class UniversalRemoteStorageManagerConfig extends AbstractConfig {
OBJECT_STORAGE_KEY_PREFIX_DOC
);

CONFIG.define(
SEGMENT_MANIFEST_CACHE_SIZE_CONFIG,
ConfigDef.Type.LONG,
SEGMENT_MANIFEST_CACHE_SIZE_DEFAULT,
ConfigDef.Range.atLeast(-1L),
ConfigDef.Importance.LOW,
SEGMENT_MANIFEST_CACHE_SIZE_DOC
);
CONFIG.define(
SEGMENT_MANIFEST_CACHE_RETENTION_MS_CONFIG,
ConfigDef.Type.LONG,
SEGMENT_MANIFEST_CACHE_RETENTION_MS_DEFAULT,
ConfigDef.Range.atLeast(-1L),
ConfigDef.Importance.LOW,
SEGMENT_MANIFEST_CACHE_RETENTION_MS_DOC
);

CONFIG.define(
CHUNK_SIZE_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -157,6 +190,22 @@ ObjectStorageFactory objectStorageFactory() {
return objectFactory;
}

/**
 * Reads the configured segment manifest cache size.
 *
 * @return the configured size in items, or empty when the configured value is -1.
 */
Optional<Long> segmentManifestCacheSize() {
    final long configuredSize = getLong(SEGMENT_MANIFEST_CACHE_SIZE_CONFIG);
    // -1 is the sentinel value that maps to "no size given".
    return configuredSize == -1
        ? Optional.empty()
        : Optional.of(configuredSize);
}

/**
 * Reads the configured segment manifest cache retention.
 *
 * @return the configured retention converted from milliseconds to a {@link Duration},
 *         or empty when the configured value is -1.
 */
Optional<Duration> segmentManifestCacheRetention() {
    final long retentionMs = getLong(SEGMENT_MANIFEST_CACHE_RETENTION_MS_CONFIG);
    // -1 is the sentinel value that maps to "no retention given".
    return retentionMs == -1
        ? Optional.empty()
        : Optional.of(Duration.ofMillis(retentionMs));
}

ChunkCache chunkCache() {
final Class<?> chunkCacheClass = getClass(CHUNK_CACHE_CONFIG);
if (chunkCacheClass != null) {
Expand Down
Loading

0 comments on commit 061eda8

Please sign in to comment.