From 11369803de727e39313bafb3da1b9b9c8ce11b42 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 8 Jan 2025 00:40:41 +0000 Subject: [PATCH] decrements total queued mutation size when update idles fixes #5235 --- .../accumulo/tserver/TabletClientHandler.java | 16 +++++++++++++++- .../apache/accumulo/tserver/TabletServer.java | 7 ++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 087c733e6f9..ca466159ed7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -258,7 +258,21 @@ public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdura UpdateSession us = new UpdateSession(new TservConstraintEnv(server.getContext(), security, credentials), - credentials, durability); + credentials, durability) { + @Override + public boolean cleanup() { + // This is called when a client abandons a session. When this happens need to decrement + // any queued mutations. + if (queuedMutationSize > 0) { + log.trace( + "cleaning up abandoned update session, decrementing totalQueuedMutationSize by {}", + queuedMutationSize); + server.updateTotalQueuedMutationSize(-queuedMutationSize); + queuedMutationSize = 0; + } + return true; + } + }; return server.sessionManager.createSession(us, false); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3a3ad3f3b14..beb2e550a68 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -417,7 +417,12 @@ public void run() { } public long updateTotalQueuedMutationSize(long additionalMutationSize) { - return totalQueuedMutationSize.addAndGet(additionalMutationSize); + var newTotal = totalQueuedMutationSize.addAndGet(additionalMutationSize); + if (log.isTraceEnabled()) { + log.trace("totalQueuedMutationSize is now {} after adding {}", newTotal, + additionalMutationSize); + } + return newTotal; } @Override