Commit 762e4de

Merge branch 'master' into SPARK-26127
mgaido91 committed Nov 21, 2018
2 parents ce4a02e + a480a62 commit 762e4de
Showing 19 changed files with 385 additions and 226 deletions.
1 change: 1 addition & 0 deletions R/pkg/tests/fulltests/test_streaming.R
@@ -127,6 +127,7 @@ test_that("Specify a schema by using a DDL-formatted string when reading", {
   expect_false(awaitTermination(q, 5 * 1000))
   callJMethod(q@ssq, "processAllAvailable")
   expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+  stopQuery(q)
 
   expect_error(read.stream(path = parquetPath, schema = "name stri"),
                "DataType stri is not supported.")
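The one-line fix above is query-lifecycle hygiene: the test started a streaming query and drained it with processAllAvailable, but never stopped it, so the query would keep running into subsequent tests. The same discipline applies in any language binding. Below is a minimal Scala sketch of the equivalent start/drain/stop pattern; the input path, schema, and session setup are illustrative, not taken from the test:

```scala
import org.apache.spark.sql.SparkSession

object StopQueryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stop-query-demo")   // hypothetical app name
      .getOrCreate()

    // Read a stream with a DDL-formatted schema string, as the R test does.
    val people = spark.readStream
      .schema("name STRING, age INT")
      .json("/tmp/people")          // hypothetical input directory

    val query = people.writeStream
      .format("memory")
      .queryName("people3")
      .start()

    query.processAllAvailable()     // counterpart of callJMethod(q@ssq, "processAllAvailable")
    spark.sql("SELECT count(*) FROM people3").show()

    query.stop()                    // counterpart of the added stopQuery(q)
    spark.stop()
  }
}
```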
27 changes: 16 additions & 11 deletions bin/load-spark-env.sh
@@ -26,35 +26,40 @@ if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi
 
+SPARK_ENV_SH="spark-env.sh"
 if [ -z "$SPARK_ENV_LOADED" ]; then
   export SPARK_ENV_LOADED=1
 
   export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
 
-  if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
+  SPARK_ENV_SH="${SPARK_CONF_DIR}/${SPARK_ENV_SH}"
+  if [[ -f "${SPARK_ENV_SH}" ]]; then
     # Promote all variable declarations to environment (exported) variables
     set -a
-    . "${SPARK_CONF_DIR}/spark-env.sh"
+    . ${SPARK_ENV_SH}
     set +a
   fi
 fi
 
 # Setting SPARK_SCALA_VERSION if not already set.
 
 if [ -z "$SPARK_SCALA_VERSION" ]; then
+  SCALA_VERSION_1=2.12
+  SCALA_VERSION_2=2.11
 
-  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
-  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"
-
-  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
-    echo -e "Presence of build for multiple Scala versions detected." 1>&2
-    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2
+  ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}"
+  ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}"
+  ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables"
+  if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then
+    echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2
+    echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2
+    echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2
     exit 1
   fi
 
-  if [ -d "$ASSEMBLY_DIR2" ]; then
-    export SPARK_SCALA_VERSION="2.11"
+  if [[ -d "$ASSEMBLY_DIR_1" ]]; then
+    export SPARK_SCALA_VERSION=${SCALA_VERSION_1}
   else
-    export SPARK_SCALA_VERSION="2.12"
+    export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
   fi
 fi
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
-import java.util.{Arrays, Date, Locale}
+import java.util.{Arrays, Comparator, Date, Locale}
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
@@ -269,10 +269,11 @@ private[spark] class SparkHadoopUtil extends Logging {
           name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
         }
       })
-      Arrays.sort(fileStatuses,
-        (o1: FileStatus, o2: FileStatus) => {
+      Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+        override def compare(o1: FileStatus, o2: FileStatus): Int = {
           Longs.compare(o1.getModificationTime, o2.getModificationTime)
-        })
+        }
+      })
       fileStatuses
     } catch {
       case NonFatal(e) =>
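The Arrays.sort change swaps a Scala function literal for an explicit anonymous java.util.Comparator. Function literals in that position rely on SAM conversion, which Scala 2.12 performs by default but Scala 2.11 does not, so the explicit Comparator presumably keeps the file compiling under both Scala versions Spark supported at the time. A self-contained sketch of the same pattern, using java.lang.Long.compare in place of Guava's Longs so it runs without extra dependencies:

```scala
import java.util.{Arrays, Comparator}

object SortByModificationTime {
  // Stand-in for Hadoop's FileStatus, just for this sketch.
  final case class Entry(name: String, modificationTime: Long)

  def main(args: Array[String]): Unit = {
    val entries = Array(Entry("b", 300L), Entry("a", 100L), Entry("c", 200L))

    // Explicit anonymous Comparator: compiles identically on Scala 2.11 and 2.12,
    // whereas a lambda here needs 2.12-style SAM conversion.
    Arrays.sort(entries, new Comparator[Entry] {
      override def compare(o1: Entry, o2: Entry): Int =
        java.lang.Long.compare(o1.modificationTime, o2.modificationTime)
    })

    println(entries.map(_.name).mkString(", "))  // prints: a, c, b
  }
}
```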
core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -570,6 +570,12 @@ package object config {
     .stringConf
     .createOptional
 
+  private[spark] val UI_REQUEST_HEADER_SIZE =
+    ConfigBuilder("spark.ui.requestHeaderSize")
+      .doc("Value for HTTP request header size in bytes.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("8k")
+
   private[spark] val EXTRA_LISTENERS = ConfigBuilder("spark.extraListeners")
     .doc("Class names of listeners to add to SparkContext during initialization.")
     .stringConf
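The new entry defines spark.ui.requestHeaderSize as a bytes-typed config defaulting to "8k", matching Jetty's stock request header limit. Because it is declared with bytesConf, values may carry size suffixes. A hedged usage sketch (the app name and the 16k value are illustrative):

```scala
import org.apache.spark.SparkConf

// Illustrative only: raise the UI's HTTP request header limit, e.g. when an
// authentication proxy in front of the Spark UI attaches large headers or cookies.
val conf = new SparkConf()
  .setAppName("header-size-demo")
  .set("spark.ui.requestHeaderSize", "16k") // parsed as a byte size: 16 * 1024 bytes
```

The same setting can also be supplied at launch time with spark-submit --conf spark.ui.requestHeaderSize=16k.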
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -356,13 +356,15 @@ private[spark] object JettyUtils extends Logging {
 
       (connector, connector.getLocalPort())
     }
+    val httpConfig = new HttpConfiguration()
+    httpConfig.setRequestHeaderSize(conf.get(UI_REQUEST_HEADER_SIZE).toInt)
 
     // If SSL is configured, create the secure connector first.
     val securePort = sslOptions.createJettySslContextFactory().map { factory =>
       val securePort = sslOptions.port.getOrElse(if (port > 0) Utils.userPort(port, 400) else 0)
       val secureServerName = if (serverName.nonEmpty) s"$serverName (HTTPS)" else serverName
       val connectionFactories = AbstractConnectionFactory.getFactories(factory,
-        new HttpConnectionFactory())
+        new HttpConnectionFactory(httpConfig))
 
       def sslConnect(currentPort: Int): (ServerConnector, Int) = {
         newConnector(connectionFactories, currentPort)
@@ -377,7 +379,7 @@
 
     // Bind the HTTP port.
     def httpConnect(currentPort: Int): (ServerConnector, Int) = {
-      newConnector(Array(new HttpConnectionFactory()), currentPort)
+      newConnector(Array(new HttpConnectionFactory(httpConfig)), currentPort)
     }
 
     val (httpConnector, httpPort) = Utils.startServiceOnPort[ServerConnector](port, httpConnect,
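Both the HTTP and HTTPS connectors now receive the shared httpConfig: a Jetty HttpConnectionFactory built with its no-argument constructor creates a default HttpConfiguration, which would silently ignore spark.ui.requestHeaderSize. The standalone Jetty 9 sketch below (not Spark code; the port and limit are illustrative) shows the same wiring in isolation:

```scala
import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}

object JettyHeaderSizeDemo {
  def main(args: Array[String]): Unit = {
    val server = new Server()

    // Every connector built from this configuration honors the raised limit;
    // a factory created as `new HttpConnectionFactory()` would not.
    val httpConfig = new HttpConfiguration()
    httpConfig.setRequestHeaderSize(16 * 1024) // 16 KiB instead of Jetty's 8 KiB default

    val connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig))
    connector.setPort(8080) // illustrative port
    server.addConnector(connector)

    server.start()
    server.join()
  }
}
```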
The remaining 14 changed files are not shown.
