From a694e7a9047efe106ff41be02a12c9e46cc3384f Mon Sep 17 00:00:00 2001 From: Greg Fodor Date: Mon, 25 Apr 2016 00:05:12 -0700 Subject: [PATCH] Close file channel when lock is released --- .../streams/processor/internals/ProcessorStateManager.java | 4 ++++ .../kafka/streams/processor/internals/StreamThread.java | 1 + .../processor/internals/ProcessorStateManagerTest.java | 5 ++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 003b988273fb5..0cdf44cbd0ca4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -134,6 +134,9 @@ private static FileLock lockStateDirectory(File stateDir, int retry) throws IOEx retry--; lock = lockStateDirectory(channel); } + if (lock == null) { + channel.close(); + } return lock; } @@ -368,6 +371,7 @@ public void close(Map ackedOffsets) throws IOException { } finally { // release the state directory directoryLock directoryLock.release(); + directoryLock.channel().close(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f02683e587170..eff90e88cfb01 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -519,6 +519,7 @@ protected void maybeClean() { if (directoryLock != null) { try { directoryLock.release(); + directoryLock.channel().close(); } catch (IOException e) { log.error("Failed to release the state directory lock"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 84b59e639f79d..e3669e8c37d8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -213,7 +213,10 @@ public void testLockStateDirectory() throws IOException { try { assertNotNull(lock); } finally { - if (lock != null) lock.release(); + if (lock != null) { + lock.release(); + lock.channel().close(); + } } } finally { Utils.delete(baseDir);