diff --git a/build.gradle b/build.gradle index ec4c24c59a..77ae9f5731 100644 --- a/build.gradle +++ b/build.gradle @@ -1014,12 +1014,11 @@ project(':core') { implementation libs.commonsCli // implementation libs.elasticstream - implementation(libs.esClient) { - exclude group: 'org.slf4j', module: 'slf4j-api' - } implementation 'redis.clients:jedis:4.3.1' implementation libs.slf4jlog4j implementation libs.s3Client + implementation libs.commonLang + implementation libs.nettyAll implementation libs.zstd diff --git a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java index 2d67975210..14b36b1ad9 100644 --- a/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java +++ b/core/src/main/scala/kafka/log/es/AlwaysSuccessClient.java @@ -17,17 +17,22 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.Client; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.ElasticStreamClientException; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; -import com.automq.elasticstream.client.flatc.header.ErrorCode; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.Client; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.ElasticStreamClientException; +import kafka.log.es.api.ErrorCode; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.KVClient; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; +import org.apache.kafka.common.errors.es.SlowFetchHintException; +import org.apache.kafka.common.utils.ThreadUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.Map; import java.util.Set; @@ -39,10 +44,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import org.apache.kafka.common.errors.es.SlowFetchHintException; -import org.apache.kafka.common.utils.ThreadUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class AlwaysSuccessClient implements Client { @@ -50,23 +51,23 @@ public class AlwaysSuccessClient implements Client { public static final Set HALT_ERROR_CODES = Set.of(ErrorCode.EXPIRED_STREAM_EPOCH, ErrorCode.STREAM_ALREADY_CLOSED); public static final long DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS = 10; private final ScheduledExecutorService streamManagerRetryScheduler = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); + ThreadUtils.createThreadFactory("stream-manager-retry-%d", true)); private final ExecutorService streamManagerCallbackExecutors = Executors.newFixedThreadPool(1, - ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true)); + ThreadUtils.createThreadFactory("stream-manager-callback-executor-%d", true)); private final ScheduledExecutorService appendRetryScheduler = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("append-retry-scheduler-%d", true)); + ThreadUtils.createThreadFactory("append-retry-scheduler-%d", true)); private 
final ScheduledExecutorService fetchRetryScheduler = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true)); + ThreadUtils.createThreadFactory("fetch-retry-scheduler-%d", true)); private final ScheduledExecutorService generalRetryScheduler = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("general-retry-scheduler-%d", true)); + ThreadUtils.createThreadFactory("general-retry-scheduler-%d", true)); private final ExecutorService generalCallbackExecutors = Executors.newFixedThreadPool(4, - ThreadUtils.createThreadFactory("general-callback-scheduler-%d", true)); + ThreadUtils.createThreadFactory("general-callback-scheduler-%d", true)); private final ExecutorService appendCallbackExecutors = Executors.newFixedThreadPool(4, - ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true)); + ThreadUtils.createThreadFactory("append-callback-scheduler-%d", true)); private final ExecutorService fetchCallbackExecutors = Executors.newFixedThreadPool(4, - ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true)); + ThreadUtils.createThreadFactory("fetch-callback-scheduler-%d", true)); private final ScheduledExecutorService delayFetchScheduler = Executors.newScheduledThreadPool(1, - ThreadUtils.createThreadFactory("fetch-delayer-%d", true)); + ThreadUtils.createThreadFactory("fetch-delayer-%d", true)); private final StreamClient streamClient; private final KVClient kvClient; private final Delayer delayer; @@ -119,6 +120,7 @@ public void shutdownNow() { /** * Check if the exception is a ElasticStreamClientException with a halt error code. + * * @param t the exception * @return true if the exception is a ElasticStreamClientException with a halt error code, otherwise false */ @@ -235,7 +237,7 @@ private void append0(RecordBatch recordBatch, CompletableFuture cf stream.append(recordBatch).whenCompleteAsync((rst, ex) -> FutureUtil.suppress(() -> { if (ex != null) { if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { - LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex); + LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex); appendRetryScheduler.schedule(() -> append0(recordBatch, cf), 3, TimeUnit.SECONDS); } } else { @@ -247,6 +249,7 @@ private void append0(RecordBatch recordBatch, CompletableFuture cf /** * Append to stream without using async callback threadPools. * Used for tests only. 
+ * * @param recordBatch * @param cf */ @@ -254,7 +257,7 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture< stream.append(recordBatch).whenComplete((rst, ex) -> FutureUtil.suppress(() -> { if (ex != null) { if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { - LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex); + LOGGER.error("Appending to stream[{}] failed, retry later", streamId(), ex); appendRetryScheduler.schedule(() -> append0(recordBatch, cf), 3, TimeUnit.SECONDS); } } else { @@ -274,8 +277,8 @@ private void append0WithSyncCallback(RecordBatch recordBatch, CompletableFuture< * CompletableFuture with a {@link SlowFetchHintException} */ private CompletableFuture timeoutAndStoreFuture(String id, - CompletableFuture rawFuture, long timeout, - TimeUnit unit) { + CompletableFuture rawFuture, long timeout, + TimeUnit unit) { if (unit == null) { throw new NullPointerException(); } @@ -323,18 +326,18 @@ public CompletableFuture fetch(long startOffset, long endOffset, in fetch0(startOffset, endOffset, maxBytesHint, firstFetchFuture); // Try to have a quick fetch. If the first fetching is timeout, then complete with SlowFetchHintException. timeoutAndStoreFuture(holdUpKey, firstFetchFuture, slowFetchTimeoutMillis, TimeUnit.MILLISECONDS) - .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { - if (ex != null) { - if (ex instanceof SlowFetchHintException) { - LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); - cf.completeExceptionally(ex); - } else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { - cf.completeExceptionally(ex); + .whenComplete((rst, ex) -> FutureUtil.suppress(() -> { + if (ex != null) { + if (ex instanceof SlowFetchHintException) { + LOGGER.debug("Fetch stream[{}] [{},{}) timeout for {} ms, retry later with slow fetching", streamId(), startOffset, endOffset, DEFAULT_SLOW_FETCH_TIMEOUT_MILLIS); + cf.completeExceptionally(ex); + } else if (!maybeHaltAndCompleteWaitingFuture(ex, cf)) { + cf.completeExceptionally(ex); + } + } else { + cf.complete(rst); } - } else { - cf.complete(rst); - } - }, LOGGER)); + }, LOGGER)); } return cf; } @@ -426,7 +429,7 @@ public Delayer(ScheduledExecutorService delayFetchScheduler) { } public ScheduledFuture delay(Runnable command, long delay, - TimeUnit unit) { + TimeUnit unit) { return delayFetchScheduler.schedule(command, delay, unit); } } diff --git a/core/src/main/scala/kafka/log/es/DefaultAppendResult.java b/core/src/main/scala/kafka/log/es/DefaultAppendResult.java new file mode 100644 index 0000000000..6b52e93b0c --- /dev/null +++ b/core/src/main/scala/kafka/log/es/DefaultAppendResult.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es; + +import kafka.log.es.api.AppendResult; + +public class DefaultAppendResult implements AppendResult { + private final long baseOffset; + + public DefaultAppendResult(long baseOffset) { + this.baseOffset = baseOffset; + } + + @Override + public long baseOffset() { + return baseOffset; + } + + public String toString() { + return "AppendResult(baseOffset=" + baseOffset + ")"; + } +} + diff --git a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java index fc68426a03..18c0040749 100644 --- a/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/DefaultElasticStreamSlice.java @@ -17,11 +17,13 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; +import org.apache.kafka.common.errors.es.SlowFetchHintException; +import org.apache.kafka.common.utils.Utils; import java.io.IOException; import java.nio.ByteBuffer; @@ -30,8 +32,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import org.apache.kafka.common.errors.es.SlowFetchHintException; -import org.apache.kafka.common.utils.Utils; public class DefaultElasticStreamSlice implements ElasticStreamSlice { /** diff --git a/core/src/main/scala/kafka/log/es/DefaultRecordBatch.java b/core/src/main/scala/kafka/log/es/DefaultRecordBatch.java new file mode 100644 index 0000000000..3ef54c4b0d --- /dev/null +++ b/core/src/main/scala/kafka/log/es/DefaultRecordBatch.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.log.es; + +import kafka.log.es.api.RecordBatch; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +public class DefaultRecordBatch implements RecordBatch { + private final int count; + private final long baseTimestamp; + private final Map properties; + private final ByteBuffer rawPayload; + + public DefaultRecordBatch(int count, long baseTimestamp, Map properties, ByteBuffer rawPayload) { + this.count = count; + this.baseTimestamp = baseTimestamp; + this.properties = properties; + this.rawPayload = rawPayload; + } + + @Override + public int count() { + return count; + } + + @Override + public long baseTimestamp() { + return baseTimestamp; + } + + @Override + public Map properties() { + if (properties == null) { + return Collections.emptyMap(); + } + return properties; + } + + @Override + public ByteBuffer rawPayload() { + return rawPayload.duplicate(); + } +} diff --git a/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpoint.scala b/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpoint.scala index ae1984eb0c..e412f89c07 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpoint.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpoint.scala @@ -23,13 +23,13 @@ import kafka.server.epoch.EpochEntry import scala.jdk.CollectionConverters.{ListHasAsScala, SeqHasAsJava} -class ElasticLeaderEpochCheckpoint(val meta: ElasticLeaderEpochCheckpointMeta, val saveFunc: ElasticLeaderEpochCheckpointMeta => Unit) extends LeaderEpochCheckpoint{ - override def write(epochs: Iterable[EpochEntry]): Unit = this.synchronized { - meta.setEntries(epochs.toList.asJava) - saveFunc(meta) - } +class ElasticLeaderEpochCheckpoint(val meta: ElasticLeaderEpochCheckpointMeta, val saveFunc: ElasticLeaderEpochCheckpointMeta => Unit) extends LeaderEpochCheckpoint { + override def write(epochs: Iterable[EpochEntry]): Unit = this.synchronized { + meta.setEntries(epochs.toList.asJava) + saveFunc(meta) + } - override def read(): collection.Seq[EpochEntry] = this.synchronized { - meta.entries().asScala.toSeq - } + override def read(): collection.Seq[EpochEntry] = this.synchronized { + meta.entries().asScala.toSeq + } } diff --git a/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpointMeta.java b/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpointMeta.java index 7e0b572164..8377c1cb9b 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpointMeta.java +++ b/core/src/main/scala/kafka/log/es/ElasticLeaderEpochCheckpointMeta.java @@ -17,10 +17,11 @@ package kafka.log.es; +import kafka.server.epoch.EpochEntry; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import kafka.server.epoch.EpochEntry; public class ElasticLeaderEpochCheckpointMeta { private final int version; @@ -33,11 +34,11 @@ public ElasticLeaderEpochCheckpointMeta(int version, List entries) { public byte[] encode() { int totalLength = 4 // version - + 4 // following entries size - + 12 * entries.size(); // all entries + + 4 // following entries size + + 12 * entries.size(); // all entries ByteBuffer buffer = ByteBuffer.allocate(totalLength) - .putInt(version) - .putInt(entries.size()); + .putInt(version) + .putInt(entries.size()); entries.forEach(entry -> buffer.putInt(entry.epoch()).putLong(entry.startOffset())); buffer.flip(); return buffer.array(); @@ -51,7 +52,7 @@ public static ElasticLeaderEpochCheckpointMeta decode(ByteBuffer buffer) { entryList.add(new EpochEntry(buffer.getInt(), 
buffer.getLong())); } if (entryList.size() != entryCount) { - throw new RuntimeException("expect entry count:" + entryCount + ", decoded " + entryList.size() + " entries"); + throw new RuntimeException("expect entry count:" + entryCount + ", decoded " + entryList.size() + " entries"); } return new ElasticLeaderEpochCheckpointMeta(version, entryList); } diff --git a/core/src/main/scala/kafka/log/es/ElasticLog.scala b/core/src/main/scala/kafka/log/es/ElasticLog.scala index 18488ea3ea..0feca876e3 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLog.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLog.scala @@ -17,9 +17,9 @@ package kafka.log.es -import com.automq.elasticstream.client.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions} import io.netty.buffer.Unpooled import kafka.log._ +import kafka.log.es.api.{Client, CreateStreamOptions, KeyValue, OpenStreamOptions} import kafka.log.es.metrics.Timer import kafka.server.checkpoints.LeaderEpochCheckpointFile import kafka.server.epoch.EpochEntry @@ -306,7 +306,7 @@ class ElasticLog(val metaStream: MetaStream, * Directly close all streams of the log. This method may throw IOException. */ def closeStreams(): Unit = { - try{ + try { CompletableFuture.allOf(streamManager.close(), metaStream.close()).get() } catch { case e: ExecutionException => @@ -367,8 +367,8 @@ object ElasticLog extends Logging { val metaStreamId = Unpooled.wrappedBuffer(keyValue.value()).readLong() // open partition meta stream val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.newBuilder().epoch(leaderEpoch).build()) - .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent)) - .get() + .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent)) + .get() info(s"${logIdent}opened existing meta stream: stream_id=$metaStreamId") stream } @@ -387,7 +387,7 @@ object ElasticLog extends Logging { def loadAllValidSnapshots(): mutable.Map[Long, ElasticPartitionProducerSnapshotMeta] = { metaMap.filter(kv => kv._1.startsWith(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX)) - .map(kv => (kv._1.stripPrefix(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX).toLong, kv._2.asInstanceOf[ElasticPartitionProducerSnapshotMeta])) + .map(kv => (kv._1.stripPrefix(MetaStream.PRODUCER_SNAPSHOT_KEY_PREFIX).toLong, kv._2.asInstanceOf[ElasticPartitionProducerSnapshotMeta])) } //load producer snapshots for this partition @@ -474,14 +474,14 @@ object ElasticLog extends Logging { // We have to close streams here since this log has not been added to currentLogs yet. It will not be handled // by LogDirFailureChannel. CoreUtils.swallow({ - if (metaStream != null) { - metaStream.close().get - } - if (logStreamManager != null) { - logStreamManager.close().get() - } - client.kvClient().delKV(java.util.Arrays.asList(key)).get() - }, this) + if (metaStream != null) { + metaStream.close().get + } + if (logStreamManager != null) { + logStreamManager.close().get() + } + client.kvClient().delKV(java.util.Arrays.asList(key)).get() + }, this) error(s"${logIdent}failed to open elastic log, trying to close streams and delete key. Error msg: ${e.getMessage}") throw e } @@ -525,8 +525,8 @@ object ElasticLog extends Logging { // Finally, destroy meta stream. 
metaStream.destroy() } finally { - // remove kv info - client.kvClient().delKV(java.util.Arrays.asList(key)).get() + // remove kv info + client.kvClient().delKV(java.util.Arrays.asList(key)).get() } info(s"$logIdent Destroyed with epoch ${currentEpoch + 1}") @@ -534,19 +534,19 @@ object ElasticLog extends Logging { private def openStreamWithRetry(client: Client, streamId: Long, epoch: Long, logIdent: String): MetaStream = { client.streamClient() - .openStream(streamId, OpenStreamOptions.newBuilder().epoch(epoch).build()) - .exceptionally(_ => client.streamClient() - .openStream(streamId, OpenStreamOptions.newBuilder().build()).join() - ).thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent)) - .join() + .openStream(streamId, OpenStreamOptions.newBuilder().epoch(epoch).build()) + .exceptionally(_ => client.streamClient() + .openStream(streamId, OpenStreamOptions.newBuilder().build()).join() + ).thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent)) + .join() } private[es] def createMetaStream(client: Client, key: String, replicaCount: Int, leaderEpoch: Long, logIdent: String): MetaStream = { val metaStream = client.streamClient().createAndOpenStream(CreateStreamOptions.newBuilder() .replicaCount(replicaCount) .epoch(leaderEpoch).build() - ).thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent)) - .get() + ).thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent)) + .get() // save partition meta stream id relation to PM val streamId = metaStream.streamId() info(s"${logIdent}created meta stream for $key, streamId: $streamId") @@ -567,7 +567,7 @@ object ElasticLog extends Logging { * For the newly created cleaned segment, the meta should not be saved here. It will be saved iff the replacement happens. 
*/ private def createAndSaveSegment(logSegmentManager: ElasticLogSegmentManager, suffix: String = "", logIdent: String)(baseOffset: Long, dir: File, - config: LogConfig, streamSliceManager: ElasticStreamSliceManager, time: Time): ElasticLogSegment = { + config: LogConfig, streamSliceManager: ElasticStreamSliceManager, time: Time): ElasticLogSegment = { if (!suffix.equals("") && !suffix.equals(LocalLog.CleanedFileSuffix)) { throw new IllegalArgumentException("suffix must be empty or " + LocalLog.CleanedFileSuffix) } diff --git a/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java index 40221455eb..d5df5750ac 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogFileRecords.java @@ -17,10 +17,10 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.RecordBatchWithContext; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.RecordBatchWithContext; import org.apache.kafka.common.errors.es.SlowFetchHintException; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.AbstractRecords; @@ -37,6 +37,7 @@ import org.apache.kafka.common.record.RecordsUtil; import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +53,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.kafka.common.utils.Utils; - public class ElasticLogFileRecords { private static final Logger LOGGER = LoggerFactory.getLogger(ElasticLogFileRecords.class); protected final AtomicInteger size; @@ -135,7 +134,8 @@ private Records readAll0(long startOffset, long maxOffset, int maxSize) throws S * Append records to segment. * Note that lastOffset is the expected value of nextOffset after append. lastOffset = (the real last offset of the * records) + 1 - * @param records records to append + * + * @param records records to append * @param lastOffset expected next offset after append * @return the size of the appended records * @throws IOException @@ -150,7 +150,7 @@ public int append(MemoryRecords records, long lastOffset) throws IOException { int appendSize = records.sizeInBytes(); // Note that the calculation of count requires strong consistency between nextOffset and the baseOffset of records. int count = (int) (lastOffset - nextOffset.get()); - com.automq.elasticstream.client.DefaultRecordBatch batch = new com.automq.elasticstream.client.DefaultRecordBatch(count, 0, Collections.emptyMap(), records.buffer()); + kafka.log.es.DefaultRecordBatch batch = new kafka.log.es.DefaultRecordBatch(count, 0, Collections.emptyMap(), records.buffer()); CompletableFuture cf = streamSegment.append(batch); nextOffset.set(lastOffset); size.getAndAdd(appendSize); @@ -223,6 +223,7 @@ private Optional maybeLeaderEpoch(int leaderEpoch) { /** * Return the largest timestamp of the messages after a given offset + * * @param startOffset The starting offset. * @return The largest timestamp of the messages after the given position. 
*/ @@ -240,7 +241,7 @@ public FileRecords.TimestampAndOffset largestTimestampAfter(long startOffset) { } } return new FileRecords.TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp, - maybeLeaderEpoch(leaderEpochOfMaxTimestamp)); + maybeLeaderEpoch(leaderEpochOfMaxTimestamp)); } public ElasticStreamSlice streamSegment() { diff --git a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala index 6083580545..b567a54dd8 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogManager.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLogManager.scala @@ -17,11 +17,11 @@ package kafka.log.es -import com.automq.elasticstream.client.api.Client -import kafka.log.{LogConfig, ProducerStateManagerConfig} import kafka.log.es.ElasticLogManager.NAMESPACE +import kafka.log.es.api.Client import kafka.log.es.client.{ClientFactoryProxy, Context} import kafka.log.s3.DefaultS3Client +import kafka.log.{LogConfig, ProducerStateManagerConfig} import kafka.server.{BrokerServer, KafkaConfig, LogDirFailureChannel} import kafka.utils.{Logging, Scheduler} import org.apache.kafka.common.TopicPartition @@ -31,20 +31,20 @@ import java.io.File import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala -class ElasticLogManager(val client: Client) extends Logging{ +class ElasticLogManager(val client: Client) extends Logging { this.logIdent = s"[ElasticLogManager] " private val elasticLogs = new ConcurrentHashMap[TopicPartition, ElasticLog]() def getOrCreateLog(dir: File, - config: LogConfig, - scheduler: Scheduler, - time: Time, - topicPartition: TopicPartition, - logDirFailureChannel: LogDirFailureChannel, - numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], - maxTransactionTimeoutMs: Int, - producerStateManagerConfig: ProducerStateManagerConfig, - leaderEpoch: Long): ElasticLog = { + config: LogConfig, + scheduler: Scheduler, + time: Time, + topicPartition: TopicPartition, + logDirFailureChannel: LogDirFailureChannel, + numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], + maxTransactionTimeoutMs: Int, + producerStateManagerConfig: ProducerStateManagerConfig, + leaderEpoch: Long): ElasticLog = { elasticLogs.computeIfAbsent(topicPartition, _ => { var elasticLog: ElasticLog = null ExceptionUtil.maybeRecordThrowableAndRethrow(new Runnable { @@ -59,8 +59,9 @@ class ElasticLogManager(val client: Client) extends Logging{ /** * Delete elastic log by topic partition. Note that this method may not be called by the broker holding the partition. + * * @param topicPartition topic partition - * @param epoch epoch of the partition + * @param epoch epoch of the partition */ def destroyLog(topicPartition: TopicPartition, epoch: Long): Unit = { // Removal may have happened in partition's closure. This is a defensive work. @@ -76,6 +77,7 @@ class ElasticLogManager(val client: Client) extends Logging{ /** * Remove elastic log in the map. 
+ * * @param topicPartition topic partition */ def removeLog(topicPartition: TopicPartition): Unit = { @@ -114,6 +116,7 @@ object ElasticLogManager { var INSTANCE: Option[ElasticLogManager] = None var NAMESPACE = "" var DEFAULT_CLIENT: Option[DefaultS3Client] = None + def init(config: KafkaConfig, clusterId: String, broker: BrokerServer = null, appendWithAsyncCallbacks: Boolean = true): Boolean = { if (!config.elasticStreamEnabled) { return false @@ -136,7 +139,7 @@ object ElasticLogManager { context.config = config context.brokerServer = broker context.appendWithAsyncCallbacks = appendWithAsyncCallbacks - INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context))) + INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context))) val namespace = config.elasticStreamNamespace NAMESPACE = if (namespace == null || namespace.isEmpty) { @@ -163,15 +166,15 @@ object ElasticLogManager { // visible for testing def getOrCreateLog(dir: File, - config: LogConfig, - scheduler: Scheduler, - time: Time, - topicPartition: TopicPartition, - logDirFailureChannel: LogDirFailureChannel, - maxTransactionTimeoutMs: Int, - producerStateManagerConfig: ProducerStateManagerConfig, - numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], - leaderEpoch: Long): ElasticLog = { + config: LogConfig, + scheduler: Scheduler, + time: Time, + topicPartition: TopicPartition, + logDirFailureChannel: LogDirFailureChannel, + maxTransactionTimeoutMs: Int, + producerStateManagerConfig: ProducerStateManagerConfig, + numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int], + leaderEpoch: Long): ElasticLog = { INSTANCE.get.getOrCreateLog(dir, config, scheduler, time, topicPartition, logDirFailureChannel, numRemainingSegments, maxTransactionTimeoutMs, producerStateManagerConfig, leaderEpoch) } diff --git a/core/src/main/scala/kafka/log/es/ElasticLogMeta.java b/core/src/main/scala/kafka/log/es/ElasticLogMeta.java index 2cf995e5aa..e0fbf232c6 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogMeta.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogMeta.java @@ -19,13 +19,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; + import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.slf4j.Logger; /** * logical meta data for a Kafka topicPartition. 
@@ -87,8 +88,8 @@ public String toString() { int size = segmentMetas.size(); List lastNthSegmentMetas = segmentMetas.subList(Math.max(0, size - 5), size); return "ElasticLogMeta{" + - "streamMap=" + streamMap + - ", lastNthSegmentMetas=" + lastNthSegmentMetas + - '}'; + "streamMap=" + streamMap + + ", lastNthSegmentMetas=" + lastNthSegmentMetas + + '}'; } } diff --git a/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala b/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala index fbf86af117..e08e474d22 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala +++ b/core/src/main/scala/kafka/log/es/ElasticLogSegment.scala @@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture import scala.jdk.CollectionConverters._ - class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, val _log: ElasticLogFileRecords, val timeIdx: ElasticTimeIndex, @@ -117,6 +116,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, def asyncLogFlush(): CompletableFuture[Void] = { _log.asyncFlush() } + def appendFromFile(records: FileRecords, start: Int): Int = { throw new UnsupportedOperationException() } @@ -250,6 +250,7 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta, /** * get appended offset. It can be used to show whether the segment contains any valid data. + * * @return appended offset */ def appendedOffset: Long = _log.appendedOffset() diff --git a/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java b/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java index d187f23b1d..cff30fcddb 100644 --- a/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java +++ b/core/src/main/scala/kafka/log/es/ElasticLogStreamManager.java @@ -17,8 +17,9 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; + import java.io.IOException; import java.util.Collections; import java.util.Map; diff --git a/core/src/main/scala/kafka/log/es/ElasticPartitionMeta.java b/core/src/main/scala/kafka/log/es/ElasticPartitionMeta.java index 361917b0d9..a46274f52a 100644 --- a/core/src/main/scala/kafka/log/es/ElasticPartitionMeta.java +++ b/core/src/main/scala/kafka/log/es/ElasticPartitionMeta.java @@ -47,7 +47,8 @@ public class ElasticPartitionMeta { private boolean cleanedShutdown; @SuppressWarnings("unused") // used by jackson - public ElasticPartitionMeta() {} + public ElasticPartitionMeta() { + } public ElasticPartitionMeta(Long startOffset, Long cleanerOffset, Long recoverOffset) { this.startOffset = startOffset; diff --git a/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala b/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala index 06aca2b7e3..f91d21255e 100644 --- a/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/es/ElasticProducerStateManager.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,133 +28,134 @@ import scala.collection.mutable /** * ElasticProducerStateManager. Temporarily, we only persist the last snapshot. + * * @param snapshotsMap All valid snapshots. 
*/ class ElasticProducerStateManager( - override val topicPartition: TopicPartition, - var logDir: File, - override val maxTransactionTimeoutMs: Int, - override val producerStateManagerConfig: ProducerStateManagerConfig, - override val time: Time, - val snapshotsMap: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta], - val persistFun: ElasticPartitionProducerSnapshotMeta => Unit -) extends ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) { - - this.logIdent = s"[ElasticProducerStateManager partition=$topicPartition] " - - override protected def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = { - val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]() - snapshotsMap.foreach { case (offset, meta) => - tm.put(offset, SnapshotFile(new File(meta.fileName()))) - } - tm - } - - override protected def loadFromSnapshot(logStartOffset: Long, currentTime: Long): Unit = { - while (true) { - latestSnapshotFile match { - case Some(snapshot) => - try { - info(s"Loading producer state from snapshot file '$snapshot'") - val loadedProducers = readSnapshot(snapshot.file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) } - loadedProducers.foreach(loadProducerEntry) - lastSnapOffset = snapshot.offset - lastMapOffset = lastSnapOffset - updateOldestTxnTimestamp() - return - } catch { - case e: CorruptSnapshotException => - warn(s"Failed to load producer snapshot from '${snapshot.file}': ${e.getMessage}") - removeAndDeleteSnapshot(snapshot.offset) - } - case None => - lastSnapOffset = logStartOffset - lastMapOffset = logStartOffset - return - } - } - } - - override def takeSnapshot(): Unit = { - // If not a new offset, then it is not worth taking another snapshot - if (lastMapOffset > lastSnapOffset) { - val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset)) - val start = time.hiResClockMs() - writeSnapshot(snapshotFile.offset, producers) - info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.") - - snapshots.put(snapshotFile.offset, snapshotFile) - - // Update the last snap offset according to the serialized map - lastSnapOffset = lastMapOffset - } + override val topicPartition: TopicPartition, + var logDir: File, + override val maxTransactionTimeoutMs: Int, + override val producerStateManagerConfig: ProducerStateManagerConfig, + override val time: Time, + val snapshotsMap: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta], + val persistFun: ElasticPartitionProducerSnapshotMeta => Unit + ) extends ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, producerStateManagerConfig, time) { + + this.logIdent = s"[ElasticProducerStateManager partition=$topicPartition] " + + override protected def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = { + val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]() + snapshotsMap.foreach { case (offset, meta) => + tm.put(offset, SnapshotFile(new File(meta.fileName()))) } - - private def writeSnapshot(offset: Long, entries: mutable.Map[Long, ProducerStateEntry]): Unit = { - val buffer = ProducerStateManager.writeSnapshotToBuffer(entries) - val rawSnapshot: Array[Byte] = new Array[Byte](buffer.remaining()) - buffer.get(rawSnapshot) - - val meta = new ElasticPartitionProducerSnapshotMeta(offset, rawSnapshot) - snapshotsMap.put(offset, meta) - persistFun(meta) + tm + } + + override protected def 
loadFromSnapshot(logStartOffset: Long, currentTime: Long): Unit = { + while (true) { + latestSnapshotFile match { + case Some(snapshot) => + try { + info(s"Loading producer state from snapshot file '$snapshot'") + val loadedProducers = readSnapshot(snapshot.file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) } + loadedProducers.foreach(loadProducerEntry) + lastSnapOffset = snapshot.offset + lastMapOffset = lastSnapOffset + updateOldestTxnTimestamp() + return + } catch { + case e: CorruptSnapshotException => + warn(s"Failed to load producer snapshot from '${snapshot.file}': ${e.getMessage}") + removeAndDeleteSnapshot(snapshot.offset) + } + case None => + lastSnapOffset = logStartOffset + lastMapOffset = logStartOffset + return + } } + } - private def readSnapshot(file: File): Iterable[ProducerStateEntry] = { - val offset = LocalLog.offsetFromFile(file) - if (!snapshotsMap.contains(offset)) { - throw new CorruptSnapshotException(s"Snapshot not found") - } - - try { - ProducerStateManager.readSnapshotFromBuffer(snapshotsMap(offset).getRawSnapshotData) - } catch { - case e: SchemaException => - throw new CorruptSnapshotException(s"Snapshot failed schema validation: ${e.getMessage}") - } - } + override def takeSnapshot(): Unit = { + // If not a new offset, then it is not worth taking another snapshot + if (lastMapOffset > lastSnapOffset) { + val snapshotFile = SnapshotFile(UnifiedLog.producerSnapshotFile(_logDir, lastMapOffset)) + val start = time.hiResClockMs() + writeSnapshot(snapshotFile.offset, producers) + info(s"Wrote producer snapshot at offset $lastMapOffset with ${producers.size} producer ids in ${time.hiResClockMs() - start} ms.") - // do nothing - override def updateParentDir(parentDir: File): Unit = {} + snapshots.put(snapshotFile.offset, snapshotFile) - override protected def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = { - deleteSnapshot(snapshotOffset) + // Update the last snap offset according to the serialized map + lastSnapOffset = lastMapOffset } - - override private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = { - deleteSnapshot(snapshotOffset) - None + } + + private def writeSnapshot(offset: Long, entries: mutable.Map[Long, ProducerStateEntry]): Unit = { + val buffer = ProducerStateManager.writeSnapshotToBuffer(entries) + val rawSnapshot: Array[Byte] = new Array[Byte](buffer.remaining()) + buffer.get(rawSnapshot) + + val meta = new ElasticPartitionProducerSnapshotMeta(offset, rawSnapshot) + snapshotsMap.put(offset, meta) + persistFun(meta) + } + + private def readSnapshot(file: File): Iterable[ProducerStateEntry] = { + val offset = LocalLog.offsetFromFile(file) + if (!snapshotsMap.contains(offset)) { + throw new CorruptSnapshotException(s"Snapshot not found") } - private def deleteSnapshot(snapshotOffset: Long): Unit = { - snapshots.remove(snapshotOffset) - snapshotsMap.remove(snapshotOffset).foreach( snapshot => { - snapshot.setRawSnapshotData(null) - persistFun(snapshot) - info(s"Deleted producer snapshot file '$snapshotOffset' for partition $topicPartition") - }) + try { + ProducerStateManager.readSnapshotFromBuffer(snapshotsMap(offset).getRawSnapshotData) + } catch { + case e: SchemaException => + throw new CorruptSnapshotException(s"Snapshot failed schema validation: ${e.getMessage}") } + } + + // do nothing + override def updateParentDir(parentDir: File): Unit = {} + + override protected def removeAndDeleteSnapshot(snapshotOffset: Long): Unit = { + deleteSnapshot(snapshotOffset) + } + + 
override private[log] def removeAndMarkSnapshotForDeletion(snapshotOffset: Long): Option[SnapshotFile] = { + deleteSnapshot(snapshotOffset) + None + } + + private def deleteSnapshot(snapshotOffset: Long): Unit = { + snapshots.remove(snapshotOffset) + snapshotsMap.remove(snapshotOffset).foreach(snapshot => { + snapshot.setRawSnapshotData(null) + persistFun(snapshot) + info(s"Deleted producer snapshot file '$snapshotOffset' for partition $topicPartition") + }) + } } object ElasticProducerStateManager { - def apply( - topicPartition: TopicPartition, - logDir: File, - maxTransactionTimeoutMs: Int, - producerStateManagerConfig: ProducerStateManagerConfig, - time: Time, - snapshotMap: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta], - persistFun: ElasticPartitionProducerSnapshotMeta => Unit - ): ElasticProducerStateManager = { - val stateManager = new ElasticProducerStateManager( - topicPartition, - logDir, - maxTransactionTimeoutMs, - producerStateManagerConfig, - time, - snapshotMap, - persistFun - ) - stateManager - } + def apply( + topicPartition: TopicPartition, + logDir: File, + maxTransactionTimeoutMs: Int, + producerStateManagerConfig: ProducerStateManagerConfig, + time: Time, + snapshotMap: mutable.Map[Long, ElasticPartitionProducerSnapshotMeta], + persistFun: ElasticPartitionProducerSnapshotMeta => Unit + ): ElasticProducerStateManager = { + val stateManager = new ElasticProducerStateManager( + topicPartition, + logDir, + maxTransactionTimeoutMs, + producerStateManagerConfig, + time, + snapshotMap, + persistFun + ) + stateManager + } } diff --git a/core/src/main/scala/kafka/log/es/ElasticRedisClient.java b/core/src/main/scala/kafka/log/es/ElasticRedisClient.java index 3791843941..2619d561a9 100644 --- a/core/src/main/scala/kafka/log/es/ElasticRedisClient.java +++ b/core/src/main/scala/kafka/log/es/ElasticRedisClient.java @@ -17,20 +17,20 @@ package kafka.log.es; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.Client; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.KVClient; +import kafka.log.es.api.KeyValue; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPooled; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.Client; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.KeyValue; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; diff --git a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java index 661c9c96a9..cfdd43a2d8 100644 --- a/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java +++ b/core/src/main/scala/kafka/log/es/ElasticStreamSlice.java @@ -17,14 +17,14 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import 
com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.Stream; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.Stream; +import org.apache.kafka.common.errors.es.SlowFetchHintException; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import org.apache.kafka.common.errors.es.SlowFetchHintException; /** * Elastic stream slice is a slice from elastic stream, the startOffset of a slice is 0. @@ -68,6 +68,7 @@ default FetchResult fetch(long startOffset, long endOffset) throws SlowFetchHint /** * Get slice range which is the relative offset range in stream. + * * @return {@link SliceRange} */ SliceRange sliceRange(); diff --git a/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java b/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java index 5c3449d398..8fba262b46 100644 --- a/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java +++ b/core/src/main/scala/kafka/log/es/ElasticStreamSliceManager.java @@ -17,13 +17,13 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.Stream; +import kafka.log.es.api.Stream; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static com.automq.elasticstream.client.utils.Arguments.check; +import static kafka.log.es.utils.Arguments.check; /** * Elastic log dimension stream segment manager. diff --git a/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala index 9404c1a5e0..75d8710339 100644 --- a/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala +++ b/core/src/main/scala/kafka/log/es/ElasticUnifiedLog.scala @@ -41,9 +41,11 @@ class ElasticUnifiedLog(_logStartOffset: Long, var confirmOffsetChangeListener: Option[() => Unit] = None elasticLog.confirmOffsetChangeListener = Some(() => confirmOffsetChangeListener.map(_.apply())) + def confirmOffset(): LogOffsetMetadata = { elasticLog.confirmOffset.get() } + override private[log] def replaceSegments(newSegments: collection.Seq[LogSegment], oldSegments: collection.Seq[LogSegment]): Unit = { val deletedSegments = elasticLog.replaceSegments(newSegments, oldSegments) deleteProducerSnapshots(deletedSegments, asyncDelete = true) @@ -75,7 +77,7 @@ class ElasticUnifiedLog(_logStartOffset: Long, // only for testing private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment], - asyncDelete: Boolean): Unit = { + asyncDelete: Boolean): Unit = { elasticLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete, LogDeletion(elasticLog)) } @@ -169,8 +171,8 @@ object ElasticUnifiedLog extends Logging { * @return The new LeaderEpochFileCache instance (if created), none otherwise */ private[log] def maybeCreateLeaderEpochCache(topicPartition: TopicPartition, - recordVersion: RecordVersion, - leaderEpochCheckpoint: ElasticLeaderEpochCheckpoint): Option[LeaderEpochFileCache] = { + recordVersion: RecordVersion, + leaderEpochCheckpoint: ElasticLeaderEpochCheckpoint): Option[LeaderEpochFileCache] = { def newLeaderEpochFileCache(): LeaderEpochFileCache = new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint) diff --git a/core/src/main/scala/kafka/log/es/LazyStream.java b/core/src/main/scala/kafka/log/es/LazyStream.java index 4b7e98448c..12d538afe6 100644 --- a/core/src/main/scala/kafka/log/es/LazyStream.java +++ 
b/core/src/main/scala/kafka/log/es/LazyStream.java @@ -17,21 +17,21 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Lazy stream, create stream when append record. diff --git a/core/src/main/scala/kafka/log/es/MemoryClient.java b/core/src/main/scala/kafka/log/es/MemoryClient.java index 50c8272c3d..117a7fae05 100644 --- a/core/src/main/scala/kafka/log/es/MemoryClient.java +++ b/core/src/main/scala/kafka/log/es/MemoryClient.java @@ -17,17 +17,17 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.Client; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.KeyValue; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.Client; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.KVClient; +import kafka.log.es.api.KeyValue; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; import java.nio.ByteBuffer; import java.util.ArrayList; diff --git a/core/src/main/scala/kafka/log/es/MetaStream.java b/core/src/main/scala/kafka/log/es/MetaStream.java index feae520d00..a31eced797 100644 --- a/core/src/main/scala/kafka/log/es/MetaStream.java +++ b/core/src/main/scala/kafka/log/es/MetaStream.java @@ -17,11 +17,16 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; +import io.netty.buffer.Unpooled; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.ByteBuffer; 
import java.util.Collections; @@ -36,11 +41,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import io.netty.buffer.Unpooled; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Meta stream is a wrapper of stream, it is used to record basic info of a topicPartition. * It serves as a kv stream. @@ -125,6 +125,7 @@ public AppendResult appendSync(MetaKeyValue kv) throws IOException { /** * Append a batch of meta key values without trims. + * * @return a future of append result */ private CompletableFuture append0(MetaKeyValue kv) { @@ -147,7 +148,7 @@ public CompletableFuture close() { trimFuture.cancel(true); } return innerStream.close() - .thenAccept(result -> fenced = true); + .thenAccept(result -> fenced = true); } public boolean isFenced() { @@ -164,6 +165,7 @@ public CompletableFuture destroy() { /** * Replay meta stream and return a map of meta keyValues. KeyValues will be cached in metaCache. + * * @return meta keyValues map */ public Map replay() throws IOException { diff --git a/core/src/main/scala/kafka/log/es/RawPayloadRecordBatch.java b/core/src/main/scala/kafka/log/es/RawPayloadRecordBatch.java index 1d30262b32..969163dcb7 100644 --- a/core/src/main/scala/kafka/log/es/RawPayloadRecordBatch.java +++ b/core/src/main/scala/kafka/log/es/RawPayloadRecordBatch.java @@ -17,7 +17,7 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.RecordBatch; +import kafka.log.es.api.RecordBatch; import java.nio.ByteBuffer; import java.util.Collections; diff --git a/core/src/main/scala/kafka/log/es/RecordBatchWithContextWrapper.java b/core/src/main/scala/kafka/log/es/RecordBatchWithContextWrapper.java index db1627889d..51ff883ac5 100644 --- a/core/src/main/scala/kafka/log/es/RecordBatchWithContextWrapper.java +++ b/core/src/main/scala/kafka/log/es/RecordBatchWithContextWrapper.java @@ -17,9 +17,8 @@ package kafka.log.es; -import com.automq.elasticstream.client.DefaultRecordBatch; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; import java.nio.ByteBuffer; import java.util.Collections; @@ -66,10 +65,10 @@ public ByteBuffer rawPayload() { public byte[] encode() { ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + recordBatch.rawPayload().remaining()) - .putLong(baseOffset) - .putInt(recordBatch.count()) - .put(recordBatch.rawPayload().duplicate()) - .flip(); + .putLong(baseOffset) + .putInt(recordBatch.count()) + .put(recordBatch.rawPayload().duplicate()) + .flip(); return buffer.array(); } diff --git a/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java b/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java index a233d67deb..e79e68c0ae 100644 --- a/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java +++ b/core/src/main/scala/kafka/log/es/SeparateSlowAndQuickFetchHint.java @@ -18,6 +18,7 @@ package kafka.log.es; import io.netty.util.concurrent.FastThreadLocal; + import java.util.concurrent.atomic.AtomicBoolean; /** diff --git a/core/src/main/scala/kafka/log/es/SliceRange.java b/core/src/main/scala/kafka/log/es/SliceRange.java index d497d632eb..1d28e6e2e7 100644 --- a/core/src/main/scala/kafka/log/es/SliceRange.java +++ b/core/src/main/scala/kafka/log/es/SliceRange.java @@ -25,7 +25,8 @@ public class SliceRange { @JsonProperty("e") private long end = Offsets.NOOP_OFFSET; - public 
SliceRange() {} + public SliceRange() { + } public static SliceRange of(long start, long end) { SliceRange sliceRange = new SliceRange(); diff --git a/core/src/main/scala/kafka/log/es/StreamSliceSupplier.java b/core/src/main/scala/kafka/log/es/StreamSliceSupplier.java index f89ebd108d..27cd82ad0c 100644 --- a/core/src/main/scala/kafka/log/es/StreamSliceSupplier.java +++ b/core/src/main/scala/kafka/log/es/StreamSliceSupplier.java @@ -36,6 +36,7 @@ public ElasticStreamSlice get() throws IOException { /** * reset the slice to an open empty slice. This is used in segment index recovery. + * * @return a new open empty slice */ public ElasticStreamSlice reset() throws IOException { diff --git a/core/src/main/scala/kafka/log/es/api/AppendResult.java b/core/src/main/scala/kafka/log/es/api/AppendResult.java new file mode 100644 index 0000000000..92fd6ea783 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/AppendResult.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +/** + * Append RecordBatch to stream result. + */ +public interface AppendResult { + + /** + * Get record batch base offset. + * + * @return record batch base offset. + */ + long baseOffset(); + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/es/api/Client.java b/core/src/main/scala/kafka/log/es/api/Client.java new file mode 100644 index 0000000000..04fb6d5042 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/Client.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +/** + * Elastic Stream client. + */ +public interface Client { + /** + * Get stream client. + * + * @return {@link StreamClient} + */ + StreamClient streamClient(); + + /** + * Get KV client. 
+ * + * @return {@link KVClient} + */ + KVClient kvClient(); +} diff --git a/core/src/main/scala/kafka/log/es/api/CreateStreamOptions.java b/core/src/main/scala/kafka/log/es/api/CreateStreamOptions.java new file mode 100644 index 0000000000..471b5904e2 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/CreateStreamOptions.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import kafka.log.es.utils.Arguments; + +public class CreateStreamOptions { + private int replicaCount; + private long epoch; + + public static Builder newBuilder() { + return new Builder(); + } + + public int replicaCount() { + return replicaCount; + } + + public long epoch() { + return epoch; + } + + public static class Builder { + private final CreateStreamOptions options = new CreateStreamOptions(); + + public Builder replicaCount(int replicaCount) { + Arguments.check(replicaCount > 0, "replica count should larger than 0"); + options.replicaCount = replicaCount; + return this; + } + + public Builder epoch(long epoch) { + options.epoch = epoch; + return this; + } + + public CreateStreamOptions build() { + return options; + } + + } +} diff --git a/core/src/main/scala/kafka/log/es/api/ElasticStreamClientException.java b/core/src/main/scala/kafka/log/es/api/ElasticStreamClientException.java new file mode 100644 index 0000000000..c018f5608e --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/ElasticStreamClientException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import java.util.concurrent.ExecutionException; + +/** + * All stream client exceptions will list extends ElasticStreamClientException and list here. 
+ */ +public class ElasticStreamClientException extends ExecutionException { + private final int code; + + public ElasticStreamClientException(int code, String str) { + super("code: " + code + ", " + str); + this.code = code; + } + + public int getCode() { + return this.code; + } +} diff --git a/core/src/main/scala/kafka/log/es/api/ErrorCode.java b/core/src/main/scala/kafka/log/es/api/ErrorCode.java new file mode 100644 index 0000000000..882f9b0ce5 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/ErrorCode.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +public class ErrorCode { + public static final short OFFSET_OUT_OF_RANGE_BOUNDS = 1463; + public static final short STREAM_ALREADY_CLOSED = 1478; + public static final short EXPIRED_STREAM_EPOCH = 1489; +} diff --git a/core/src/main/scala/kafka/log/es/api/FetchResult.java b/core/src/main/scala/kafka/log/es/api/FetchResult.java new file mode 100644 index 0000000000..eaeddecc5f --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/FetchResult.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import java.util.List; + +public interface FetchResult { + + /** + * Get fetched RecordBatch list. + * + * @return {@link RecordBatchWithContext} list. + */ + List recordBatchList(); + + /** + * Free fetch result backend memory. + */ + default void free() { + } +} diff --git a/core/src/main/scala/kafka/log/es/api/KVClient.java b/core/src/main/scala/kafka/log/es/api/KVClient.java new file mode 100644 index 0000000000..a597704c9d --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/KVClient.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Light KV client, support light & simple kv operations. + */ +public interface KVClient { + /** + * Put key value. + * + * @param keyValues {@link KeyValue} list. + * @return async put result. + */ + CompletableFuture putKV(List keyValues); + + /** + * Get value by key. + * + * @param keys key list. + * @return {@link KeyValue} list. + */ + CompletableFuture> getKV(List keys); + + /** + * Delete key value by key. + * + * @param keys key list. + * @return async delete result. + */ + CompletableFuture delKV(List keys); +} diff --git a/core/src/main/scala/kafka/log/es/api/KeyValue.java b/core/src/main/scala/kafka/log/es/api/KeyValue.java new file mode 100644 index 0000000000..62ad498774 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/KeyValue.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + + +import java.nio.ByteBuffer; +import java.util.Objects; + +public class KeyValue { + private final String key; + private final ByteBuffer value; + + private KeyValue(String key, ByteBuffer value) { + this.key = key; + this.value = value; + } + + public static KeyValue of(String key, ByteBuffer value) { + return new KeyValue(key, value); + } + + public String key() { + return key; + } + + public ByteBuffer value() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + KeyValue keyValue = (KeyValue) o; + return Objects.equals(key, keyValue.key) && Objects.equals(value, keyValue.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, value); + } +} diff --git a/core/src/main/scala/kafka/log/es/api/OpenStreamOptions.java b/core/src/main/scala/kafka/log/es/api/OpenStreamOptions.java new file mode 100644 index 0000000000..22d09e38e0 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/OpenStreamOptions.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
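A minimal usage sketch of the new kafka.log.es.api KV types shown above (not part of the patch). The generic signatures — CompletableFuture<Void> putKV(List<KeyValue>), CompletableFuture<List<KeyValue>> getKV(List<String>), CompletableFuture<Void> delKV(List<String>) — are my reading of the new interfaces, and the kvClient value is assumed to come from Client#kvClient() (for example the controller-backed implementation changed later in this diff):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import kafka.log.es.api.KVClient;
    import kafka.log.es.api.KeyValue;

    public class KvClientSketch {
        // kvClient is assumed to be obtained from Client#kvClient().
        static void roundTrip(KVClient kvClient) throws Exception {
            ByteBuffer value = ByteBuffer.wrap("segment-meta".getBytes(StandardCharsets.UTF_8));
            // Put a key/value pair; the future completes once the KV store acknowledges it.
            kvClient.putKV(List.of(KeyValue.of("topic-0/partition-0", value))).get();
            // Read it back by key; the controller-backed client maps a missing key to a KeyValue with a null value.
            List<KeyValue> fetched = kvClient.getKV(List.of("topic-0/partition-0")).get();
            System.out.println("value bytes: " + fetched.get(0).value().remaining());
            // Delete the key once it is no longer needed.
            kvClient.delKV(List.of("topic-0/partition-0")).get();
        }
    }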
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import kafka.log.es.utils.Arguments; + +public class OpenStreamOptions { + private WriteMode writeMode = WriteMode.SINGLE; + private ReadMode readMode = ReadMode.MULTIPLE; + private long epoch; + + public static Builder newBuilder() { + return new Builder(); + } + + public WriteMode writeMode() { + return writeMode; + } + + public ReadMode readMode() { + return readMode; + } + + public long epoch() { + return epoch; + } + + public enum WriteMode { + SINGLE(0), MULTIPLE(1); + + final int code; + + WriteMode(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + } + + public enum ReadMode { + SINGLE(0), MULTIPLE(1); + + final int code; + + ReadMode(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + } + + public static class Builder { + private final OpenStreamOptions options = new OpenStreamOptions(); + + public Builder writeMode(WriteMode writeMode) { + Arguments.isNotNull(writeMode, "WriteMode should be set with SINGLE or MULTIPLE"); + options.writeMode = writeMode; + return this; + } + + public Builder readMode(ReadMode readMode) { + Arguments.isNotNull(readMode, "ReadMode should be set with SINGLE or MULTIPLE"); + options.readMode = readMode; + return this; + } + + public Builder epoch(long epoch) { + options.epoch = epoch; + return this; + } + + public OpenStreamOptions build() { + return options; + } + } +} diff --git a/core/src/main/scala/kafka/log/es/api/RecordBatch.java b/core/src/main/scala/kafka/log/es/api/RecordBatch.java new file mode 100644 index 0000000000..2c4019a385 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/RecordBatch.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * Record batch. + */ +public interface RecordBatch { + + /** + * Get payload record count. + * + * @return record count. + */ + int count(); + + /** + * Get min timestamp of records. + * + * @return min timestamp of records. 
+ */ + long baseTimestamp(); + + /** + * Get record batch extension properties. + * + * @return batch extension properties. + */ + Map properties(); + + /** + * Get raw payload. + * + * @return raw payload. + */ + ByteBuffer rawPayload(); +} diff --git a/core/src/main/scala/kafka/log/es/api/RecordBatchWithContext.java b/core/src/main/scala/kafka/log/es/api/RecordBatchWithContext.java new file mode 100644 index 0000000000..9735b0664a --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/RecordBatchWithContext.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +public interface RecordBatchWithContext extends RecordBatch { + + /** + * Get record batch base offset. + * + * @return base offset. + */ + long baseOffset(); + + /** + * Get record batch exclusive last offset. + * + * @return exclusive last offset. + */ + long lastOffset(); +} diff --git a/core/src/main/scala/kafka/log/es/api/Stream.java b/core/src/main/scala/kafka/log/es/api/Stream.java new file mode 100644 index 0000000000..bc1b5c59e1 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/Stream.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import java.util.concurrent.CompletableFuture; + +/** + * Record stream. + */ +public interface Stream { + + /** + * Get stream id + */ + long streamId(); + + /** + * Get stream start offset. + */ + long startOffset(); + + /** + * Get stream next append record offset. + */ + long nextOffset(); + + + /** + * Append recordBatch to stream. + * + * @param recordBatch {@link RecordBatch}. + * @return - complete success with async {@link AppendResult}, when append success. + * - complete exception with {@link ElasticStreamClientException}, when append fail. TODO: specify the exception. + */ + CompletableFuture append(RecordBatch recordBatch); + + /** + * Fetch recordBatch list from stream. Note the startOffset may be in the middle in the first recordBatch. 
+ * + * @param startOffset start offset, if the startOffset in middle of a recordBatch, the recordBatch will be returned. + * @param endOffset exclusive end offset, if the endOffset in middle of a recordBatch, the recordBatch will be returned. + * @param maxBytesHint max fetch data size hint, the real return data size may be larger than maxBytesHint. + * @return - complete success with {@link FetchResult}, when fetch success. + * - complete exception with {@link ElasticStreamClientException}, when startOffset is bigger than stream end offset. + */ + CompletableFuture fetch(long startOffset, long endOffset, int maxBytesHint); + + /** + * Trim stream. + * + * @param newStartOffset new start offset. + * @return - complete success with async {@link Void}, when trim success. + * - complete exception with {@link ElasticStreamClientException}, when trim fail. + */ + CompletableFuture trim(long newStartOffset); + + /** + * Close the stream. + */ + CompletableFuture close(); + + /** + * Destroy stream. + */ + CompletableFuture destroy(); +} diff --git a/core/src/main/scala/kafka/log/es/api/StreamClient.java b/core/src/main/scala/kafka/log/es/api/StreamClient.java new file mode 100644 index 0000000000..e7c94b34d5 --- /dev/null +++ b/core/src/main/scala/kafka/log/es/api/StreamClient.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.api; + +import java.util.concurrent.CompletableFuture; + +/** + * Stream client, support stream create and open operation. + */ +public interface StreamClient { + /** + * Create and open stream. + * + * @param options create stream options. + * @return {@link Stream}. + */ + CompletableFuture createAndOpenStream(CreateStreamOptions options); + + /** + * Open stream. + * + * @param streamId stream id. + * @param options open stream options. + * @return {@link Stream}. 
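A minimal end-to-end sketch of the relocated stream API (not part of the patch), modeled on the AlwaysSuccessClientTest changes later in this diff. Generic parameters such as CompletableFuture<Stream> and List<RecordBatchWithContext> are my reading of the new interfaces; AlwaysSuccessClient, MemoryClient and RawPayloadRecordBatch are existing classes in this module, and the blocking get() calls are for illustration only:

    import java.nio.ByteBuffer;
    import java.util.List;
    import kafka.log.es.AlwaysSuccessClient;
    import kafka.log.es.MemoryClient;
    import kafka.log.es.RawPayloadRecordBatch;
    import kafka.log.es.api.AppendResult;
    import kafka.log.es.api.Client;
    import kafka.log.es.api.CreateStreamOptions;
    import kafka.log.es.api.FetchResult;
    import kafka.log.es.api.RecordBatchWithContext;
    import kafka.log.es.api.Stream;

    public class StreamApiSketch {
        public static void main(String[] args) throws Exception {
            // In-memory client wrapped by AlwaysSuccessClient, as the tests in this patch do.
            Client client = new AlwaysSuccessClient(new MemoryClient());

            // Create and open a stream with a single replica at epoch 0.
            Stream stream = client.streamClient()
                    .createAndOpenStream(CreateStreamOptions.newBuilder().replicaCount(1).epoch(0).build())
                    .get();

            // Append one raw-payload record batch and note its assigned base offset.
            AppendResult appendResult = stream
                    .append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes())))
                    .get();

            // Fetch everything appended so far; endOffset is exclusive and maxBytesHint is only a hint.
            FetchResult fetchResult = stream
                    .fetch(appendResult.baseOffset(), stream.nextOffset(), Integer.MAX_VALUE)
                    .get();
            List<RecordBatchWithContext> batches = fetchResult.recordBatchList();
            System.out.println("fetched " + batches.size() + " batch(es)");
            fetchResult.free(); // release any backing memory held by the fetch result

            stream.close().get();
        }
    }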
+ */ + CompletableFuture openStream(long streamId, OpenStreamOptions options); +} diff --git a/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java b/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java index 0da3758894..7b9b07350c 100644 --- a/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java +++ b/core/src/main/scala/kafka/log/es/client/ClientFactoryProxy.java @@ -17,18 +17,18 @@ package kafka.log.es.client; -import com.automq.elasticstream.client.api.Client; +import kafka.log.es.api.Client; import java.lang.reflect.Method; public class ClientFactoryProxy { - private static final String PROTOCOL_SEPERATOR = ":"; + private static final String PROTOCOL_SEPARATOR = ":"; private static final String FACTORY_CLASS_FORMAT = "kafka.log.es.client.%s.ClientFactory"; public static Client get(Context context) { String endpoint = context.config.elasticStreamEndpoint(); - String protocal = endpoint.split(PROTOCOL_SEPERATOR)[0]; - String className = String.format(FACTORY_CLASS_FORMAT, protocal); + String protocol = endpoint.split(PROTOCOL_SEPARATOR)[0]; + String className = String.format(FACTORY_CLASS_FORMAT, protocol); try { Class clazz = Class.forName(className); Method method = clazz.getDeclaredMethod("get", Context.class); diff --git a/core/src/main/scala/kafka/log/es/client/es/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/es/ClientFactory.java deleted file mode 100644 index c9e242f7b0..0000000000 --- a/core/src/main/scala/kafka/log/es/client/es/ClientFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.log.es.client.es; - -import com.automq.elasticstream.client.DefaultClientBuilder; -import com.automq.elasticstream.client.api.Client; -import kafka.log.es.AlwaysSuccessClient; -import kafka.log.es.client.Context; - -public class ClientFactory { - private static final String ES_ENDPOINT_PREFIX = "es://"; - - public static Client get(Context context) { - String endpoint = context.config.elasticStreamEndpoint(); - String kvEndpoint = context.config.elasticStreamKvEndpoint(); - if (!kvEndpoint.startsWith(ES_ENDPOINT_PREFIX)) { - throw new IllegalArgumentException("Elastic stream endpoint and kvEndpoint must be the same protocol: " + endpoint + " " + kvEndpoint); - } - Client client = new DefaultClientBuilder() - .endpoint(endpoint.substring(ES_ENDPOINT_PREFIX.length())) - .kvEndpoint(kvEndpoint.substring(ES_ENDPOINT_PREFIX.length())) - .build(); - return new AlwaysSuccessClient(client); - } - -} diff --git a/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java index 91576ead12..d386890d86 100644 --- a/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java +++ b/core/src/main/scala/kafka/log/es/client/memory/ClientFactory.java @@ -17,10 +17,10 @@ package kafka.log.es.client.memory; -import com.automq.elasticstream.client.api.Client; -import kafka.log.es.client.Context; import kafka.log.es.AlwaysSuccessClient; import kafka.log.es.MemoryClient; +import kafka.log.es.api.Client; +import kafka.log.es.client.Context; public class ClientFactory { diff --git a/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java index fe9c03a886..23ce682232 100644 --- a/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java +++ b/core/src/main/scala/kafka/log/es/client/redis/ClientFactory.java @@ -17,8 +17,8 @@ package kafka.log.es.client.redis; -import com.automq.elasticstream.client.api.Client; import kafka.log.es.ElasticRedisClient; +import kafka.log.es.api.Client; import kafka.log.es.client.Context; public class ClientFactory { diff --git a/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java b/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java index 33a75f280b..d19c0a6ee8 100644 --- a/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java +++ b/core/src/main/scala/kafka/log/es/client/s3/ClientFactory.java @@ -17,8 +17,8 @@ package kafka.log.es.client.s3; -import com.automq.elasticstream.client.api.Client; import kafka.log.es.AlwaysSuccessClient; +import kafka.log.es.api.Client; import kafka.log.es.client.Context; import kafka.log.s3.DefaultS3Client; import kafka.log.s3.operator.DefaultS3Operator; diff --git a/core/src/main/scala/kafka/log/es/utils/Arguments.java b/core/src/main/scala/kafka/log/es/utils/Arguments.java new file mode 100644 index 0000000000..0eb78eaa5a --- /dev/null +++ b/core/src/main/scala/kafka/log/es/utils/Arguments.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
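A small sketch (not part of the patch) of the lookup that ClientFactoryProxy.get(Context) performs with the renamed PROTOCOL_SEPARATOR: the protocol prefix of the configured endpoint selects one of the remaining factories (memory, redis, s3, now that the es factory is deleted), whose static get(Context) is then invoked via reflection to obtain a kafka.log.es.api.Client. The endpoint strings below are hypothetical:

    public class FactoryLookupSketch {
        // Mirrors the class-name computation in ClientFactoryProxy.get(Context).
        static String factoryClassName(String endpoint) {
            String protocol = endpoint.split(":")[0];
            return String.format("kafka.log.es.client.%s.ClientFactory", protocol);
        }

        public static void main(String[] args) {
            System.out.println(factoryClassName("memory://"));   // kafka.log.es.client.memory.ClientFactory
            System.out.println(factoryClassName("s3://bucket")); // kafka.log.es.client.s3.ClientFactory
        }
    }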
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log.es.utils; + +public class Arguments { + + public static void check(boolean checkResult, String errorMessage) { + if (!checkResult) { + throw new IllegalArgumentException(errorMessage); + } + } + + public static void isNotNull(Object obj, String errorMessage) { + if (obj == null) { + throw new IllegalArgumentException(errorMessage); + } + } + +} diff --git a/core/src/main/scala/kafka/log/s3/ControllerKVClient.java b/core/src/main/scala/kafka/log/s3/ControllerKVClient.java index 7270ac0f8b..46d74e7802 100644 --- a/core/src/main/scala/kafka/log/s3/ControllerKVClient.java +++ b/core/src/main/scala/kafka/log/s3/ControllerKVClient.java @@ -17,13 +17,8 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.KeyValue; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; +import kafka.log.es.api.KVClient; +import kafka.log.es.api.KeyValue; import kafka.log.s3.network.ControllerRequestSender; import org.apache.kafka.common.message.DeleteKVRequestData; import org.apache.kafka.common.message.GetKVRequestData; @@ -38,6 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + public class ControllerKVClient implements KVClient { private static final Logger LOGGER = LoggerFactory.getLogger(ControllerKVClient.class); @@ -51,69 +51,69 @@ public ControllerKVClient(ControllerRequestSender requestSender) { public CompletableFuture putKV(List list) { LOGGER.trace("[ControllerKVClient]: Put KV: {}", list); PutKVRequest.Builder requestBuilder = new Builder( - new PutKVRequestData() - .setKeyValues(list.stream().map(kv -> new PutKVRequestData.KeyValue() - .setKey(kv.key()) - .setValue(kv.value().array()) - ).collect(Collectors.toList())) + new PutKVRequestData() + .setKeyValues(list.stream().map(kv -> new PutKVRequestData.KeyValue() + .setKey(kv.key()) + .setValue(kv.value().array()) + ).collect(Collectors.toList())) ); return this.requestSender.send(requestBuilder, PutKVResponseData.class) - .thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp); - return null; - default: - LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}", list, code); - throw code.exception(); - } - }); + .thenApply(resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + LOGGER.trace("[ControllerKVClient]: Put KV: {}, result: {}", list, resp); + return null; + default: + LOGGER.error("[ControllerKVClient]: Failed to Put KV: {}, code: {}", list, code); + throw code.exception(); + } + }); } @Override public CompletableFuture> getKV(List list) { LOGGER.trace("[ControllerKVClient]: Get KV: {}", list); GetKVRequest.Builder requestBuilder = new GetKVRequest.Builder( - new GetKVRequestData() - .setKeys(list) + new GetKVRequestData() + 
.setKeys(list) ); return this.requestSender.send(requestBuilder, GetKVResponseData.class) - .thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - List keyValues = resp.keyValues() - .stream() - .map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null)) - .collect(Collectors.toList()); - LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues); - return keyValues; - default: - LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}", String.join(",", list), code); - throw code.exception(); - } - }); + .thenApply(resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + List keyValues = resp.keyValues() + .stream() + .map(kv -> KeyValue.of(kv.key(), kv.value() != null ? ByteBuffer.wrap(kv.value()) : null)) + .collect(Collectors.toList()); + LOGGER.trace("[ControllerKVClient]: Get KV: {}, result: {}", list, keyValues); + return keyValues; + default: + LOGGER.error("[ControllerKVClient]: Failed to Get KV: {}, code: {}", String.join(",", list), code); + throw code.exception(); + } + }); } @Override public CompletableFuture delKV(List list) { LOGGER.trace("[ControllerKVClient]: Delete KV: {}", String.join(",", list)); DeleteKVRequest.Builder requestBuilder = new DeleteKVRequest.Builder( - new DeleteKVRequestData() - .setKeys(list) + new DeleteKVRequestData() + .setKeys(list) ); return this.requestSender.send(requestBuilder, PutKVResponseData.class) - .thenApply(resp -> { - Errors code = Errors.forCode(resp.errorCode()); - switch (code) { - case NONE: - LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp); - return null; - default: - LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}", String.join(",", list), code); - throw code.exception(); - } - }); + .thenApply(resp -> { + Errors code = Errors.forCode(resp.errorCode()); + switch (code) { + case NONE: + LOGGER.trace("[ControllerKVClient]: Delete KV: {}, result: {}", list, resp); + return null; + default: + LOGGER.error("[ControllerKVClient]: Failed to Delete KV: {}, code: {}", String.join(",", list), code); + throw code.exception(); + } + }); } } diff --git a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java index 97bcc6c3fe..6978a2a488 100644 --- a/core/src/main/scala/kafka/log/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/s3/DefaultS3Client.java @@ -17,9 +17,9 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.Client; -import com.automq.elasticstream.client.api.KVClient; -import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.es.api.Client; +import kafka.log.es.api.KVClient; +import kafka.log.es.api.StreamClient; import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.metadata.StreamMetadataManager; diff --git a/core/src/main/scala/kafka/log/s3/S3Stream.java b/core/src/main/scala/kafka/log/s3/S3Stream.java index 5f0ddd623f..8b4c731bdc 100644 --- a/core/src/main/scala/kafka/log/s3/S3Stream.java +++ b/core/src/main/scala/kafka/log/s3/S3Stream.java @@ -17,17 +17,17 @@ package kafka.log.s3; -import com.automq.elasticstream.client.DefaultAppendResult; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.ElasticStreamClientException; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.RecordBatch; -import 
com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.flatc.header.ErrorCode; import io.netty.buffer.Unpooled; +import kafka.log.es.DefaultAppendResult; import kafka.log.es.FutureUtil; import kafka.log.es.RecordBatchWithContextWrapper; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.ElasticStreamClientException; +import kafka.log.es.api.ErrorCode; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; import org.slf4j.Logger; diff --git a/core/src/main/scala/kafka/log/s3/S3StreamClient.java b/core/src/main/scala/kafka/log/s3/S3StreamClient.java index 768ba06a52..b35eb0731e 100644 --- a/core/src/main/scala/kafka/log/s3/S3StreamClient.java +++ b/core/src/main/scala/kafka/log/s3/S3StreamClient.java @@ -17,10 +17,10 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; import kafka.log.s3.streams.StreamManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java b/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java index e4a423ecc4..262274c961 100644 --- a/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java +++ b/core/src/main/scala/kafka/log/s3/StreamObjectCopyer.java @@ -20,15 +20,16 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CompletableFuture; import kafka.log.s3.operator.S3Operator; import kafka.log.s3.operator.Writer; import org.apache.kafka.metadata.stream.ObjectUtils; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectType; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + public class StreamObjectCopyer { private final List completedObjects; private final S3Operator s3Operator; diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index 8faeca87d7..79459d58d1 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -17,43 +17,43 @@ package kafka.log.s3.memory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; -import java.util.stream.Collectors; - import kafka.log.s3.objects.CommitStreamObjectRequest; import kafka.log.s3.objects.CommitWALObjectRequest; import 
kafka.log.s3.objects.CommitWALObjectResponse; import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.objects.OpenStreamMetadata; -import org.apache.kafka.common.errors.s3.StreamNotClosedException; -import org.apache.kafka.metadata.stream.S3StreamConstant; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; import kafka.log.s3.streams.StreamManager; -import org.apache.kafka.metadata.stream.ObjectUtils; import org.apache.kafka.common.errors.s3.StreamFencedException; +import org.apache.kafka.common.errors.s3.StreamNotClosedException; import org.apache.kafka.common.errors.s3.StreamNotExistException; +import org.apache.kafka.metadata.stream.ObjectUtils; import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.apache.kafka.metadata.stream.S3ObjectState; -import org.apache.kafka.metadata.stream.StreamOffsetRange; +import org.apache.kafka.metadata.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.StreamOffsetRange; import org.apache.kafka.metadata.stream.StreamState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + public class MemoryMetadataManager implements StreamManager, ObjectManager { private static final int MOCK_BROKER_ID = 0; diff --git a/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java b/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java index 5c883a2b22..ff16a95734 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java +++ b/core/src/main/scala/kafka/log/s3/metadata/InRangeObjectsFetcher.java @@ -17,18 +17,19 @@ package kafka.log.s3.metadata; -import java.util.concurrent.CompletableFuture; import org.apache.kafka.metadata.stream.InRangeObjects; +import java.util.concurrent.CompletableFuture; + public interface InRangeObjectsFetcher { /** * fetch stream interval related objects * - * @param streamId stream id + * @param streamId stream id * @param startOffset start offset, inclusive, if not exist, return INVALID - * @param endOffset end offset, exclusive, if not exist, wait for it - * @param limit max object count + * @param endOffset end offset, exclusive, if not exist, wait for it + * @param limit max object count * @return {@link InRangeObjects} */ CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit); diff --git a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java index e9f21e1e41..9f305f408f 100644 --- a/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/metadata/StreamMetadataManager.java @@ -18,19 +18,6 @@ package kafka.log.s3.metadata; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import 
java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.stream.Collectors; import kafka.log.es.FutureUtil; import kafka.server.BrokerServer; import kafka.server.KafkaConfig; @@ -49,6 +36,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + public class StreamMetadataManager implements InRangeObjectsFetcher { // TODO: optimize by more suitable concurrent protection @@ -101,10 +102,10 @@ private List removePendingTasks() { } Set pendingStreams = pendingGetObjectsTasks.keySet(); List pendingStreamsOffsetRange = pendingStreams - .stream() - .map(streamsImage::offsetRange) - .filter(offset -> offset != StreamOffsetRange.INVALID) - .collect(Collectors.toList()); + .stream() + .map(streamsImage::offsetRange) + .filter(offset -> offset != StreamOffsetRange.INVALID) + .collect(Collectors.toList()); if (pendingStreamsOffsetRange.isEmpty()) { return Collections.emptyList(); } @@ -117,7 +118,7 @@ private List removePendingTasks() { return; } Iterator>> iterator = - tasks.entrySet().iterator(); + tasks.entrySet().iterator(); while (iterator.hasNext()) { Entry> entry = iterator.next(); long pendingEndOffset = entry.getKey(); @@ -141,8 +142,8 @@ public CompletableFuture fetch(long streamId, long startOffset, S3StreamMetadataImage streamImage = streamsImage.streamsMetadata().get(streamId); if (streamImage == null) { LOGGER.warn( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and streamImage is null", - streamId, startOffset, endOffset, limit); + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and streamImage is null", + streamId, startOffset, endOffset, limit); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } StreamOffsetRange offsetRange = streamImage.offsetRange(); @@ -153,8 +154,8 @@ public CompletableFuture fetch(long streamId, long startOffset, long streamEndOffset = offsetRange.getEndOffset(); if (startOffset < streamStartOffset) { LOGGER.warn( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}", - streamId, startOffset, endOffset, limit, streamStartOffset); + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}", + streamId, startOffset, endOffset, limit, streamStartOffset); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } if (endOffset > streamEndOffset) { @@ -176,8 +177,8 @@ public CompletableFuture> getStreamObjects(long str return CompletableFuture.completedFuture(s3StreamObjectMetadataList); } catch (Exception e) { LOGGER.warn( - "[GetStreamObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with exception: {}", - streamId, startOffset, endOffset, limit, e.getMessage()); + "[GetStreamObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with 
exception: {}", + streamId, startOffset, endOffset, limit, e.getMessage()); return CompletableFuture.failedFuture(e); } } @@ -187,11 +188,11 @@ public CompletableFuture> getStreamObjects(long str private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) { GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit); Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, - k -> new TreeMap<>()); + k -> new TreeMap<>()); List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); getObjectsTasks.add(task); LOGGER.warn("[PendingFetch]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and pending fetch", streamId, startOffset, endOffset, - limit); + limit); return task.cf; } @@ -200,8 +201,8 @@ private CompletableFuture fetch0(long streamId, long startOffset InRangeObjects cachedInRangeObjects = streamsImage.getObjects(streamId, startOffset, endOffset, limit); if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { LOGGER.warn( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } // fill the objects' size @@ -210,15 +211,15 @@ private CompletableFuture fetch0(long streamId, long startOffset if (objectMetadata == null) { // should not happen LOGGER.error( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); return CompletableFuture.completedFuture(InRangeObjects.INVALID); } object.setObjectSize(objectMetadata.getObjectSize()); } LOGGER.trace( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", - streamId, startOffset, endOffset, limit, cachedInRangeObjects); + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", + streamId, startOffset, endOffset, limit, cachedInRangeObjects); return CompletableFuture.completedFuture(cachedInRangeObjects); } diff --git a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java index c599f75f3c..d208a063aa 100644 --- a/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java +++ b/core/src/main/scala/kafka/log/s3/model/StreamRecordBatch.java @@ -17,8 +17,8 @@ package kafka.log.s3.model; -import com.automq.elasticstream.client.api.RecordBatch; import io.netty.buffer.ByteBuf; +import kafka.log.es.api.RecordBatch; import kafka.log.s3.StreamRecordBatchCodec; import java.nio.ByteBuffer; diff --git a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java index 6e6f45cecd..643f685a90 100644 --- a/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java +++ b/core/src/main/scala/kafka/log/s3/network/ControllerRequestSender.java @@ -17,7 +17,6 @@ package 
kafka.log.s3.network; -import java.util.concurrent.CompletableFuture; import kafka.server.BrokerServer; import kafka.server.BrokerToControllerChannelManager; import kafka.server.ControllerRequestCompletionHandler; @@ -28,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; + public class ControllerRequestSender { private static final Logger LOGGER = LoggerFactory.getLogger(ControllerRequestSender.class); @@ -40,7 +41,7 @@ public ControllerRequestSender(BrokerServer brokerServer) { } public CompletableFuture send(AbstractRequest.Builder requestBuilder, - Class responseDataType) { + Class responseDataType) { CompletableFuture cf = new CompletableFuture<>(); LOGGER.debug("Sending request {}", requestBuilder); channelManager.sendRequest(requestBuilder, new ControllerRequestCompletionHandler() { @@ -65,7 +66,7 @@ public void onComplete(ClientResponse response) { } if (!responseDataType.isInstance(response.responseBody().data())) { LOGGER.error("Unexpected response type: {} while sending request: {}", - response.responseBody().data().getClass().getSimpleName(), requestBuilder); + response.responseBody().data().getClass().getSimpleName(), requestBuilder); cf.completeExceptionally(new RuntimeException("Unexpected response type while sending request")); } cf.complete((R) response.responseBody().data()); diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java index 0b97e47824..46445119e2 100644 --- a/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java +++ b/core/src/main/scala/kafka/log/s3/objects/CommitWALObjectRequest.java @@ -132,12 +132,12 @@ public void setOrderId(long orderId) { @Override public String toString() { return "CommitWALObjectRequest{" + - "objectId=" + objectId + - ", orderId=" + orderId + - ", objectSize=" + objectSize + - ", streamRanges=" + streamRanges + - ", streamObjects=" + streamObjects + - ", compactedObjectIds=" + compactedObjectIds + - '}'; + "objectId=" + objectId + + ", orderId=" + orderId + + ", objectSize=" + objectSize + + ", streamRanges=" + streamRanges + + ", streamObjects=" + streamObjects + + ", compactedObjectIds=" + compactedObjectIds + + '}'; } } diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index 0c61b39aee..2d445f8358 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -18,10 +18,6 @@ package kafka.log.s3.objects; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.network.ControllerRequestSender; import kafka.server.KafkaConfig; @@ -37,6 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + public class ControllerObjectManager implements ObjectManager { private final static Logger LOGGER = LoggerFactory.getLogger(ControllerObjectManager.class); @@ -54,10 +55,10 @@ public ControllerObjectManager(ControllerRequestSender requestSender, StreamMeta @Override public CompletableFuture prepareObject(int count, long ttl) { PrepareS3ObjectRequest.Builder request = 
new Builder( - new PrepareS3ObjectRequestData() - .setBrokerId(config.brokerId()) - .setPreparedCount(count) - .setTimeToLiveInMs(ttl) + new PrepareS3ObjectRequestData() + .setBrokerId(config.brokerId()) + .setPreparedCount(count) + .setTimeToLiveInMs(ttl) ); return requestSender.send(request, PrepareS3ObjectResponseData.class).thenApply(resp -> { Errors code = Errors.forCode(resp.errorCode()); @@ -74,18 +75,18 @@ public CompletableFuture prepareObject(int count, long ttl) { @Override public CompletableFuture commitWALObject(CommitWALObjectRequest request) { org.apache.kafka.common.requests.s3.CommitWALObjectRequest.Builder wrapRequestBuilder = new org.apache.kafka.common.requests.s3.CommitWALObjectRequest.Builder( - new CommitWALObjectRequestData() - .setBrokerId(config.brokerId()) - .setOrderId(request.getOrderId()) - .setObjectId(request.getObjectId()) - .setObjectSize(request.getObjectSize()) - .setObjectStreamRanges(request.getStreamRanges() - .stream() - .map(ObjectStreamRange::toObjectStreamRangeInRequest).collect(Collectors.toList())) - .setStreamObjects(request.getStreamObjects() - .stream() - .map(StreamObject::toStreamObjectInRequest).collect(Collectors.toList())) - .setCompactedObjectIds(request.getCompactedObjectIds())); + new CommitWALObjectRequestData() + .setBrokerId(config.brokerId()) + .setOrderId(request.getOrderId()) + .setObjectId(request.getObjectId()) + .setObjectSize(request.getObjectSize()) + .setObjectStreamRanges(request.getStreamRanges() + .stream() + .map(ObjectStreamRange::toObjectStreamRangeInRequest).collect(Collectors.toList())) + .setStreamObjects(request.getStreamObjects() + .stream() + .map(StreamObject::toStreamObjectInRequest).collect(Collectors.toList())) + .setCompactedObjectIds(request.getCompactedObjectIds())); return requestSender.send(wrapRequestBuilder, CommitWALObjectResponseData.class).thenApply(resp -> { Errors code = Errors.forCode(resp.errorCode()); switch (code) { @@ -115,7 +116,7 @@ public List getObjects(long streamId, long startOffset, long e }).get(); } catch (Exception e) { LOGGER.error("Error while get objects, streamId: {}, startOffset: {}, endOffset: {}, limit: {}", streamId, startOffset, endOffset, limit, - e); + e); return Collections.emptyList(); } } diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java index 4ff1a8ce7a..fc01549ac1 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectManager.java @@ -17,9 +17,10 @@ package kafka.log.s3.objects; +import org.apache.kafka.metadata.stream.S3ObjectMetadata; + import java.util.List; import java.util.concurrent.CompletableFuture; -import org.apache.kafka.metadata.stream.S3ObjectMetadata; /** * Object metadata registry. @@ -55,10 +56,10 @@ public interface ObjectManager { * When obj1 contains stream0 [0, 100) [200, 300) and obj2 contains stream1 [100, 200), * expect getObjects(streamId, 0, 300) return [obj1, obj2, obj1] * - * @param streamId stream id. + * @param streamId stream id. * @param startOffset get range start offset. - * @param endOffset get range end offset. - * @param limit max object count. Why use limit instead of maxBytes? Because we cannot get stream size from object metadata. + * @param endOffset get range end offset. + * @param limit max object count. Why use limit instead of maxBytes? Because we cannot get stream size from object metadata. 
* @return {@link S3ObjectMetadata} */ List getObjects(long streamId, long startOffset, long endOffset, int limit); diff --git a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java index 79f01813b1..43777ab71f 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java +++ b/core/src/main/scala/kafka/log/s3/objects/ObjectStreamRange.java @@ -25,7 +25,8 @@ public class ObjectStreamRange { private long startOffset; private long endOffset; - public ObjectStreamRange() {} + public ObjectStreamRange() { + } public ObjectStreamRange(long streamId, long epoch, long startOffset, long endOffset) { this.streamId = streamId; @@ -41,6 +42,7 @@ public long getStreamId() { public long getEpoch() { return epoch; } + public long getStartOffset() { return startOffset; } @@ -67,9 +69,9 @@ public void setEndOffset(long endOffset) { public CommitWALObjectRequestData.ObjectStreamRange toObjectStreamRangeInRequest() { return new CommitWALObjectRequestData.ObjectStreamRange() - .setStreamId(streamId) - .setStartOffset(startOffset) - .setEndOffset(endOffset); + .setStreamId(streamId) + .setStartOffset(startOffset) + .setEndOffset(endOffset); } @Override diff --git a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java index 79966d12f8..741bc3ada6 100644 --- a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java +++ b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java @@ -17,9 +17,10 @@ package kafka.log.s3.objects; -import java.util.List; import org.apache.kafka.common.message.CommitWALObjectRequestData; +import java.util.List; + public class StreamObject { private long objectId; private long objectSize; @@ -82,11 +83,11 @@ public void setSourceObjectIds(List sourceObjectIds) { public CommitWALObjectRequestData.StreamObject toStreamObjectInRequest() { return new CommitWALObjectRequestData.StreamObject() - .setStreamId(streamId) - .setObjectId(objectId) - .setObjectSize(objectSize) - .setStartOffset(startOffset) - .setEndOffset(endOffset); + .setStreamId(streamId) + .setObjectId(objectId) + .setObjectSize(objectSize) + .setStartOffset(startOffset) + .setEndOffset(endOffset); } @Override diff --git a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java index 42ed424857..7cc2276bb1 100644 --- a/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java +++ b/core/src/main/scala/kafka/log/s3/operator/MemoryS3Operator.java @@ -21,7 +21,6 @@ import io.netty.buffer.Unpooled; import kafka.log.es.FutureUtil; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java index 51e3c39de8..8387a1dcbc 100644 --- a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java @@ -17,11 +17,12 @@ package kafka.log.s3.streams; -import java.util.List; -import java.util.concurrent.CompletableFuture; import kafka.log.s3.objects.OpenStreamMetadata; import org.apache.kafka.metadata.stream.StreamOffsetRange; +import java.util.List; +import java.util.concurrent.CompletableFuture; + public interface StreamManager { /** @@ -55,8 +56,8 @@ public interface StreamManager { /** * Close stream. Other server can open stream with newer epoch. 
* - * @param streamId stream id. - * @param epoch stream epoch. + * @param streamId stream id. + * @param epoch stream epoch. */ CompletableFuture closeStream(long streamId, long epoch); @@ -70,7 +71,7 @@ public interface StreamManager { /** * Get streams offset. - * + *
* When server is starting or recovering, wal in EBS need streams offset to determine the recover point. * * @param streamIds stream ids. diff --git a/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java b/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java index 27e0d4310c..305722739f 100644 --- a/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java +++ b/core/src/main/scala/kafka/log/s3/wal/WriteAheadLog.java @@ -27,18 +27,21 @@ public interface WriteAheadLog { /** * Get log start offset. + * * @return start offset. */ long startOffset(); /** * Get log end offset. + * * @return exclusive end offset. */ long endOffset(); /** * Read data from log. + * * @return list of {@link WalRecord}. */ List read(); diff --git a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java index 932eb677dd..e31f7ec171 100644 --- a/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java +++ b/core/src/test/java/kafka/log/es/AlwaysSuccessClientTest.java @@ -17,15 +17,21 @@ package kafka.log.es; -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.ElasticStreamClientException; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; -import com.automq.elasticstream.client.api.StreamClient; +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.ElasticStreamClientException; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; +import kafka.log.es.api.StreamClient; +import org.apache.kafka.common.errors.es.SlowFetchHintException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -40,11 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.kafka.common.errors.es.SlowFetchHintException; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import static kafka.log.es.AlwaysSuccessClient.HALT_ERROR_CODES; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -69,14 +70,14 @@ public void teardown() { public void basicAppendAndFetch() throws ExecutionException, InterruptedException { client = new AlwaysSuccessClient(new MemoryClient()); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .get(); List payloads = List.of("hello".getBytes(), "world".getBytes()); CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) + payloads + .stream() + .map(payload -> 
stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) ).get(); FetchResult fetched = stream.fetch(0, 100, 1000).get(); @@ -97,17 +98,17 @@ public void testQuickFetch() throws ExecutionException, InterruptedException { for (Long delay : quickFetchDelayMillisList) { memoryClientWithDelay.setDelayMillis(delay); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .get(); CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) + payloads + .stream() + .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) ).get(); FetchResult fetched = stream.fetch(0, 100, 1000) - .orTimeout(delay + slowFetchTimeoutMillis / 2, TimeUnit.MILLISECONDS) - .get(); + .orTimeout(delay + slowFetchTimeoutMillis / 2, TimeUnit.MILLISECONDS) + .get(); checkAppendAndFetch(payloads, fetched); stream.destroy(); } @@ -123,21 +124,21 @@ public void testSlowFetch() throws ExecutionException, InterruptedException { long slowFetchDelay = slowFetchTimeoutMillis * 3 / 2; memoryClientWithDelay.setDelayMillis(slowFetchDelay); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .get(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .get(); CompletableFuture.allOf( - payloads - .stream() - .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) + payloads + .stream() + .map(payload -> stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap(payload)))).toArray(CompletableFuture[]::new) ).get(); FetchResult fetched = null; AtomicBoolean gotSlowFetchHintException = new AtomicBoolean(false); try { fetched = stream.fetch(0, 100, 1000) - .orTimeout(slowFetchDelay, TimeUnit.MILLISECONDS) - .get(); + .orTimeout(slowFetchDelay, TimeUnit.MILLISECONDS) + .get(); checkAppendAndFetch(payloads, fetched); } catch (ExecutionException e) { // should throw SlowFetchHintException after SLOW_FETCH_TIMEOUT_MILLIS ms @@ -146,8 +147,8 @@ public void testSlowFetch() throws ExecutionException, InterruptedException { SeparateSlowAndQuickFetchHint.reset(); // It should reuse the fetching future above, therefore only (SLOW_FETCH_TIMEOUT_MILLIS / 2) ms is tolerable. 
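As an aside before the retry call that the comment above introduces: the mechanism testSlowFetch exercises (fail fast with SlowFetchHintException once a fetch misses the slow-fetch deadline, while keeping the in-flight future around so a retry can rejoin it) can be summarized with plain CompletableFuture machinery. The wrapper below is a sketch of that pattern only, not the AlwaysSuccessClient implementation; SlowFetchHint is a local stand-in for org.apache.kafka.common.errors.es.SlowFetchHintException and every other name is invented here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Sketch of the slow-fetch-hint pattern, not the code under test. */
public class SlowFetchHintSketch<K, V> {

    /** Stand-in for SlowFetchHintException, to keep the sketch self-contained. */
    public static class SlowFetchHint extends RuntimeException {
        public SlowFetchHint(String message) { super(message); }
    }

    private final long slowFetchTimeoutMillis;
    private final Map<K, CompletableFuture<V>> inflight = new ConcurrentHashMap<>();

    public SlowFetchHintSketch(long slowFetchTimeoutMillis) {
        this.slowFetchTimeoutMillis = slowFetchTimeoutMillis;
    }

    public CompletableFuture<V> fetch(K key, Supplier<CompletableFuture<V>> fetcher) {
        // Rejoin an in-flight fetch for this key if one exists; otherwise start a new one.
        CompletableFuture<V> pending = inflight.computeIfAbsent(key, k -> fetcher.get());
        pending.whenComplete((v, e) -> inflight.remove(key, pending)); // uncache once it finishes

        CompletableFuture<V> result = new CompletableFuture<>();
        // Time-limit a copy, so the timeout never fails the cached future itself.
        pending.copy()
                .orTimeout(slowFetchTimeoutMillis, TimeUnit.MILLISECONDS)
                .whenComplete((value, error) -> {
                    if (error == null) {
                        result.complete(value);
                    } else if (error instanceof TimeoutException) {
                        // Tell the caller to fall back to the slow path; a retry rejoins `pending`.
                        result.completeExceptionally(new SlowFetchHint(
                                "fetch exceeded " + slowFetchTimeoutMillis + " ms"));
                    } else {
                        result.completeExceptionally(error);
                    }
                });
        return result;
    }
}
```

Because the timeout is applied to a copy of the cached future, the underlying fetch keeps running, which is why the retry in the test is expected to finish well inside a second timeout window.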
fetched = stream.fetch(0, 100, 1000) - .orTimeout(slowFetchTimeoutMillis - 200, TimeUnit.MILLISECONDS) - .get(); + .orTimeout(slowFetchTimeoutMillis - 200, TimeUnit.MILLISECONDS) + .get(); } checkAppendAndFetch(payloads, fetched); assertTrue(gotSlowFetchHintException.get(), "should throw SlowFetchHintException"); @@ -162,12 +163,12 @@ public void testOpenStream() { AtomicBoolean exceptionThrown = new AtomicBoolean(false); openStream(1) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionThrown.set(true); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionThrown.set(true); + return null; + }) + .join(); assertTrue(exceptionThrown.get(), "should throw IOException"); } @@ -179,40 +180,40 @@ public void testStreamOperationHalt() { client = new AlwaysSuccessClient(memoryClientWithDelay); Stream stream = client - .streamClient() - .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) - .join(); + .streamClient() + .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build()) + .join(); AtomicInteger exceptionCount = new AtomicInteger(0); stream - .append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes()))) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes()))) + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); stream.fetch(0, 100, 1000) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); stream.trim(0) - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); stream.close() - .exceptionally(e -> { - assertEquals(IOException.class, e.getClass()); - exceptionCount.incrementAndGet(); - return null; - }) - .join(); + .exceptionally(e -> { + assertEquals(IOException.class, e.getClass()); + exceptionCount.incrementAndGet(); + return null; + }) + .join(); assertEquals(4, exceptionCount.get(), "should throw IOException 4 times"); stream.destroy(); } @@ -251,8 +252,8 @@ public void testNormalExceptionHandling() { private CompletableFuture openStream(long streamId) { return client - .streamClient() - .openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build()); + .streamClient() + .openStream(streamId, OpenStreamOptions.newBuilder().epoch(1).build()); } private void checkAppendAndFetch(List rawPayloads, FetchResult fetched) { @@ -266,6 +267,7 @@ static final class MemoryClientWithDelay extends MemoryClient { /** * Set the additional fetching delay + * * @param delayMillis */ public void setDelayMillis(long delayMillis) { @@ -424,9 +426,9 @@ static enum ExceptionHint { OK; private static final List OTHER_EXCEPTION_LIST = List.of( - new IOException("io exception"), - new RuntimeException("runtime exception"), - new ElasticStreamClientException(-1, "other exception") + new IOException("io exception"), + new RuntimeException("runtime exception"), + new 
ElasticStreamClientException(-1, "other exception") ); public Exception generateException() { diff --git a/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java b/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java index a480cc0120..ee717747e6 100644 --- a/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java +++ b/core/src/test/java/kafka/log/s3/DefaultRecordBatch.java @@ -17,7 +17,7 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.RecordBatch; +import kafka.log.es.api.RecordBatch; import java.nio.ByteBuffer; import java.util.Collections; diff --git a/core/src/test/java/kafka/log/s3/DefaultRecordBatchWithContext.java b/core/src/test/java/kafka/log/s3/DefaultRecordBatchWithContext.java index 0f4b87c9ac..a777d2712f 100644 --- a/core/src/test/java/kafka/log/s3/DefaultRecordBatchWithContext.java +++ b/core/src/test/java/kafka/log/s3/DefaultRecordBatchWithContext.java @@ -17,8 +17,8 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; import java.nio.ByteBuffer; import java.util.Map; @@ -31,6 +31,7 @@ public DefaultRecordBatchWithContext(RecordBatch recordBatch, long baseOffset) { this.recordBatch = recordBatch; this.baseOffset = baseOffset; } + @Override public long baseOffset() { return baseOffset; diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java index 416e28de52..bcd94b5354 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java @@ -18,31 +18,13 @@ package kafka.log.s3; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import com.automq.elasticstream.client.api.AppendResult; -import com.automq.elasticstream.client.api.CreateStreamOptions; -import com.automq.elasticstream.client.api.FetchResult; -import com.automq.elasticstream.client.api.OpenStreamOptions; -import com.automq.elasticstream.client.api.RecordBatch; -import com.automq.elasticstream.client.api.RecordBatchWithContext; -import com.automq.elasticstream.client.api.Stream; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicLong; - +import kafka.log.es.api.AppendResult; +import kafka.log.es.api.CreateStreamOptions; +import kafka.log.es.api.FetchResult; +import kafka.log.es.api.OpenStreamOptions; +import kafka.log.es.api.RecordBatch; +import kafka.log.es.api.RecordBatchWithContext; +import kafka.log.es.api.Stream; import kafka.log.s3.cache.DefaultS3BlockCache; import kafka.log.s3.cache.S3BlockCache; import kafka.log.s3.memory.MemoryMetadataManager; @@ -59,6 +41,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.BrokenBarrierException; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + @Tag("S3Unit") public class S3StreamMemoryTest { diff --git a/core/src/test/java/kafka/log/s3/S3StreamTest.java b/core/src/test/java/kafka/log/s3/S3StreamTest.java index 01dfdd6017..8388c6a6d8 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamTest.java @@ -17,8 +17,8 @@ package kafka.log.s3; -import com.automq.elasticstream.client.api.ElasticStreamClientException; -import com.automq.elasticstream.client.api.FetchResult; +import kafka.log.es.api.ElasticStreamClientException; +import kafka.log.es.api.FetchResult; import kafka.log.s3.cache.ReadDataBlock; import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.streams.StreamManager; diff --git a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java index cff1f79994..ecae3c66f6 100644 --- a/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/s3/StreamMetadataManagerTest.java @@ -17,16 +17,6 @@ package kafka.log.s3; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import kafka.log.s3.metadata.StreamMetadataManager; import kafka.log.s3.metadata.StreamMetadataManager.StreamMetadataListener; import kafka.server.BrokerServer; @@ -52,6 +42,17 @@ import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; + @Timeout(40) @Tag("S3Unit") public class StreamMetadataManagerTest { @@ -88,19 +89,19 @@ public void setUp() { static { S3ObjectsImage objectsImage = new S3ObjectsImage(2L, Map.of( - 0L, new S3Object(0L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED), - 1L, new S3Object(1L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED), - 2L, new S3Object(2L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED) + 0L, new S3Object(0L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED), + 1L, new S3Object(1L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED), + 2L, new S3Object(2L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED) )); Map ranges = Map.of( - 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0) + 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0) ); Map streamObjects = Map.of( - 0L, new S3StreamObject(0L, 128, STREAM0, 10L, 100L)); + 0L, new S3StreamObject(0L, 128, STREAM0, 10L, 
100L)); S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null); ranges = new HashMap<>(ranges); @@ -109,7 +110,7 @@ public void setUp() { streamObjects.put(1L, new S3StreamObject(1L, 128, STREAM0, 100L, 150L)); streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null); ranges = new HashMap<>(ranges); @@ -118,7 +119,7 @@ public void setUp() { streamObjects.put(2L, new S3StreamObject(2L, 128, STREAM0, 150L, 200L)); streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), - Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); + Map.of(BROKER0, BrokerS3WALMetadataImage.EMPTY)); image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null); } diff --git a/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java b/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java index 81960541da..12f4317dae 100644 --- a/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java +++ b/core/src/test/java/kafka/log/s3/StreamObjectCopyerTest.java @@ -17,10 +17,6 @@ package kafka.log.s3; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutionException; - import kafka.log.s3.model.StreamRecordBatch; import kafka.log.s3.operator.MemoryS3Operator; import kafka.log.s3.operator.S3Operator; @@ -30,6 +26,10 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 318a50dab9..cb7da00c09 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -129,6 +129,7 @@ versions += [ zstd: "1.5.2-1", elasticstream: "1.0-SNAPSHOT", s3Client: "2.20.127", + commonLang: "3.12.0", ] libs += [ activation: "javax.activation:activation:$versions.activation", @@ -220,6 +221,7 @@ libs += [ mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", zstd: "com.github.luben:zstd-jni:$versions.zstd", httpclient: "org.apache.httpcomponents:httpclient:$versions.httpclient", - esClient: "com.automq.elasticstream:client:$versions.elasticstream", s3Client: "software.amazon.awssdk:s3:$versions.s3Client", + commonLang: "org.apache.commons:commons-lang3:$versions.commonLang", + nettyAll: "io.netty:netty-all:$versions.netty", ]
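Taken together with the build changes, the thrust of this patch is that the stream client API now lives in-tree under kafka.log.es.api, so the external com.automq.elasticstream client coordinate (esClient) is dropped from gradle/dependencies.gradle while commons-lang3 and netty-all are added. For orientation, here is a call-site sketch against the relocated API. It only uses shapes visible in the updated tests (streamClient, createAndOpenStream, append, fetch, close, destroy); the RawPayloadRecordBatch import location is an assumption, and exact signatures should be checked against the classes in the tree rather than taken from this sketch.

```java
// Call-site sketch against the relocated kafka.log.es.api package; shapes mirror the updated tests.
import kafka.log.es.RawPayloadRecordBatch; // assumed location of the helper used in the tests above
import kafka.log.es.api.Client;
import kafka.log.es.api.CreateStreamOptions;
import kafka.log.es.api.FetchResult;
import kafka.log.es.api.Stream;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;

public class RelocatedApiUsageSketch {

    /** `client` could be an AlwaysSuccessClient or any other Client implementation. */
    static void roundTrip(Client client) throws ExecutionException, InterruptedException {
        Stream stream = client
                .streamClient()
                .createAndOpenStream(CreateStreamOptions.newBuilder().epoch(0).replicaCount(1).build())
                .get();
        stream.append(RawPayloadRecordBatch.of(ByteBuffer.wrap("hello".getBytes()))).get();
        FetchResult fetched = stream.fetch(0, 100, 1000).get();
        // ... inspect `fetched` here; its accessor names are not shown in this diff ...
        stream.close().get();
        stream.destroy();
    }
}
```

Nothing in the sketch depends on the removed com.automq.elasticstream packages, which is the point of the relocation.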