Commit

Merge branch 'master' into SPARK-31957

yaooqinn committed Jun 15, 2020
2 parents d055d60 + f83cb3c commit 1177e9a
Showing 107 changed files with 2,863 additions and 1,027 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -334,7 +334,7 @@
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-htmlunit-driver</artifactId>
<artifactId>htmlunit-driver</artifactId>
<scope>test</scope>
</dependency>
<!-- Coerce sbt into honoring these dependency updates: -->


3 changes: 2 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,7 +87,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
19 changes: 6 additions & 13 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -323,10 +323,7 @@ private[spark] class Executor(
val threadName = s"Executor task launch worker for task $taskId"
val taskName = taskDescription.name
val mdcProperties = taskDescription.properties.asScala
.filter(_._1.startsWith("mdc.")).map { item =>
val key = item._1.substring(4)
(key, item._2)
}.toSeq
.filter(_._1.startsWith("mdc.")).toSeq

/** If specified, this task has been killed and this option contains the reason. */
@volatile private var reasonIfKilled: Option[String] = None
@@ -401,9 +398,7 @@ private[spark] class Executor(
}

override def run(): Unit = {

setMDCForTask(taskName, mdcProperties)

threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
@@ -703,11 +698,11 @@ private[spark] class Executor(
}

private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
MDC.put("taskName", taskName)

mdc.foreach { case (key, value) =>
MDC.put(key, value)
}
// make sure we run the task with the user-specified mdc properties only
MDC.clear()
mdc.foreach { case (key, value) => MDC.put(key, value) }
// avoid the taskName being overridden by the user
MDC.put("mdc.taskName", taskName)
}

/**
@@ -750,9 +745,7 @@ private[spark] class Executor(
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)

override def run(): Unit = {

setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)

val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
@@ -525,7 +525,13 @@ private[spark] class AppStatusStore(
}

def appSummary(): AppSummary = {
store.read(classOf[AppSummary], classOf[AppSummary].getName())
try {
store.read(classOf[AppSummary], classOf[AppSummary].getName())
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application summary. " +
"If you are starting up Spark, please wait a while until it's ready.")
}
}

def close(): Unit = {
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -23,6 +23,7 @@ import javax.servlet.DispatcherType
import javax.servlet.http._

import scala.language.implicitConversions
import scala.util.Try
import scala.xml.Node

import org.eclipse.jetty.client.HttpClient
@@ -500,7 +501,11 @@ private[spark] case class ServerInfo(
threadPool match {
case pool: QueuedThreadPool =>
// Workaround for SPARK-30385 to avoid Jetty's acceptor thread shrink.
pool.setIdleTimeout(0)
// As of Jetty 9.4.21, the implementation of
// QueuedThreadPool#setIdleTimeout changed, and an IllegalStateException
// is thrown if we try to set the idle timeout after the server has started.
// This workaround still works for Jetty 9.4.28 because the exception is ignored.
Try(pool.setIdleTimeout(0))
case _ =>
}
server.stop()
@@ -24,6 +24,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.io.Source
import scala.xml.Node

import com.gargoylesoftware.css.parser.CSSParseException
import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler
import org.json4s._
import org.json4s.jackson.JsonMethods
@@ -33,7 +34,6 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.scalatestplus.selenium.WebBrowser
import org.w3c.css.sac.CSSParseException

import org.apache.spark._
import org.apache.spark.LocalSparkContext._
2 changes: 0 additions & 2 deletions dev/.rat-excludes
@@ -31,9 +31,7 @@ dagre-d3.min.js
graphlib-dot.min.js
sorttable.js
vis-timeline-graph2d.min.js
vis-timeline-graph2d.min.js.map
vis-timeline-graph2d.min.css
vis-timeline-graph2d.min.css.map
dataTables.bootstrap4.1.10.20.min.css
dataTables.bootstrap4.1.10.20.min.js
dataTables.rowsGroup.js
14 changes: 3 additions & 11 deletions dev/create-release/do-release.sh
@@ -17,8 +17,6 @@
# limitations under the License.
#

set -e

SELF=$(cd $(dirname $0) && pwd)
. "$SELF/release-util.sh"

@@ -54,6 +52,9 @@ function should_build {
if should_build "tag" && [ $SKIP_TAG = 0 ]; then
run_silent "Creating release tag $RELEASE_TAG..." "tag.log" \
"$SELF/release-tag.sh"
echo "It may take some time for the tag to be synchronized to github."
echo "Press enter when you've verified that the new tag ($RELEASE_TAG) is available."
read
else
echo "Skipping tag creation for $RELEASE_TAG."
fi
@@ -78,12 +79,3 @@ if should_build "publish"; then
else
echo "Skipping publish step."
fi

if should_build "tag" && [ $SKIP_TAG = 0 ]; then
git push origin $RELEASE_TAG
if [[ $RELEASE_TAG != *"preview"* ]]; then
git push origin HEAD:$GIT_BRANCH
else
echo "It's preview release. We only push $RELEASE_TAG to remote."
fi
fi
15 changes: 6 additions & 9 deletions dev/create-release/release-build.sh
@@ -92,12 +92,9 @@ BASE_DIR=$(pwd)
init_java
init_maven_sbt

# Only clone repo fresh if not present, otherwise use checkout from the tag step
if [ ! -d spark ]; then
git clone "$ASF_REPO"
fi
rm -rf spark
git clone "$ASF_REPO"
cd spark
git fetch
git checkout $GIT_REF
git_hash=`git rev-parse --short HEAD`
echo "Checked out Spark git hash $git_hash"
@@ -167,7 +164,7 @@ fi
DEST_DIR_NAME="$SPARK_PACKAGE_VERSION"

git clean -d -f -x
rm .gitignore
rm -f .gitignore
cd ..

if [[ "$1" == "package" ]]; then
@@ -177,9 +174,9 @@ if [[ "$1" == "package" ]]; then

# For source release in v2.4+, exclude copy of binary license/notice
if [[ $SPARK_VERSION > "2.4" ]]; then
rm spark-$SPARK_VERSION/LICENSE-binary
rm spark-$SPARK_VERSION/NOTICE-binary
rm -r spark-$SPARK_VERSION/licenses-binary
rm -f spark-$SPARK_VERSION/LICENSE-binary
rm -f spark-$SPARK_VERSION/NOTICE-binary
rm -rf spark-$SPARK_VERSION/licenses-binary
fi

tar cvzf spark-$SPARK_VERSION.tgz --exclude spark-$SPARK_VERSION/.git spark-$SPARK_VERSION
16 changes: 13 additions & 3 deletions dev/create-release/release-tag.sh
@@ -25,7 +25,6 @@ function exit_with_usage {
cat << EOF
usage: $NAME
Tags a Spark release on a particular branch.
You must push the tags after.
Inputs are specified with the following environment variables:
ASF_USERNAME - Apache Username
@@ -106,8 +105,19 @@ sed -i".tmp7" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION"

git commit -a -m "Preparing development version $NEXT_VERSION"

cd ..
if is_dry_run; then
if ! is_dry_run; then
# Push changes
git push origin $RELEASE_TAG
if [[ $RELEASE_VERSION != *"preview"* ]]; then
git push origin HEAD:$GIT_BRANCH
else
echo "It's preview release. We only push $RELEASE_TAG to remote."
fi

cd ..
rm -rf spark
else
cd ..
mv spark spark.tag
echo "Clone with version changes and tag available as spark.tag in the output directory."
fi
8 changes: 4 additions & 4 deletions docs/configuration.md
@@ -2955,11 +2955,11 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
`log4j.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j.properties.template` located there.

By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `taskName`, which shows something
like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your patternLayout in
By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `mdc.taskName`, which shows something
like `task 1.0 in stage 0.0`. You can add `%X{mdc.taskName}` to your patternLayout in
order to print it in the logs.
Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user specific data into MDC.
The key in MDC will be the string after the `mdc.` prefix.
Moreover, you can use `spark.sparkContext.setLocalProperty(s"mdc.$name", "value")` to add user-specific data into MDC.
The key in MDC will be the string "mdc.$name", i.e. the `mdc.` prefix is kept.
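
For example, a minimal sketch of both sides (the property name `appPhase`, the job, and the `console` appender name below are illustrative placeholders, not Spark defaults):

```scala
import org.apache.spark.sql.SparkSession

object MdcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mdc-example").getOrCreate()

    // Local properties prefixed with "mdc." are copied into the MDC of the
    // executor threads that run tasks of subsequent jobs; the MDC key keeps
    // the "mdc." prefix, e.g. "mdc.appPhase" here.
    spark.sparkContext.setLocalProperty("mdc.appPhase", "nightly-etl")

    spark.range(0, 1000000).count()
    spark.stop()
  }
}

// To print the values in executor logs, reference the keys in the log4j layout, e.g.:
// log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %X{mdc.taskName} %X{mdc.appPhase} %m%n
```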

# Overriding configuration directory

10 changes: 10 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -1675,6 +1675,16 @@ Any of the stateful operation(s) after any of below stateful operations can have
As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
emits late rows if the operator uses Append mode.

Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue:

1. On the Spark UI: check the metrics of the stateful operator nodes on the query execution details page in the SQL tab
2. On the Streaming Query Listener: check "numLateInputs" in "stateOperators" in QueryProgressEvent.

Please note that the definition of "input" is relative: it doesn't always mean "input rows" for the operator.
Streaming aggregation pre-aggregates input rows and checks the late inputs against the pre-aggregated inputs,
hence the number is not the same as the number of original input rows. What matters is whether the value is zero
or non-zero.
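
As a minimal sketch of the listener approach (only the public `StreamingQueryListener` API is assumed; the class name is illustrative), each progress update can be dumped as JSON and the "stateOperators" section inspected:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Prints every progress update; the "stateOperators" array in the JSON output
// carries the per-operator state metrics, including "numLateInputs".
class LateInputsListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(event.progress.prettyJson)
  }
}

// Register it on an existing SparkSession before starting the query:
// spark.streams.addListener(new LateInputsListener)
```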

There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

29 changes: 14 additions & 15 deletions licenses-binary/LICENSE-vis-timeline.txt
@@ -1,23 +1,22 @@
vis-timeline and vis-graph2d
https://visjs.github.io/vis-timeline/
vis.js
https://github.com/almende/vis

Create a fully customizable, interactive timeline with items and ranges.
A dynamic, browser-based visualization library.

@version 7.3.4
@date 2020-03-18T17:03:58.105Z

@copyright (c) 2011-2017 Almende B.V, http://almende.com
@copyright (c) 2017-2019 visjs contributors, https://github.com/visjs
@version 4.20.1-SNAPSHOT
@date 2017-10-12

@license
vis.js is dual licensed under both
Copyright (C) 2011-2017 Almende B.V, http://almende.com

Vis.js is dual licensed under both

1. The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0
* The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0

and
and

2. The MIT License
http://opensource.org/licenses/MIT
* The MIT License
http://opensource.org/licenses/MIT

vis.js may be distributed under either license.
Vis.js may be distributed under either license.
29 changes: 14 additions & 15 deletions licenses/LICENSE-vis-timeline.txt
@@ -1,23 +1,22 @@
vis-timeline and vis-graph2d
https://visjs.github.io/vis-timeline/
vis.js
https://github.com/almende/vis

Create a fully customizable, interactive timeline with items and ranges.
A dynamic, browser-based visualization library.

@version 7.3.4
@date 2020-03-18T17:03:58.105Z

@copyright (c) 2011-2017 Almende B.V, http://almende.com
@copyright (c) 2017-2019 visjs contributors, https://github.com/visjs
@version 4.20.1-SNAPSHOT
@date 2017-10-12

@license
vis.js is dual licensed under both
Copyright (C) 2011-2017 Almende B.V, http://almende.com

Vis.js is dual licensed under both

1. The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0
* The Apache 2.0 License
http://www.apache.org/licenses/LICENSE-2.0

and
and

2. The MIT License
http://opensource.org/licenses/MIT
* The MIT License
http://opensource.org/licenses/MIT

vis.js may be distributed under either license.
Vis.js may be distributed under either license.
@@ -594,7 +594,7 @@ class LogisticRegression @Since("1.2.0") (
Vectors.dense(if (numClasses == 2) Double.PositiveInfinity else Double.NegativeInfinity)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()
return createModel(dataset, numClasses, coefMatrix, interceptVec, Array.empty)
return createModel(dataset, numClasses, coefMatrix, interceptVec, Array(0.0))
}

if (!$(fitIntercept) && isConstantLabel) {
@@ -1545,13 +1545,19 @@ sealed trait LogisticRegressionSummary extends Serializable {
*/
sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary {

/** objective function (scaled loss + regularization) at each iteration. */
/**
* objective function (scaled loss + regularization) at each iteration.
* It contains one more element than the number of iterations; the extra element is the objective at the initial state.
*/
@Since("1.5.0")
def objectiveHistory: Array[Double]

/** Number of training iterations. */
@Since("1.5.0")
def totalIterations: Int = objectiveHistory.length
def totalIterations: Int = {
assert(objectiveHistory.length > 0, "objectiveHistory length should be greater than 0.")
objectiveHistory.length - 1
}

}

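
To illustrate the off-by-one relationship documented above, a hedged sketch (the libsvm path is a placeholder):

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.SparkSession

object ObjectiveHistoryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("objective-history").getOrCreate()

    // Placeholder path to a libsvm-formatted dataset with "label"/"features" columns.
    val training = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

    val model = new LogisticRegression().setMaxIter(10).fit(training)
    val summary = model.summary
    val history = summary.objectiveHistory

    // objectiveHistory records the objective at the initial state and after each
    // iteration, so it has one more element than totalIterations.
    assert(history.length == summary.totalIterations + 1)
    history.zipWithIndex.foreach { case (obj, i) => println(s"step $i: objective = $obj") }

    spark.stop()
  }
}
```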
