Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
cchighman committed Jun 20, 2020
2 parents 2b1c5dc + 297016e commit 2126c85
Show file tree
Hide file tree
Showing 124 changed files with 3,348 additions and 1,365 deletions.
Expand Up @@ -19,8 +19,10 @@

import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;

/**
* Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
* ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
* to ensure that the iterator can be GCed, when it is only referenced here.
*/
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;

public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
}
Expand Down Expand Up @@ -94,6 +103,8 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
aliases = new HashMap<>();
}
typeAliases = new ConcurrentHashMap<>(aliases);

iteratorTracker = new ConcurrentLinkedQueue<>();
}

@Override
Expand Down Expand Up @@ -189,7 +200,9 @@ public <T> KVStoreView<T> view(Class<T> type) throws Exception {
@Override
public Iterator<T> iterator() {
try {
return new LevelDBIterator<>(type, LevelDB.this, this);
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
iteratorTracker.add(new SoftReference<>(it));
return it;
} catch (Exception e) {
throw Throwables.propagate(e);
}
Expand Down Expand Up @@ -238,6 +251,14 @@ public void close() throws IOException {
}

try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();
}
}
}
_db.close();
} catch (IOException ioe) {
throw ioe;
Expand All @@ -252,6 +273,7 @@ public void close() throws IOException {
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator<?> it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
Expand All @@ -260,6 +282,14 @@ void closeIterator(LevelDBIterator<?> it) throws IOException {
}
}

/**
* Remove iterator from iterator tracker. `LevelDBIterator` calls it to notify
* iterator is closed.
*/
void notifyIteratorClosed(LevelDBIterator<?> it) {
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
}

/** Returns metadata about indices for the given type. */
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);
Expand Down
Expand Up @@ -185,6 +185,7 @@ public boolean skip(long n) {

@Override
public synchronized void close() throws IOException {
db.notifyIteratorClosed(this);
if (!closed) {
it.close();
closed = true;
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -276,6 +277,41 @@ public void testNegativeIndexValues() throws Exception {
assertEquals(expected, results);
}

@Test
public void testCloseLevelDBIterator() throws Exception {
// SPARK-31929: test when LevelDB.close() is called, related LevelDBIterators
// are closed. And files opened by iterators are also closed.
File dbPathForCloseTest = File
.createTempFile(
"test_db_close.",
".ldb");
dbPathForCloseTest.delete();
LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
for (int i = 0; i < 8192; i++) {
dbForCloseTest.write(createCustomType1(i));
}
String key = dbForCloseTest
.view(CustomType1.class).iterator().next().key;
assertEquals("key0", key);
Iterator<CustomType1> it0 = dbForCloseTest
.view(CustomType1.class).max(1).iterator();
while (it0.hasNext()) {
it0.next();
}
System.gc();
Iterator<CustomType1> it1 = dbForCloseTest
.view(CustomType1.class).iterator();
assertEquals("key0", it1.next().key);
try (KVStoreIterator<CustomType1> it2 = dbForCloseTest
.view(CustomType1.class).closeableIterator()) {
assertEquals("key0", it2.next().key);
}
dbForCloseTest.close();
assertTrue(dbPathForCloseTest.exists());
FileUtils.deleteQuietly(dbPathForCloseTest);
assertTrue(!dbPathForCloseTest.exists());
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
Expand Down
Expand Up @@ -518,7 +518,7 @@ private[spark] class ExecutorAllocationManager(
if (testing) {
throw new SparkException("ResourceProfile Id was UNKNOWN, this is not expected")
}
logWarning(s"Not removing executor $executorIdsToBeRemoved because the " +
logWarning(s"Not removing executor $executorIdToBeRemoved because the " +
"ResourceProfile was UNKNOWN!")
} else {
// get the running total as we remove or initialize it to the count - pendingRemoval
Expand Down
Expand Up @@ -39,7 +39,13 @@ private[spark] class AppStatusStore(
def applicationInfo(): v1.ApplicationInfo = {
try {
// The ApplicationInfo may not be available when Spark is starting up.
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
Utils.tryWithResource(
store.view(classOf[ApplicationInfoWrapper])
.max(1)
.closeableIterator()
) { it =>
it.next().info
}
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application information. " +
Expand Down
Expand Up @@ -276,11 +276,12 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with

test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
initLocalClusterSparkContext(2)
val id = sc.getExecutorIds().head
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
// set up a barrier stage with 2 tasks and both tasks prefer the same executor (only 1 core) for
// scheduling. So, one of tasks won't be scheduled in one round of resource offer.
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"), Seq("executor_h_0")))
val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq(s"executor_h_$id"), Seq(s"executor_h_$id")))
val errorMsg = intercept[SparkException] {
rdd.barrier().mapPartitions { iter =>
BarrierTaskContext.get().barrier()
Expand Down
1 change: 1 addition & 0 deletions dev/.rat-excludes
Expand Up @@ -123,3 +123,4 @@ SessionManager.java
SessionHandler.java
GangliaReporter.java
application_1578436911597_0052
config.properties
148 changes: 148 additions & 0 deletions dev/create-release/known_translations
Expand Up @@ -261,3 +261,151 @@ yanlin-Lynn - Yanlin Wang
yucai - Yucai Yu
zhengruifeng - Ruifeng Zheng
zuotingbing - Tingbing Zuo
012huang - Weiyi Huang
07ARB - Ankit Raj Boudh
Andrew-Crosby - Andrew Crosby
AngersZhuuuu - Yi Zhu
Deegue - Yizhong Zhang
Gschiavon - German Schiavon Matteo
GuoPhilipse - Philipse Guo
Hellsen83 - Erik Christiansen
Icysandwich - Icysandwich
JasonWayne - Wenjie Wu
JkSelf - Ke Jia
JoanFM - Joan Fontanals
JulienPeloton - Julien Peloton
Koraseg - Artem Kupchinskiy
KyleLi1985 - Liang Li
LiShuMing - Shuming Li
LinhongLiu - Liu, Linhong
LuciferYang - Yang Jie
MaxGekk - Maxim Gekk
Ngone51 - Yi Wu
PavithraRamachandran - Pavithra Ramachandran
SongYadong - Yadong Song
TigerYang414 - David Yang
TomokoKomiyama - Tomoko Komiyama
TopGunViper - TopGunViper
Udbhav30 - Udbhav Agrawal
WangGuangxin - Guangxin Wang
William1104 - William Wong
YongjinZhou - Yongjin Zhou
aaruna - Aaruna Godthi
adrian555 - Weiqiang Zhuang
ajithme - Ajith S
amanomer - Aman Omer
ancasarb - Anca Sarb
avkgh - Aleksandr Kashkirov
ayudovin - Artsiom Yudovin
bartosz25 - Bartosz Konieczny
beliefer - Jiaan Geng
bettermouse - Chen Hao
bscan - Brian Scannell
cchung100m - Neo Chien
cclauss - Christian Clauss
chakravarthiT - Chakravarthi
chandulal - Chandu Kavar
chitralverma - Chitral Verma
cjn082030 - Jenny
cloud-fan - Wenchen Fan
codeborui - codeborui
colinmjj - Colin Ma
cxzl25 - cxzl25
cyq89051127 - Yongqiang Chai
darrentirto - Darren Tirto
daviddingly - Xiaoyuan Ding
davidvrba - David Vrba
deepyaman - Deepyaman Datta
denglingang - Lingang Deng
dengziming - dengziming
deshanxiao - deshanxiao
dima-asana - Dima Kamalov
dlindelof - David Lindelof
dongjoon-hyun - Dongjoon Hyun
eatoncys - eatoncys
fan31415 - Yijie Fan
fitermay - Yuli Fiterman
francis0407 - Mingcong Han
fuwhu - Fuwang Hu
gss2002 - Greg Senia
hddong - Dongdong Hong
hehuiyuan - hehuiyuan
helenyugithub - Helen Yu
highmoutain - highmoutain
httfighter - Tiantian Han
huangtianhua - huangtianhua
hvanhovell - Herman Van Hovell
iRakson - Rakesh Raushan
igorcalabria - Igor Calabria
imback82 - Terry Kim
javierivanov - Javier Fuentes
joelgenter - Joel Genter
ketank-new - Ketan Kunde
laskfla - Keith Sun
lcqzte10192193 - Chaoqun Li
leoluan2009 - Xuedong Luan
liangxs - Xuesen Liang
lidinghao - Li Hao
linehrr - Ryne Yang
linzebing - Zebing Lin
lipzhu - Lipeng Zhu
liucht-inspur - liucht-inspur
liupc - Pengcheng Liu
liwensun - Liwen Sun
manuzhang - Manu Zhang
mareksimunek - Marek Simunek
masa3141 - Masahiro Kazama
mdianjun - Dianjun Ma
merrily01 - Ruilei Ma
mob-ai - mob-ai
mu5358271 - Shuheng Dai
mwlon - Martin Loncaric
nandorKollar - Nandor Kollar
nooberfsh - nooberfsh
oleg-smith - Oleg Kuznetsov
ozancicek - Ozan Cicekci
pengbo - Peng Bo
planga82 - Pablo Langa Blanco
praneetsharma - Praneet Sharma
ptkool - Michael Styles
qb-tarushg - Tarush Grover
redsanket - Sanket Reddy
redsk - Nicola Bova
roland1982 - roland1982
rongma1997 - Rong Ma
rrusso2007 - Rob Russo
samsetegne - Samuel L. Setegne
sangramga - Sangram Gaikwad
sarthfrey - Sarth Frey
seayoun - Haiyang Yu
sev7e0 - Jiaqi Li
shahidki31 - Shahid
sharangk - Sharanabasappa G Keriwaddi
sheepstop - Ting Yang
shivsood - Shiv Prashant Sood
sitegui - Guilherme Souza
slamke - Sun Ke
southernriver - Liang Chen
squito - Imran Rashid
stczwd - Jackey Lee
sujith71955 - Sujith Chacko
suxingfate - Xinglong Wang
teeyog - teeyog
tinhto-000 - Tin Hang To
tools4origins - tools4origins
triplesheep - triplesheep
turboFei - Fei Wang
ulysses-you - ulysses-you
uzadude - Ohad Raviv
wackxu - wackxu
wangjiaochun - wangjiaochun
wangshisan - wangshisan
weixiuli - XiuLi Wei
wenfang6 - wenfang6
wenxuanguan - wenxuanguan
windpiger - Song Jun
woudygao - Woudy Gao
xianyinxin - Xianyin Xin
yunzoud - Yun Zou
zero323 - Maciej Szymkiewicz
zjf2012 - Jiafu Zhang
2 changes: 1 addition & 1 deletion docs/running-on-kubernetes.md
Expand Up @@ -1341,7 +1341,7 @@ The following affect the driver and executor containers. All other containers in
<td>See description</td>
<td>
The container name will be assigned by spark ("spark-kubernetes-driver" for the driver container, and
"executor" for each executor container) if not defined by the pod template. If the container is defined by the
"spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container is defined by the
template, the template's name will be used.
</td>
</tr>
Expand Down
17 changes: 16 additions & 1 deletion docs/running-on-yarn.md
Expand Up @@ -82,6 +82,18 @@ In `cluster` mode, the driver runs on a different machine than the client, so `S

Running Spark on YARN requires a binary distribution of Spark which is built with YARN support.
Binary distributions can be downloaded from the [downloads page](https://spark.apache.org/downloads.html) of the project website.
There are two variants of Spark binary distributions you can download. One is pre-built with a certain
version of Apache Hadoop; this Spark distribution contains built-in Hadoop runtime, so we call it `with-hadoop` Spark
distribution. The other one is pre-built with user-provided Hadoop; since this Spark distribution
doesn't contain a built-in Hadoop runtime, it's smaller, but users have to provide a Hadoop installation separately.
We call this variant `no-hadoop` Spark distribution. For `with-hadoop` Spark distribution, since
it contains a built-in Hadoop runtime already, by default, when a job is submitted to Hadoop Yarn cluster, to prevent jar conflict, it will not
populate Yarn's classpath into Spark. To override this behavior, you can set <code>spark.yarn.populateHadoopClasspath=true</code>.
For `no-hadoop` Spark distribution, Spark will populate Yarn's classpath by default in order to get Hadoop runtime. For `with-hadoop` Spark distribution,
if your application depends on certain library that is only available in the cluster, you can try to populate the Yarn classpath by setting
the property mentioned above. If you run into jar conflict issue by doing so, you will need to turn it off and include this library
in your application jar.

To build Spark yourself, refer to [Building Spark](building-spark.html).

To make Spark runtime jars accessible from YARN side, you can specify `spark.yarn.archive` or `spark.yarn.jars`. For details please refer to [Spark Properties](running-on-yarn.html#spark-properties). If neither `spark.yarn.archive` nor `spark.yarn.jars` is specified, Spark will create a zip file with all jars under `$SPARK_HOME/jars` and upload it to the distributed cache.
Expand Down Expand Up @@ -396,7 +408,10 @@ To use a custom metrics.properties for the application master and executors, upd
</tr>
<tr>
<td><code>spark.yarn.populateHadoopClasspath</code></td>
<td>true</td>
<td>
For <code>with-hadoop</code> Spark distribution, this is set to false;
for <code>no-hadoop</code> distribution, this is set to true.
</td>
<td>
Whether to populate Hadoop classpath from <code>yarn.application.classpath</code> and
<code>mapreduce.application.classpath</code> Note that if this is set to <code>false</code>,
Expand Down
5 changes: 3 additions & 2 deletions docs/structured-streaming-kafka-integration.md
Expand Up @@ -440,9 +440,10 @@ The following configurations are optional:
<tr>
<td>kafkaConsumer.pollTimeoutMs</td>
<td>long</td>
<td>512</td>
<td>120000</td>
<td>streaming and batch</td>
<td>The timeout in milliseconds to poll data from Kafka in executors.</td>
<td>The timeout in milliseconds to poll data from Kafka in executors. When not defined it falls
back to <code>spark.network.timeout</code>.</td>
</tr>
<tr>
<td>fetchOffset.numRetries</td>
Expand Down

0 comments on commit 2126c85

Please sign in to comment.