Skip to content

Commit

Permalink
KAFKA-5038; Throw correct exception of locking of state directory fails
Browse files Browse the repository at this point in the history
Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2848 from enothereska/KAFKA-5038-trunk
  • Loading branch information
enothereska authored and ijuma committed Apr 12, 2017
1 parent 0be835d commit 1e93c3b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public ProcessorStateManager(final TaskId taskId,
this.taskId = taskId;
this.stateDirectory = stateDirectory;
this.changelogReader = changelogReader;
this.baseDir = stateDirectory.directoryForTask(taskId);
this.logPrefix = String.format("task [%s]", taskId);

this.partitionForTopic = new HashMap<>();
for (TopicPartition source : sources) {
this.partitionForTopic.put(source.topic(), source);
Expand All @@ -91,10 +92,18 @@ public ProcessorStateManager(final TaskId taskId,
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;

this.logPrefix = String.format("task [%s]", taskId);

if (!stateDirectory.lock(taskId, 5)) {
throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
throw new LockException(String.format("%s Failed to lock the state directory for task %s",
logPrefix, taskId));
}
// get a handle on the parent/base directory of the task directory
// note that the parent directory could have been accidentally deleted here,
// so catch that exception if that is the case
try {
this.baseDir = stateDirectory.directoryForTask(taskId);
} catch (ProcessorStateException e) {
throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s",
logPrefix, taskId, e));
}

// load the checkpoint information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,19 @@ File globalStateDir() {
* @throws IOException
*/
boolean lock(final TaskId taskId, int retry) throws IOException {
final File lockFile;
// we already have the lock so bail out here
if (locks.containsKey(taskId)) {
log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
return true;
}
final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
try {
lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
} catch (ProcessorStateException e) {
// directoryForTask could be throwing an exception if another thread
// has concurrently deleted the directory
return false;
}

final FileChannel channel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
Expand Down Expand Up @@ -106,7 +107,22 @@ public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
}
}

@Test(expected = ProcessorStateException.class)
public void shouldThrowProcessorStateException() throws Exception {
final TaskId taskId = new TaskId(0, 0);

Utils.delete(stateDir);
directory.directoryForTask(taskId);
}

@Test
public void shouldNotLockDeletedDirectory() throws Exception {
final TaskId taskId = new TaskId(0, 0);

Utils.delete(stateDir);
assertFalse(directory.lock(taskId, 0));
}

@Test
public void shouldLockMulitpleTaskDirectories() throws Exception {
final TaskId taskId = new TaskId(0, 0);
Expand Down

0 comments on commit 1e93c3b

Please sign in to comment.