diff --git a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java index 14359a7e78..8c768f3b3e 100644 --- a/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java +++ b/automq-shell/src/main/java/com/automq/shell/log/LogUploader.java @@ -72,20 +72,16 @@ public static LogUploader getInstance() { return INSTANCE; } - public void close() { + public void close() throws InterruptedException { closed = true; if (uploadThread != null) { - uploadThread.interrupt(); - cleanupThread.interrupt(); - - if (uploadBuffer.readableBytes() > 0) { - try { - s3Operator.write(getObjectKey(), uploadBuffer, ThrottleStrategy.BYPASS).get(); - } catch (Exception ignore) { - } - } + uploadThread.join(); s3Operator.close(); } + + if (cleanupThread != null) { + cleanupThread.interrupt(); + } } @Override @@ -139,7 +135,6 @@ private void initUploadComponent() { } catch (Exception e) { LOGGER.error("Initialize log uploader failed", e); } - Runtime.getRuntime().addShutdownHook(new Thread(this::close)); }, command -> new Thread(command).start()); } } @@ -183,7 +178,7 @@ public void run() { upload(now); } uploadBuffer.writeBytes(bytes); - } else if (closed) { + } else if (closed && queue.isEmpty()) { upload(now); break; } else if (now - lastUploadTimestamp > nextUploadInterval) { @@ -201,7 +196,7 @@ private void upload(long now) { if (uploadBuffer.readableBytes() > 0) { if (couldUpload()) { try { - while (!closed && !Thread.currentThread().isInterrupted()) { + while (!Thread.currentThread().isInterrupted()) { if (s3Operator == null) { break; } diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index aeffa1603a..d64d8dfd0b 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -19,7 +19,7 @@ package kafka import com.automq.shell.AutoMQApplication import com.automq.shell.auth.{CredentialsProviderHolder, EnvVariableCredentialsProvider} -import com.automq.shell.log.S3LogConfig +import com.automq.shell.log.{LogUploader, S3LogConfig} import com.automq.shell.model.S3Url import com.automq.stream.s3.ByteBufAlloc import joptsimple.OptionParser @@ -162,8 +162,10 @@ object Kafka extends Logging { // attach shutdown handler to catch terminating signals as well as normal termination Exit.addShutdownHook("kafka-shutdown-hook", { - try server.shutdown() - catch { + try { + server.shutdown() + LogUploader.getInstance().close() + } catch { case _: Throwable => fatal("Halting Kafka.") // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.