
Commit

Merge commit '9b792518b2d420d1026cfdc38729e30f45a36c91' into SPARK-24497-recursive-sql
peter-toth committed Jun 18, 2020
2 parents 7eba05f + 9b79251 commit 607aa4d
Showing 101 changed files with 2,203 additions and 1,072 deletions.
common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -19,8 +19,10 @@

import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;

  /**
   * Trying to close a JNI LevelDB handle after the DB has been closed causes JVM crashes, so
   * this queue is used to ensure that all iterators are closed before the LevelDB itself is
   * closed. Soft references are used so that an iterator can still be GCed when it is only
   * referenced here.
   */
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;

public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
}
@@ -94,6 +103,8 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
aliases = new HashMap<>();
}
typeAliases = new ConcurrentHashMap<>(aliases);

iteratorTracker = new ConcurrentLinkedQueue<>();
}

@Override
@@ -189,7 +200,9 @@ public <T> KVStoreView<T> view(Class<T> type) throws Exception {
@Override
public Iterator<T> iterator() {
try {
-        return new LevelDBIterator<>(type, LevelDB.this, this);
+        LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
+        iteratorTracker.add(new SoftReference<>(it));
+        return it;
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -238,6 +251,14 @@ public void close() throws IOException {
}

try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();
}
}
}
_db.close();
} catch (IOException ioe) {
throw ioe;
@@ -252,6 +273,7 @@ public void close() throws IOException {
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator<?> it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
@@ -260,6 +282,14 @@ void closeIterator(LevelDBIterator<?> it) throws IOException {
}
}

  /**
   * Removes an iterator from the iterator tracker. `LevelDBIterator` calls this to signal
   * that it has been closed.
   */
void notifyIteratorClosed(LevelDBIterator<?> it) {
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
}

/** Returns metadata about indices for the given type. */
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);
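Taken together, the LevelDB changes form one pattern: view() registers each new iterator in iteratorTracker behind a SoftReference, LevelDBIterator.close() unregisters itself via notifyIteratorClosed() (see the next file), and LevelDB.close() walks the tracker and closes every iterator that is still alive before the native handle goes away. A minimal self-contained sketch of that pattern, assuming illustrative class names (TrackedDb, TrackedIterator) rather than Spark's real API:

import java.lang.ref.SoftReference;
import java.util.concurrent.ConcurrentLinkedQueue;

class TrackedDb implements AutoCloseable {
  private final ConcurrentLinkedQueue<SoftReference<TrackedIterator>> tracker =
      new ConcurrentLinkedQueue<>();

  TrackedIterator open() {
    TrackedIterator it = new TrackedIterator(this);
    // Soft reference: an iterator abandoned by its caller can still be GCed.
    tracker.add(new SoftReference<>(it));
    return it;
  }

  void notifyClosed(TrackedIterator it) {
    // Forget the iterator once it closes itself, keeping the queue small.
    tracker.removeIf(ref -> it.equals(ref.get()));
  }

  @Override
  public void close() {
    // Close every live iterator first so none of them can touch the native
    // handle after the DB itself is gone. A cleared soft reference yields null.
    for (SoftReference<TrackedIterator> ref : tracker) {
      TrackedIterator it = ref.get();
      if (it != null) {
        it.close();
      }
    }
  }
}

class TrackedIterator implements AutoCloseable {
  private final TrackedDb owner;
  private boolean closed = false;

  TrackedIterator(TrackedDb owner) {
    this.owner = owner;
  }

  @Override
  public void close() {
    owner.notifyClosed(this);
    if (!closed) {
      closed = true; // the real code releases the JNI iterator handle here
    }
  }
}

Iteration over a ConcurrentLinkedQueue is weakly consistent, so notifyClosed() removing entries while close() is looping raises no ConcurrentModificationException; that property is one reason this queue type fits the tracker.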
common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -185,6 +185,7 @@ public boolean skip(long n) {

@Override
public synchronized void close() throws IOException {
db.notifyIteratorClosed(this);
if (!closed) {
it.close();
closed = true;
common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -19,6 +19,7 @@

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
@@ -276,6 +277,41 @@ public void testNegativeIndexValues() throws Exception {
assertEquals(expected, results);
}

  @Test
  public void testCloseLevelDBIterator() throws Exception {
    // SPARK-31929: when LevelDB.close() is called, all related LevelDBIterators
    // should be closed, and the files the iterators hold open should be released.
    File dbPathForCloseTest = File.createTempFile("test_db_close.", ".ldb");
    dbPathForCloseTest.delete();
    LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
    for (int i = 0; i < 8192; i++) {
      dbForCloseTest.write(createCustomType1(i));
    }
    String key = dbForCloseTest.view(CustomType1.class).iterator().next().key;
    assertEquals("key0", key);

    // Exhaust an iterator without closing it, then hint GC so that tracked
    // iterators referenced only through soft references may be collected.
    Iterator<CustomType1> it0 = dbForCloseTest.view(CustomType1.class).max(1).iterator();
    while (it0.hasNext()) {
      it0.next();
    }
    System.gc();

    // The store must still be fully usable after the GC hint.
    Iterator<CustomType1> it1 = dbForCloseTest.view(CustomType1.class).iterator();
    assertEquals("key0", it1.next().key);
    try (KVStoreIterator<CustomType1> it2 =
        dbForCloseTest.view(CustomType1.class).closeableIterator()) {
      assertEquals("key0", it2.next().key);
    }

    // close() must close the remaining iterators so the DB files can be deleted.
    dbForCloseTest.close();
    assertTrue(dbPathForCloseTest.exists());
    FileUtils.deleteQuietly(dbPathForCloseTest);
    assertFalse(dbPathForCloseTest.exists());
  }

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
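The System.gc() call in the test above exercises the soft-reference path: the JVM may clear a SoftReference whose referent is only softly reachable, so code holding such references must tolerate get() returning null. A tiny self-contained illustration of that contract (class name is invented for illustration):

import java.lang.ref.SoftReference;

public class SoftRefContract {
  public static void main(String[] args) {
    Object payload = new Object();
    SoftReference<Object> ref = new SoftReference<>(payload);

    payload = null; // drop the only strong reference
    System.gc();    // a hint; soft refs are typically cleared only under memory pressure

    // Both outcomes are legal after a GC hint, so callers must null-check.
    System.out.println(ref.get() != null ? "referent still cached" : "referent cleared");
  }
}

Either outcome is permitted after a GC hint, which is why LevelDB.close() null-checks each SoftReference before closing the iterator behind it.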
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -518,7 +518,7 @@ private[spark] class ExecutorAllocationManager(
if (testing) {
throw new SparkException("ResourceProfile Id was UNKNOWN, this is not expected")
}
logWarning(s"Not removing executor $executorIdsToBeRemoved because the " +
logWarning(s"Not removing executor $executorIdToBeRemoved because the " +
"ResourceProfile was UNKNOWN!")
} else {
// get the running total as we remove or initialize it to the count - pendingRemoval
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,10 +323,7 @@ private[spark] class Executor(
val threadName = s"Executor task launch worker for task $taskId"
val taskName = taskDescription.name
val mdcProperties = taskDescription.properties.asScala
-      .filter(_._1.startsWith("mdc.")).map { item =>
-        val key = item._1.substring(4)
-        (key, item._2)
-      }.toSeq
+      .filter(_._1.startsWith("mdc.")).toSeq

/** If specified, this task has been killed and this option contains the reason. */
@volatile private var reasonIfKilled: Option[String] = None
@@ -705,7 +702,7 @@ private[spark] class Executor(
MDC.clear()
mdc.foreach { case (key, value) => MDC.put(key, value) }
      // avoid overriding the taskName by the user
MDC.put("taskName", taskName)
MDC.put("mdc.taskName", taskName)
}

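The two Executor.scala changes work together: user-supplied task properties keep their full "mdc." prefix as MDC keys instead of having it stripped, and the framework-owned task name now lives under "mdc.taskName", written last so user properties cannot override it. A minimal Java sketch of the same convention using org.slf4j.MDC (property names and values here are invented for illustration):

import java.util.HashMap;
import java.util.Map;
import org.slf4j.MDC;

public class MdcPrefixDemo {
  public static void main(String[] args) {
    // Stand-in for taskDescription.properties; the keys are illustrative.
    Map<String, String> taskProps = new HashMap<>();
    taskProps.put("mdc.appName", "demo-app"); // user-provided, prefix is kept
    taskProps.put("spark.foo", "ignored");    // non-"mdc." keys are filtered out

    MDC.clear();
    for (Map.Entry<String, String> e : taskProps.entrySet()) {
      if (e.getKey().startsWith("mdc.")) {
        MDC.put(e.getKey(), e.getValue());
      }
    }
    // Framework-owned key, written last so user properties cannot override it.
    MDC.put("mdc.taskName", "task 1.0 in stage 0.0");
  }
}

With this convention, a logging pattern such as %X{mdc.taskName} picks the value up from the MDC.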
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -39,7 +39,13 @@ private[spark] class AppStatusStore(
def applicationInfo(): v1.ApplicationInfo = {
try {
// The ApplicationInfo may not be available when Spark is starting up.
-      store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
+      Utils.tryWithResource(
+        store.view(classOf[ApplicationInfoWrapper])
+          .max(1)
+          .closeableIterator()
+      ) { it =>
+        it.next().info
+      }
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application information. " +
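This change is the consumer-side counterpart of the iterator tracking above: applicationInfo() now bounds the iterator's lifetime instead of leaking it until GC. Utils.tryWithResource is Spark's Scala counterpart of Java's try-with-resources; a hedged Java sketch of the same pattern against the kvstore API (the KVStoreUtil class and first() helper are illustrative, not Spark code):

import org.apache.spark.util.kvstore.KVStore;
import org.apache.spark.util.kvstore.KVStoreIterator;

final class KVStoreUtil {
  private KVStoreUtil() {}

  // Read the first element of a view, closing the iterator deterministically.
  static <T> T first(KVStore store, Class<T> type) throws Exception {
    try (KVStoreIterator<T> it = store.view(type).max(1).closeableIterator()) {
      // try-with-resources closes the underlying LevelDB iterator even if
      // next() throws, matching what Utils.tryWithResource does in Scala.
      return it.next();
    }
  }
}

Closing the iterator eagerly here also keeps the tracker introduced in LevelDB from accumulating entries on a hot UI path.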
core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -276,11 +276,12 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with

test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
initLocalClusterSparkContext(2)
+    val id = sc.getExecutorIds().head
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
-    // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
+    // set up a barrier stage with 2 tasks and both tasks prefer the same executor (only 1 core) for
    // scheduling. So, one of the tasks won't be scheduled in one round of resource offers.
-    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
+    val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq(s"executor_h_$id"), Seq(s"executor_h_$id")))
val errorMsg = intercept[SparkException] {
rdd.barrier().mapPartitions { iter =>
BarrierTaskContext.get().barrier()
(The remaining changed files in this commit are not shown.)
