Skip to content

Commit

Permalink
[SPARK-31929][WEBUI] Close leveldbiterator when leveldb.close
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Close LevelDBIterator when LevelDB.close() is called.

### Why are the changes needed?

This pull request would prevent JNI resources leaking from Level DB instance and its' iterators. In before implementation JNI resources from LevelDBIterator are cleaned by finalize() function. This behavior is also mentioned in comments of ["LevelDBIterator.java"](https://github.com/apache/spark/blob/master/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java) by squito . But if DB instance is already closed, then iterator's close method would be ignored. LevelDB's iterator would keep level db files opened (for the case table cache is filled up), till iterator.close() is called. Then these JNI resources (file handle) would be leaked.
This JNI resource leaking issue would cause the problem described in [SPARK-31929](https://issues.apache.org/jira/browse/SPARK-31929) on Windows: in spark history server, leaked file handle for level db files would trigger "IOException" when HistoryServerDiskManager try to remove them for releasing disk space.
![IOException](https://user-images.githubusercontent.com/10524738/84134659-7c388680-aa7b-11ea-807f-04dcfa7886a0.JPG)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add unit test and manually tested it.

Closes #28769 from zhli1142015/close-leveldbiterator-when-leveldb.close.

Authored-by: Zhen Li <zhli@microsoft.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
zhli1142015 authored and srowen committed Jun 16, 2020
1 parent 8d57709 commit 2ec9b86
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
Expand Up @@ -19,8 +19,10 @@

import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;

/**
* Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
* ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
* to ensure that the iterator can be GCed, when it is only referenced here.
*/
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;

public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
}
Expand Down Expand Up @@ -94,6 +103,8 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
aliases = new HashMap<>();
}
typeAliases = new ConcurrentHashMap<>(aliases);

iteratorTracker = new ConcurrentLinkedQueue<>();
}

@Override
Expand Down Expand Up @@ -189,7 +200,9 @@ public <T> KVStoreView<T> view(Class<T> type) throws Exception {
@Override
public Iterator<T> iterator() {
try {
return new LevelDBIterator<>(type, LevelDB.this, this);
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
iteratorTracker.add(new SoftReference<>(it));
return it;
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand Down Expand Up @@ -238,6 +251,14 @@ public void close() throws IOException {
}

try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();
}
}
}
_db.close();
} catch (IOException ioe) {
throw ioe;
Expand All @@ -252,6 +273,7 @@ public void close() throws IOException {
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator<?> it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
Expand All @@ -260,6 +282,14 @@ void closeIterator(LevelDBIterator<?> it) throws IOException {
}
}

/**
* Remove iterator from iterator tracker. `LevelDBIterator` calls it to notify
* iterator is closed.
*/
void notifyIteratorClosed(LevelDBIterator<?> it) {
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
}

/** Returns metadata about indices for the given type. */
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);
Expand Down
Expand Up @@ -185,6 +185,7 @@ public boolean skip(long n) {

@Override
public synchronized void close() throws IOException {
db.notifyIteratorClosed(this);
if (!closed) {
it.close();
closed = true;
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -276,6 +277,41 @@ public void testNegativeIndexValues() throws Exception {
assertEquals(expected, results);
}

@Test
public void testCloseLevelDBIterator() throws Exception {
// SPARK-31929: test when LevelDB.close() is called, related LevelDBIterators
// are closed. And files opened by iterators are also closed.
File dbPathForCloseTest = File
.createTempFile(
"test_db_close.",
".ldb");
dbPathForCloseTest.delete();
LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
for (int i = 0; i < 8192; i++) {
dbForCloseTest.write(createCustomType1(i));
}
String key = dbForCloseTest
.view(CustomType1.class).iterator().next().key;
assertEquals("key0", key);
Iterator<CustomType1> it0 = dbForCloseTest
.view(CustomType1.class).max(1).iterator();
while (it0.hasNext()) {
it0.next();
}
System.gc();
Iterator<CustomType1> it1 = dbForCloseTest
.view(CustomType1.class).iterator();
assertEquals("key0", it1.next().key);
try (KVStoreIterator<CustomType1> it2 = dbForCloseTest
.view(CustomType1.class).closeableIterator()) {
assertEquals("key0", it2.next().key);
}
dbForCloseTest.close();
assertTrue(dbPathForCloseTest.exists());
FileUtils.deleteQuietly(dbPathForCloseTest);
assertTrue(!dbPathForCloseTest.exists());
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
Expand Down
Expand Up @@ -39,7 +39,13 @@ private[spark] class AppStatusStore(
def applicationInfo(): v1.ApplicationInfo = {
try {
// The ApplicationInfo may not be available when Spark is starting up.
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
Utils.tryWithResource(
store.view(classOf[ApplicationInfoWrapper])
.max(1)
.closeableIterator()
) { it =>
it.next().info
}
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application information. " +
Expand Down

0 comments on commit 2ec9b86

Please sign in to comment.