Merge remote-tracking branch 'spark/master' into SPARK-12362
hvanhovell committed Dec 29, 2015
2 parents cb60ba0 + d80cc90 commit feee2ba
Showing 168 changed files with 2,679 additions and 742 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -84,3 +84,4 @@ gen-java.*
org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet
LZ4BlockInputStream.java
4 changes: 2 additions & 2 deletions R/pkg/R/column.R
@@ -215,7 +215,7 @@ setMethod("%in%",

#' otherwise
#'
#' If values in the specified column are null, returns the value.
#' If values in the specified column are null, returns the value.
#' Can be used in conjunction with `when` to specify a default value for expressions.
#'
#' @rdname otherwise
@@ -225,7 +225,7 @@ setMethod("%in%",
setMethod("otherwise",
signature(x = "Column", value = "ANY"),
function(x, value) {
value <- ifelse(class(value) == "Column", value@jc, value)
value <- if (class(value) == "Column") { value@jc } else { value }
jc <- callJMethod(x@jc, "otherwise", value)
column(jc)
})
13 changes: 8 additions & 5 deletions R/pkg/R/functions.R
@@ -37,7 +37,7 @@ setMethod("lit", signature("ANY"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions",
"lit",
ifelse(class(x) == "Column", x@jc, x))
if (class(x) == "Column") { x@jc } else { x })
column(jc)
})

@@ -2262,7 +2262,7 @@ setMethod("unix_timestamp", signature(x = "Column", format = "character"),
setMethod("when", signature(condition = "Column", value = "ANY"),
function(condition, value) {
condition <- condition@jc
value <- ifelse(class(value) == "Column", value@jc, value)
value <- if (class(value) == "Column") { value@jc } else { value }
jc <- callJStatic("org.apache.spark.sql.functions", "when", condition, value)
column(jc)
})
@@ -2277,13 +2277,16 @@ setMethod("when", signature(condition = "Column", value = "ANY"),
#' @name ifelse
#' @seealso \link{when}
#' @export
#' @examples \dontrun{ifelse(df$a > 1 & df$b > 2, 0, 1)}
#' @examples \dontrun{
#' ifelse(df$a > 1 & df$b > 2, 0, 1)
#' ifelse(df$a > 1, df$a, 1)
#' }
setMethod("ifelse",
signature(test = "Column", yes = "ANY", no = "ANY"),
function(test, yes, no) {
test <- test@jc
yes <- ifelse(class(yes) == "Column", yes@jc, yes)
no <- ifelse(class(no) == "Column", no@jc, no)
yes <- if (class(yes) == "Column") { yes@jc } else { yes }
no <- if (class(no) == "Column") { no@jc } else { no }
jc <- callJMethod(callJStatic("org.apache.spark.sql.functions",
"when",
test, yes),
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1120,6 +1120,14 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", {
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
})

test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(select(df, when(df$a > 1 & df$b > 2, lit(1))))[, 1], c(NA, 1))
expect_equal(collect(select(df, otherwise(when(df$a > 1, lit(1)), lit(0))))[, 1], c(0, 1))
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
})
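
The new test exercises Column-typed values in when(), otherwise() and ifelse(). These R wrappers delegate to org.apache.spark.sql.functions, so the assertions can be mirrored in the Scala DataFrame API; a rough sketch, assuming a DataFrame df holding the same two rows (a = 1, b = 2) and (a = 3, b = 4):

import org.apache.spark.sql.functions.{lit, when}

// Mirrors the three R assertions above.
val whenOnly    = df.select(when(df("a") > 1 && df("b") > 2, lit(1)))                    // null, 1
val withDefault = df.select(when(df("a") > 1, lit(1)).otherwise(lit(0)))                 // 0, 1
val ifelseLike  = df.select(when(df("a") > 1 && df("b") > 2, lit(0)).otherwise(lit(1)))  // 1, 0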

test_that("group by, agg functions", {
df <- read.json(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
6 changes: 3 additions & 3 deletions build/mvn
@@ -81,11 +81,11 @@ install_mvn() {

# Install zinc under the build/ folder
install_zinc() {
local zinc_path="zinc-0.3.5.3/bin/zinc"
local zinc_path="zinc-0.3.9/bin/zinc"
[ ! -f "${_DIR}/${zinc_path}" ] && ZINC_INSTALL_FLAG=1
install_app \
"http://downloads.typesafe.com/zinc/0.3.5.3" \
"zinc-0.3.5.3.tgz" \
"http://downloads.typesafe.com/zinc/0.3.9" \
"zinc-0.3.9.tgz" \
"${zinc_path}"
ZINC_BIN="${_DIR}/${zinc_path}"
}
11 changes: 10 additions & 1 deletion core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -225,4 +225,13 @@ a.expandbutton {
background-color: #49535a !important;
color: white;
cursor:pointer;
}
}

th a, th a:hover {
/* Make the entire header clickable, not just the text label */
display: block;
width: 100%;
/* Suppress the default link styling */
color: #333;
text-decoration: none;
}
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -836,7 +836,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
minPartitions).map(pair => pair._2.toString).setName(path)
}

/**
@@ -885,7 +885,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[Text],
classOf[Text],
updateConf,
minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}
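
Both textFile and wholeTextFiles now attach the input path as the name of the RDD the caller actually receives (the result of the map), rather than only the underlying Hadoop RDD, so the path is what shows up for that RDD in the web UI and in lineage dumps. A minimal sketch of the effect (the path is illustrative):

val lines = sc.textFile("hdfs:///tmp/example/input.txt")  // hypothetical path
println(lines.name)           // the user-facing RDD now carries the path as its name
println(lines.toDebugString)  // the named RDD appears in the lineage description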

/**
@@ -1248,7 +1248,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)
def emptyRDD[T: ClassTag]: RDD[T] = new EmptyRDD[T](this)
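
The return type of emptyRDD is widened from the concrete EmptyRDD[T] to the abstract RDD[T], so callers depend only on the public RDD interface. Usage is unchanged; a small sketch:

val empty: RDD[String] = sc.emptyRDD[String]  // the EmptyRDD class stays an implementation detail
assert(empty.partitions.isEmpty)              // an empty RDD has no partitions
assert(empty.count() == 0L)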

// Methods for creating shared variables

@@ -2073,8 +2073,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// its own local file system, which is incorrect because the checkpoint files
// are actually on the executor machines.
if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
logWarning("Checkpoint directory must be non-local " +
"if Spark is running on a cluster: " + directory)
logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
s"must not be on the local filesystem. Directory '$directory' " +
"appears to be on the local filesystem.")
}

checkpointDir = Option(directory).map { dir =>
@@ -2095,7 +2096,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/** Default min number of partitions for Hadoop RDDs when not given by user */
@deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
def defaultMinSplits: Int = defaultMinPartitions

/**
* Default min number of partitions for Hadoop RDDs when not given by user
@@ -21,11 +21,11 @@ import java.util.concurrent.CountDownLatch

import scala.collection.JavaConverters._

import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslServerBootstrap
import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer}
import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.util.TransportConf
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -108,6 +108,7 @@ object ExternalShuffleService extends Logging {
private[spark] def main(
args: Array[String],
newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = {
Utils.initDaemon(log)
val sparkConf = new SparkConf
Utils.loadDefaultSparkProperties(sparkConf)
val securityManager = new SecurityManager(sparkConf)
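
Several daemon entry points in this commit (ExternalShuffleService, HistoryServer, Master, Worker, MesosClusterDispatcher, MesosExecutorBackend, CoarseGrainedExecutorBackend) replace SignalLogger.register(log) with Utils.initDaemon(log). The body of Utils.initDaemon is not part of this diff; conceptually it centralizes daemon startup chores, presumably something along these lines (illustration only):

// Hypothetical sketch of a consolidated daemon-init helper; the real Utils.initDaemon is not shown here.
def initDaemon(log: org.slf4j.Logger): Unit = {
  log.info(s"Started daemon with process name: ${java.lang.management.ManagementFactory.getRuntimeMXBean.getName}")
  SignalLogger.register(log)  // keep the previous behaviour of logging fatal signals such as SIGTERM
}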
@@ -124,7 +124,7 @@ private[spark] class AppClient(
*/
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
Utils.tryOrExit {
if (registered.get) {
@@ -138,7 +138,7 @@ }
}
}
}
}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
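
The retry timer switches from scheduleAtFixedRate to a one-shot schedule: each registration attempt arms exactly one timeout check, and the elided body presumably re-enters registerWithMaster for the next attempt on failure, so a repeating task would only queue redundant runs. The difference in plain ScheduledExecutorService terms:

import java.util.concurrent.{Executors, TimeUnit}

val scheduler = Executors.newSingleThreadScheduledExecutor()

// One-shot: fires once after the delay; any rescheduling is the callback's own decision.
scheduler.schedule(new Runnable { def run(): Unit = println("check registration once") },
  20, TimeUnit.SECONDS)

// Fixed-rate: keeps firing every period until cancelled, even when a single check is enough.
scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = println("check registration repeatedly") },
  20, 20, TimeUnit.SECONDS)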

/**
@@ -663,16 +663,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

// For testing.
private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
val actionClass: Class[_] =
try {
getClass().getClassLoader().loadClass(hadoop2Class)
} catch {
case _: ClassNotFoundException =>
getClass().getClassLoader().loadClass(hadoop1Class)
}

val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class)
val action = actionClass.getField("SAFEMODE_GET").get(null)
val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
method.invoke(dfs, action).asInstanceOf[Boolean]
@@ -23,14 +23,12 @@ import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

import com.google.common.cache._
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
UIRoot}
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
import org.apache.spark.util.{ShutdownHookManager, Utils}

/**
* A web server that renders SparkUIs of completed applications.
@@ -223,7 +221,7 @@ object HistoryServer extends Logging {
val UI_PATH_PREFIX = "/history"

def main(argStrings: Array[String]) {
SignalLogger.register(log)
Utils.initDaemon(log)
new HistoryServerArguments(conf, argStrings)
initSecurity()
val securityManager = new SecurityManager(conf)
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -45,7 +45,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
import org.apache.spark.util.{ThreadUtils, Utils}

private[deploy] class Master(
override val rpcEnv: RpcEnv,
@@ -979,7 +979,11 @@

futureUI.onSuccess { case Some(ui) =>
appIdToUI.put(app.id, ui)
self.send(AttachCompletedRebuildUI(app.id))
// `self` can be null if we are already in the process of shutting down
// This happens frequently in tests where `local-cluster` is used
if (self != null) {
self.send(AttachCompletedRebuildUI(app.id))
}
// Application UI is successfully rebuilt, so link the Master UI to it
// NOTE - app.appUIUrlAtHistoryServer is volatile
app.appUIUrlAtHistoryServer = Some(ui.basePath)
@@ -1083,7 +1087,7 @@ private[deploy] object Master extends Logging {
val ENDPOINT_NAME = "Master"

def main(argStrings: Array[String]) {
SignalLogger.register(log)
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.util.{ShutdownHookManager, SignalLogger}
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.{Logging, SecurityManager, SparkConf}

/*
@@ -92,7 +92,7 @@ private[mesos] class MesosClusterDispatcher(

private[mesos] object MesosClusterDispatcher extends Logging {
def main(args: Array[String]) {
SignalLogger.register(log)
Utils.initDaemon(log)
val conf = new SparkConf
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
conf.setMaster(dispatcherArgs.masterUrl)
@@ -686,7 +686,7 @@ private[deploy] object Worker extends Logging {
val ENDPOINT_NAME = "Worker"

def main(argStrings: Array[String]) {
SignalLogger.register(log)
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
@@ -20,11 +20,8 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration

import scala.collection.mutable
import scala.util.{Failure, Success}

import org.apache.spark.rpc._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -33,7 +30,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -146,7 +143,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
workerUrl: Option[String],
userClassPath: Seq[URL]) {

SignalLogger.register(log)
Utils.initDaemon(log)

SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
@@ -257,7 +254,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// scalastyle:off println
System.err.println(
"""
|"Usage: CoarseGrainedExecutorBackend [options]
|Usage: CoarseGrainedExecutorBackend [options]
|
| Options are:
| --driver-url <driverUrl>
@@ -25,11 +25,11 @@ import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}

import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv}
import org.apache.spark.{Logging, SparkConf, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
import org.apache.spark.util.{SignalLogger, Utils}
import org.apache.spark.util.Utils

private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -121,7 +121,7 @@ private[spark] class MesosExecutorBackend
*/
private[spark] object MesosExecutorBackend extends Logging {
def main(args: Array[String]) {
SignalLogger.register(log)
Utils.initDaemon(log)
// Create a new Executor and start it running
val runner = new MesosExecutorBackend()
new MesosExecutorDriver(runner).run()
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,10 +17,10 @@

package org.apache.spark.io

import java.io.{IOException, InputStream, OutputStream}
import java.io._

import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import net.jpountz.lz4.LZ4BlockOutputStream
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}

import org.apache.spark.SparkConf
@@ -49,7 +49,8 @@ private[spark] object CompressionCodec {
private val configKey = "spark.io.compression.codec"

private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
|| codec.isInstanceOf[LZ4CompressionCodec])
}

private val shortCompressionCodecNames = Map(
@@ -92,12 +93,11 @@ }
}
}

val FALLBACK_COMPRESSION_CODEC = "lzf"
val DEFAULT_COMPRESSION_CODEC = "snappy"
val FALLBACK_COMPRESSION_CODEC = "snappy"
val DEFAULT_COMPRESSION_CODEC = "lz4"
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}


/**
* :: DeveloperApi ::
* LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
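
The LZ4 codec now advertises support for concatenating serialized streams (the patched LZ4BlockInputStream.java added to .rat-excludes above is the other half of that change), and the default codec moves from snappy to lz4 with snappy as the fallback. The property being claimed, sketched against the codec trait's stream factories (compressedOutputStream/compressedInputStream, which this hunk does not show), is roughly:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

// Sketch only: assumes the CompressionCodec trait exposes compressedOutputStream/compressedInputStream.
def compress(codec: CompressionCodec, payload: Array[Byte]): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = codec.compressedOutputStream(bytes)
  out.write(payload)
  out.close()
  bytes.toByteArray
}

// Concatenating two independently compressed blobs and decompressing them as one stream
// should yield the two payloads back to back.
def roundTripsConcatenation(codec: CompressionCodec): Boolean = {
  val a = "first".getBytes("UTF-8")
  val b = "second".getBytes("UTF-8")
  val in = codec.compressedInputStream(new ByteArrayInputStream(compress(codec, a) ++ compress(codec, b)))
  val decoded = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
  java.util.Arrays.equals(decoded, a ++ b)
}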
