Merge branch 'master' into redesign
cloud-fan committed Aug 17, 2018
2 parents 2018981 + 162326c commit 76f8e6b
Showing 91 changed files with 2,748 additions and 533 deletions.
2 changes: 0 additions & 2 deletions LICENSE-binary
@@ -228,7 +228,6 @@ org.apache.xbean:xbean-asm5-shaded
com.squareup.okhttp3:logging-interceptor
com.squareup.okhttp3:okhttp
com.squareup.okio:okio
net.java.dev.jets3t:jets3t
org.apache.spark:spark-catalyst_2.11
org.apache.spark:spark-kvstore_2.11
org.apache.spark:spark-launcher_2.11
@@ -447,7 +446,6 @@ org.slf4j:jul-to-slf4j
org.slf4j:slf4j-api
org.slf4j:slf4j-log4j12
com.github.scopt:scopt_2.11
org.bouncycastle:bcprov-jdk15on

core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js
core/src/main/resources/org/apache/spark/ui/static/*dataTables*
2 changes: 0 additions & 2 deletions NOTICE
@@ -26,5 +26,3 @@ The following provides more details on the included cryptographic software:
This software uses Apache Commons Crypto (https://commons.apache.org/proper/commons-crypto/) to
support authentication, and encryption and decryption of data sent across the network between
services.

This software includes Bouncy Castle (http://bouncycastle.org/) to support the jets3t library.
21 changes: 0 additions & 21 deletions NOTICE-binary
@@ -27,8 +27,6 @@ This software uses Apache Commons Crypto (https://commons.apache.org/proper/comm
support authentication, and encryption and decryption of data sent across the network between
services.

This software includes Bouncy Castle (http://bouncycastle.org/) to support the jets3t library.


// ------------------------------------------------------------------
// NOTICE file corresponding to the section 4d of The Apache License,
@@ -1162,25 +1160,6 @@ NonlinearMinimizer class in package breeze.optimize.proximal is distributed with
2015, Debasish Das (Verizon), all rights reserved.


=========================================================================
== NOTICE file corresponding to section 4(d) of the Apache License, ==
== Version 2.0, in this case for the distribution of jets3t. ==
=========================================================================

This product includes software developed by:

The Apache Software Foundation (http://www.apache.org/).

The ExoLab Project (http://www.exolab.org/)

Sun Microsystems (http://www.sun.com/)

Codehaus (http://castor.codehaus.org)

Tatu Saloranta (http://wiki.fasterxml.com/TatuSaloranta)



stream-lib
Copyright 2016 AddThis

2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
"dropna",
"dtypes",
"except",
"exceptAll",
"explain",
"fillna",
"filter",
@@ -131,6 +132,7 @@ exportMethods("arrange",
"hint",
"insertInto",
"intersect",
"intersectAll",
"isLocal",
"isStreaming",
"join",
59 changes: 58 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -2848,6 +2848,35 @@ setMethod("intersect",
dataFrame(intersected)
})

#' intersectAll
#'
#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
#' and another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{INTERSECT ALL} in SQL. As in standard
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the intersect all operation.
#' @family SparkDataFrame functions
#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
#' @rdname intersectAll
#' @name intersectAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectAllDF <- intersectAll(df1, df2)
#' }
#' @note intersectAll since 2.4.0
setMethod("intersectAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
dataFrame(intersected)
})

#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
@@ -2867,7 +2896,6 @@ setMethod("intersect",
#' df2 <- read.json(path2)
#' exceptDF <- except(df, df2)
#' }
#' @rdname except
#' @note except since 1.4.0
setMethod("except",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2876,6 +2904,35 @@ setMethod("except",
dataFrame(excepted)
})

#' exceptAll
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{EXCEPT ALL} in SQL. As in standard
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except all operation.
#' @family SparkDataFrame functions
#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
#' @rdname exceptAll
#' @name exceptAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptAllDF <- exceptAll(df1, df2)
#' }
#' @note exceptAll since 2.4.0
setMethod("exceptAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
dataFrame(excepted)
})
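
Both methods delegate, through callJMethod, to the corresponding intersectAll and exceptAll methods on the JVM-side DataFrame. A minimal Scala sketch of the multiset semantics they implement, with made-up data and a local master chosen purely for illustration:

    import org.apache.spark.sql.SparkSession

    // Illustrative data only. intersectAll keeps each row min(countLeft, countRight) times;
    // exceptAll keeps each row max(countLeft - countRight, 0) times. Neither de-duplicates.
    val spark = SparkSession.builder().master("local[*]").appName("setOpsAllSketch").getOrCreate()
    import spark.implicits._

    val left = Seq(("a", 1), ("a", 1), ("a", 1), ("b", 3), ("c", 4)).toDF("a", "b")
    val right = Seq(("a", 1), ("a", 1), ("b", 3)).toDF("a", "b")

    left.intersectAll(right).show()  // ("a", 1) twice and ("b", 3) once
    left.exceptAll(right).show()     // ("a", 1) once and ("c", 4) once

The R test added to test_sparkSQL.R below exercises the same behaviour through the SparkR wrappers.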

#' Save the contents of SparkDataFrame to a data source.
#'
#' The data source is specified by the \code{source} and a set of options (...).
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except
setGeneric("except", function(x, y) { standardGeneric("except") })

#' @rdname exceptAll
setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })

#' @rdname nafunctions
setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })

@@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
#' @rdname intersect
setGeneric("intersect", function(x, y) { standardGeneric("intersect") })

#' @rdname intersectAll
setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })

#' @rdname isLocal
setGeneric("isLocal", function(x) { standardGeneric("isLocal") })

19 changes: 19 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2482,6 +2482,25 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
unlink(jsonPath2)
})

test_that("intersectAll() and exceptAll()", {
df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
list("a", 1), list("b", 3), list("c", 4)),
schema = c("a", "b"))
df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
stringsAsFactors = FALSE)
exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
stringsAsFactors = FALSE)
intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
expect_is(intersectAllDf, "SparkDataFrame")
exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
expect_is(exceptAllDf, "SparkDataFrame")
intersectAllActual <- collect(intersectAllDf)
expect_identical(intersectAllActual, intersectAllExpected)
exceptAllActual <- collect(exceptAllDf)
expect_identical(exceptAllActual, exceptAllExpected)
})

test_that("withColumn() and withColumnRenamed()", {
df <- read.json(jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept
// SPARK-24578: cap the sub-region's size of the returned nio buffer to improve performance
// when the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
int written = target.write(buffer);
// If the ByteBuf holds more than one ByteBuffer, it is better to call nioBuffers(...)
// to eliminate extra memory copies.
int written = 0;
if (buf.nioBufferCount() == 1) {
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
written = target.write(buffer);
} else {
ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
for (ByteBuffer buffer: buffers) {
int remaining = buffer.remaining();
int w = target.write(buffer);
written += w;
if (w < remaining) {
// Could not write everything, so stop here.
break;
}
}
}
buf.skipBytes(written);
return written;
}
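
The nioBufferCount() branch above matters because, when a CompositeByteBuf spans several components, nioBuffer(...) has to merge them into one contiguous ByteBuffer (an extra copy), while nioBuffers(...) returns the underlying buffers as-is. A small standalone sketch of that difference, using only the Netty calls already seen in this patch (not Spark code):

    import io.netty.buffer.Unpooled

    // Build a composite buffer from two 4-byte slices of an 8-byte header, as the test below does.
    val header = Unpooled.copyLong(42L)
    val composite = Unpooled.compositeBuffer()
    composite.addComponent(true, header.retainedSlice(0, 4))
    composite.addComponent(true, header.slice(4, 4))

    assert(composite.nioBufferCount() == 2)   // two backing ByteBuffers
    val merged = composite.nioBuffer(0, 8)    // one buffer: components copied together
    val views = composite.nioBuffers(0, 8)    // array of two buffers: no merging copy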
@@ -22,6 +22,7 @@
import java.nio.channels.WritableByteChannel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.util.AbstractFileRegion;
import org.junit.Test;
@@ -48,7 +49,36 @@ public void testShortWrite() throws Exception {

@Test
public void testByteBufBody() throws Exception {
testByteBufBody(Unpooled.copyLong(42));
}

@Test
public void testCompositeByteBufBodySingleBuffer() throws Exception {
ByteBuf header = Unpooled.copyLong(42);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(true, header);
assertEquals(1, compositeByteBuf.nioBufferCount());
testByteBufBody(compositeByteBuf);
}

@Test
public void testCompositeByteBufBodyMultipleBuffers() throws Exception {
ByteBuf header = Unpooled.copyLong(42);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(true, header.retainedSlice(0, 4));
compositeByteBuf.addComponent(true, header.slice(4, 4));
assertEquals(2, compositeByteBuf.nioBufferCount());
testByteBufBody(compositeByteBuf);
}

/**
* Test writing a {@link MessageWithHeader} using the given {@link ByteBuf} as header.
*
* @param header the header to use.
* @throws Exception thrown on error.
*/
private void testByteBufBody(ByteBuf header) throws Exception {
long expectedHeaderValue = header.getLong(header.readerIndex());
ByteBuf bodyPassedToNettyManagedBuffer = Unpooled.copyLong(84);
assertEquals(1, header.refCnt());
assertEquals(1, bodyPassedToNettyManagedBuffer.refCnt());
@@ -61,7 +91,7 @@ public void testByteBufBody() throws Exception {
MessageWithHeader msg = new MessageWithHeader(managedBuf, header, body, managedBuf.size());
ByteBuf result = doWrite(msg, 1);
assertEquals(msg.count(), result.readableBytes());
assertEquals(42, result.readLong());
assertEquals(expectedHeaderValue, result.readLong());
assertEquals(84, result.readLong());

assertTrue(msg.release());
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -88,8 +88,8 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

/**
* Get the max number of tasks that can currently be launched concurrently.
* Note that the returned value should not be cached, because the number can change when
* executors are added or removed.
*
* @return The max number of tasks that can currently be launched concurrently.
*/
private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks()

/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
@@ -82,7 +82,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var driverCores: String = null
var submissionToKill: String = null
var submissionToRequestStatusFor: String = null
var useRest: Boolean = true // used internally
var useRest: Boolean = false // used internally

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
@@ -115,6 +118,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Use `sparkProperties` map along with env vars to fill in any missing parameters
loadEnvironmentArguments()

useRest = sparkProperties.getOrElse("spark.master.rest.enabled", "false").toBoolean

validateArguments()

/**
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -121,10 +121,18 @@ private[deploy] class Master(
}

// Alternative application submission gateway that is stable across Spark versions
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", false)
private var restServer: Option[StandaloneRestServer] = None
private var restServerBoundPort: Option[Int] = None

{
val authKey = SecurityManager.SPARK_AUTH_SECRET_CONF
require(conf.getOption(authKey).isEmpty || !restServerEnabled,
s"The RestSubmissionServer does not support authentication via ${authKey}. Either turn " +
"off the RestSubmissionServer with spark.master.rest.enabled=false, or do not use " +
"authentication.")
}

override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
@@ -51,6 +51,7 @@ private[spark] abstract class RestSubmissionServer(
val host: String,
val requestedPort: Int,
val masterConf: SparkConf) extends Logging {

protected val submitRequestServlet: SubmitRequestServlet
protected val killRequestServlet: KillRequestServlet
protected val statusRequestServlet: StatusRequestServlet
Expand Down
27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -577,4 +577,31 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.checkValue(v => v > 0, "The value should be a positive time value.")
.createWithDefaultString("365d")

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
.doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
"check. A max concurrent tasks check ensures the cluster can launch more concurrent " +
"tasks than required by a barrier stage on job submitted. The check can fail in case " +
"a cluster has just started and not enough executors have registered, so we wait for a " +
"little while and try to perform the check again. If the check fails more than a " +
"configured max failure times for a job then fail current job submission. Note this " +
"config only applies to jobs that contain one or more barrier stages, we won't perform " +
"the check on non-barrier jobs.")
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("15s")

private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
.doc("Number of max concurrent tasks check failures allowed before fail a job submission. " +
"A max concurrent tasks check ensures the cluster can launch more concurrent tasks than " +
"required by a barrier stage on job submitted. The check can fail in case a cluster " +
"has just started and not enough executors have registered, so we wait for a little " +
"while and try to perform the check again. If the check fails more than a configured " +
"max failure times for a job then fail current job submission. Note this config only " +
"applies to jobs that contain one or more barrier stages, we won't perform the check on " +
"non-barrier jobs.")
.intConf
.checkValue(v => v > 0, "The max failures should be a positive value.")
.createWithDefault(40)
}
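
The check described here compares a barrier stage's task count against the cluster's current task capacity (presumably via the SparkContext.maxNumConcurrentTasks() helper added above). A short sketch of overriding the two defaults through SparkConf; the values are illustrative, not recommendations:

    import org.apache.spark.SparkConf

    // Illustrative values (defaults above are 15s and 40): re-check every 30 seconds and
    // give up on the job submission after 10 failed checks.
    val conf = new SparkConf()
      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "30s")
      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "10")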
