[SPARK-26265][Core] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager #23272

Closed
wants to merge 5 commits into from
@@ -255,11 +255,18 @@ private MapIterator(int numRecords, Location loc, boolean destructive) {
     }

     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is possible that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+      // reference to the page to free and free it after releasing the lock of `MapIterator`.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -283,6 +290,9 @@ private void advanceToNextPage() {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }

     @Override
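The lock-ordering problem described in the SPARK-26265 code comment can be shown in isolation. The following is a minimal sketch and not Spark code: `Manager` and `Pages` are hypothetical stand-ins for `TaskMemoryManager` and `MapIterator`, and their methods are invented for illustration. One thread takes the locks in the order `Manager` then `Pages` (acquire memory, trigger a spill), while the iterating thread would have taken them in the order `Pages` then `Manager` had it freed the page inside its own `synchronized` block. Deferring the free until the `Pages` lock is released removes the nested acquisition.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DeferredFreeSketch {
  /** Hypothetical stand-in for TaskMemoryManager: its lock is held while it calls into a consumer. */
  static final class Manager {
    private Pages consumer;
    private int freed = 0;

    synchronized void register(Pages p) { consumer = p; }

    // Lock order here: Manager, then Pages (acquiring memory may force the consumer to spill).
    synchronized void acquire() {
      if (consumer != null) {
        consumer.spill();
      }
    }

    synchronized void freePage(int page) { freed++; }

    synchronized int freedCount() { return freed; }
  }

  /** Hypothetical stand-in for MapIterator: owns pages and hands them back to the Manager. */
  static final class Pages {
    private final Manager manager;
    private final Deque<Integer> pages = new ArrayDeque<>();
    private int spills = 0;

    Pages(Manager manager, int n) {
      this.manager = manager;
      for (int i = 0; i < n; i++) {
        pages.add(i);
      }
    }

    synchronized void spill() { spills++; }

    /** Returns false once all pages have been consumed. */
    boolean advance() {
      Integer pageToFree;
      synchronized (this) {
        // Only remember the page here; do not call into the Manager while holding this lock.
        pageToFree = pages.poll();
      }
      if (pageToFree == null) {
        return false;
      }
      // The Pages lock is released, so this takes the Manager lock alone.
      manager.freePage(pageToFree);
      return true;
    }
  }

  /** Runs both lock paths concurrently and returns the number of freed pages. */
  static int run(int numPages) throws InterruptedException {
    Manager m = new Manager();
    Pages p = new Pages(m, numPages);
    m.register(p);
    Thread other = new Thread(() -> {
      for (int i = 0; i < numPages; i++) {
        m.acquire();
      }
    });
    other.setDaemon(true);
    other.start();
    while (p.advance()) {
      // keep consuming pages while the other thread triggers spills
    }
    other.join(5000);
    if (other.isAlive()) {
      throw new IllegalStateException("possible deadlock: other thread did not finish");
    }
    return m.freedCount();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("freed pages: " + run(1000));
  }
}
```

If `advance()` instead called `manager.freePage(...)` inside its `synchronized (this)` block, the two threads could each hold one lock while waiting for the other, which is the shape of the deadlock this PR addresses.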
@@ -38,12 +38,14 @@ public long spill(long size, MemoryConsumer trigger) throws IOException {
     return used;
   }

-  void use(long size) {
+  @VisibleForTesting
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }

-  void free(long size) {
+  @VisibleForTesting
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
@@ -33,6 +33,7 @@

 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +668,53 @@ public void testPeakMemoryUsed() {
     }
   }

+  @Test
+  public void avoidDeadlock() throws InterruptedException {
Member:

Hi, @viirya. Since this test case reproduces the deadlock, we need timeout logic. Otherwise the test will hang (instead of failing) if we hit this issue again later.

viirya (Member, Author), Dec 10, 2018:

I've tried several ways to add timeout logic, but none of them work. The deadlock always hangs both the test and the timeout logic.

+    memoryManager.limit(PAGE_SIZE_BYTES);
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
+
+    Runnable memoryConsumer = new Runnable() {
+      @Override
+      public void run() {
+        int i = 0;
+        long used = 0;
+        while (i < 10) {
+          c1.use(10000000);
+          used += 10000000;
+          i++;
+        }
+        c1.free(used);
+      }
+    };
+
+    Thread thread = new Thread(memoryConsumer);
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
Member:

Let's use `for (int i = 0; ...` here and on line 708, because `int i` is not referenced outside of the for loop.
Never mind. I found that this is the convention in this test suite.

+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Start acquiring memory from another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
Contributor:

Is this line where the test hangs without the fix?

Member, Author:

Without this line, the test still hangs. The test thread deadlocks with the other thread, which is running `memoryConsumer`.

viirya (Member, Author), Dec 10, 2018:

This line just makes sure that `memoryConsumer` finishes and frees the memory it acquired.

+      for (File spillFile : spillFilesCreated) {
+        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
+          spillFile.exists());
+      }
+    }
+  }
+
 }
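On the timeout question raised in the review: one generic way to turn a hang into a failure is to run the risky body on a daemon thread and wait for it with a bounded `Thread.join`. The sketch below is an illustration only and is not part of this PR. `TimedRunSketch`, `finishesWithin`, and `deadlockingBody` are hypothetical names, and whether this approach would have worked inside `BytesToBytesMapSuite` is not established here, since the author reports that the timeout attempts also hung. Note that a bounded wait only helps if every wait on the test thread is bounded, including any `join` in a `finally` block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedRunSketch {
  /** Runs body on a daemon thread and reports whether it finished within the timeout. */
  static boolean finishesWithin(Runnable body, long timeout, TimeUnit unit)
      throws InterruptedException {
    Thread t = new Thread(body, "timed-body");
    t.setDaemon(true); // a stuck thread must not keep the JVM alive
    t.start();
    // join(0) would wait forever, so clamp the wait to at least 1 ms.
    t.join(Math.max(1L, unit.toMillis(timeout)));
    return !t.isAlive();
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Builds a body that deadlocks on purpose: two threads take two locks in opposite order. */
  static Runnable deadlockingBody() {
    final Object a = new Object();
    final Object b = new Object();
    return () -> {
      CountDownLatch bothHeld = new CountDownLatch(2);
      Thread other = new Thread(() -> {
        synchronized (a) {
          bothHeld.countDown();
          awaitQuietly(bothHeld); // wait until the other side holds b
          synchronized (b) {
            // never reached
          }
        }
      });
      other.setDaemon(true);
      other.start();
      synchronized (b) {
        bothHeld.countDown();
        awaitQuietly(bothHeld); // wait until the other side holds a
        synchronized (a) {
          // never reached
        }
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(finishesWithin(() -> { }, 2, TimeUnit.SECONDS));
    System.out.println(finishesWithin(deadlockingBody(), 500, TimeUnit.MILLISECONDS));
  }
}
```

The first call reports that a trivial body completes in time; the second reports that the deliberately deadlocked body does not. The stuck threads are daemons, so they are abandoned rather than cleaned up, which is a trade-off a real test suite would need to weigh.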