Commit 229f543

Merge remote-tracking branch 'upstream/master' into AvoidInnerClasses

Jeffrharr committed May 21, 2015
2 parents adbcb39 + 1ee8eb4

Showing 96 changed files with 2,260 additions and 553 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1159,8 +1159,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
     withScope {
       assertNotStopped()
-      val kc = kcf()
-      val vc = vcf()
+      val kc = clean(kcf)()
+      val vc = clean(vcf)()
       val format = classOf[SequenceFileInputFormat[Writable, Writable]]
       val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
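
A note on the hunk above: clean() runs Spark's ClosureCleaner over the implicit WritableConverter factories before they are invoked, so an accidentally captured enclosing scope is nulled out (or rejected as non-serializable) on the driver instead of failing later during task serialization. A minimal driver sketch exercising this path through sequenceFile — the master URL and input path are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seq-read").setMaster("local[2]"))
    // sequenceFile resolves its key/value WritableConverters through the
    // kcf/vcf factories that the hunk above now cleans.
    val pairs = sc.sequenceFile[String, Int]("/tmp/input.seq") // placeholder path
    println(pairs.take(10).mkString(", "))
    sc.stop()
  }
}
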
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,7 +25,8 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
+  UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -125,7 +126,7 @@ class HistoryServer(
   def initialize() {
     attachPage(new HistoryPage(this))
 
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))
 
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
 
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,7 +19,8 @@ package org.apache.spark.deploy.master.ui
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationsListResource, ApplicationInfo,
+  UIRoot}
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.RpcUtils
@@ -47,7 +48,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     attachPage(new HistoryNotFoundPage(this))
     attachPage(masterPage)
     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getServletHandler(this))
     attachHandler(createRedirectHandler(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -296,6 +296,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
+    val cleanedF = self.sparkContext.clean(func)
 
     if (keyClass.isArray) {
       throw new SparkException("reduceByKeyLocally() does not support array keys")
@@ -305,15 +306,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val map = new JHashMap[K, V]
       iter.foreach { pair =>
         val old = map.get(pair._1)
-        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]
 
     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
       m2.foreach { pair =>
         val old = m1.get(pair._1)
-        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
+        m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
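
The two hunks above pass the user-supplied merge function through SparkContext#clean once and use the cleaned copy in both the per-partition fold and the map merge, instead of serializing the raw closure twice. A small usage sketch (values illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyLocallySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rbkl").setMaster("local[2]"))
    val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
      .reduceByKeyLocally(_ + _) // the merge function is now closure-cleaned before shipping
    println(counts) // Map(a -> 2, b -> 1)
    sc.stop()
  }
}
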
core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.Logging
 
-private[serializer] object SerializationDebugger extends Logging {
+private[spark] object SerializationDebugger extends Logging {
 
   /**
    * Improve the given NotSerializableException with the serialization path leading from the given
core/src/main/scala/org/apache/spark/status/api/v1/{JsonRootResource.scala → ApiRootResource.scala}
@@ -39,7 +39,7 @@ import org.apache.spark.ui.SparkUI
  * HistoryServerSuite.
  */
 @Path("/v1")
-private[v1] class JsonRootResource extends UIRootFromServletContext {
+private[v1] class ApiRootResource extends UIRootFromServletContext {
 
   @Path("applications")
   def getApplicationList(): ApplicationListResource = {
@@ -166,11 +166,11 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
 
 }
 
-private[spark] object JsonRootResource {
+private[spark] object ApiRootResource {
 
-  def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+  def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
     val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
-    jerseyContext.setContextPath("/json")
+    jerseyContext.setContextPath("/api")
     val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
     holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
       "com.sun.jersey.api.core.PackagesResourceConfig")
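
With this rename the servlet context moves from /json to /api, and since the resource class keeps @Path("/v1"), the status endpoints are now served under /api/v1 on the live UI, the Master UI, and the History Server. A quick check sketched in plain Scala — host and port assume a local History Server on its default 18080:

import scala.io.Source

object ApiRootCheck {
  def main(args: Array[String]): Unit = {
    val base = "http://localhost:18080/api/v1" // assumed local History Server
    val json = Source.fromURL(s"$base/applications").mkString
    println(json) // JSON array of application records
  }
}
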
36 changes: 24 additions & 12 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.storage
 
-import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream, OutputStream}
+import java.io._
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -489,16 +489,17 @@ private[spark] class BlockManager(
       if (level.useOffHeap) {
         logDebug(s"Getting block $blockId from ExternalBlockStore")
         if (externalBlockStore.contains(blockId)) {
-          externalBlockStore.getBytes(blockId) match {
-            case Some(bytes) =>
-              if (!asBlockResult) {
-                return Some(bytes)
-              } else {
-                return Some(new BlockResult(
-                  dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
-              }
+          val result = if (asBlockResult) {
+            externalBlockStore.getValues(blockId)
+              .map(new BlockResult(_, DataReadMethod.Memory, info.size))
+          } else {
+            externalBlockStore.getBytes(blockId)
+          }
+          result match {
+            case Some(values) =>
+              return result
             case None =>
-              logDebug(s"Block $blockId not found in externalBlockStore")
+              logDebug(s"Block $blockId not found in ExternalBlockStore")
           }
         }
       }
@@ -1206,8 +1207,19 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+    dataDeserializeStream(blockId, new ByteBufferInputStream(bytes, true), serializer)
+  }
+
+  /**
+   * Deserializes an InputStream into an iterator of values and disposes of it when the end of
+   * the iterator is reached.
+   */
+  def dataDeserializeStream(
+      blockId: BlockId,
+      inputStream: InputStream,
+      serializer: Serializer = defaultSerializer): Iterator[Any] = {
+    val stream = new BufferedInputStream(inputStream)
+    serializer.newInstance().deserializeStream(wrapForCompression(blockId, stream)).asIterator
   }
 
   def stop(): Unit = {
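
The new dataDeserializeStream lets callers deserialize straight off an InputStream instead of first materializing the whole block as a ByteBuffer, and dataDeserialize is now a thin wrapper over it. The sketch below imitates that pattern outside Spark — plain Java serialization stands in for Spark's Serializer, and the stream is closed once the iterator is exhausted:

import java.io._

object StreamIteratorSketch {
  // Lazily pull objects off a stream, closing it at end-of-stream, in the
  // spirit of deserializeStream(...).asIterator above.
  def asIterator(in: InputStream): Iterator[Any] = {
    val ois = new ObjectInputStream(new BufferedInputStream(in))
    new Iterator[Any] {
      private var buffered: Option[Any] = read()
      private def read(): Option[Any] =
        try Some(ois.readObject()) catch { case _: EOFException => ois.close(); None }
      def hasNext: Boolean = buffered.isDefined
      def next(): Any = { val v = buffered.get; buffered = read(); v }
    }
  }

  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    Seq("a", "b", "c").foreach(oos.writeObject)
    oos.close()
    asIterator(new ByteArrayInputStream(bos.toByteArray)).foreach(println) // a b c
  }
}
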
core/src/main/scala/org/apache/spark/storage/ExternalBlockManager.scala
@@ -32,6 +32,8 @@ import java.nio.ByteBuffer
  */
 private[spark] abstract class ExternalBlockManager {
 
+  protected var blockManager: BlockManager = _
+
   override def toString: String = {"External Block Store"}
 
   /**
@@ -41,7 +43,9 @@ private[spark] abstract class ExternalBlockManager {
    *
    * @throws java.io.IOException if there is any file system failure during the initialization.
    */
-  def init(blockManager: BlockManager, executorId: String): Unit
+  def init(blockManager: BlockManager, executorId: String): Unit = {
+    this.blockManager = blockManager
+  }
 
   /**
    * Drop the block from the underlying external block store, if it exists.
@@ -73,6 +77,11 @@ private[spark] abstract class ExternalBlockManager {
    */
   def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
 
+  def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
+    val bytes = blockManager.dataSerialize(blockId, values)
+    putBytes(blockId, bytes)
+  }
+
   /**
    * Retrieve the block bytes.
    * @return Some(ByteBuffer) if the block bytes are successfully retrieved,
@@ -82,6 +91,17 @@ private[spark] abstract class ExternalBlockManager {
    */
   def getBytes(blockId: BlockId): Option[ByteBuffer]
 
+  /**
+   * Retrieve the block data.
+   * @return Some(Iterator[Any]) if the block data is successfully retrieved,
+   *         None if the block does not exist in the external block store.
+   *
+   * @throws java.io.IOException if there is any file system failure in getting the block.
+   */
+  def getValues(blockId: BlockId): Option[Iterator[_]] = {
+    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+  }
+
   /**
    * Get the size of the block saved in the underlying external block store,
    * which is saved before by putBytes.
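
With init, putValues, and getValues given default implementations, a concrete backend now only has to supply the byte-level primitives. A hypothetical in-memory backend, for illustration only — it assumes the remaining abstract members are removeBlock, blockExists, getSize, and a shutdown() lifecycle hook, which this diff does not show:

package org.apache.spark.storage

import java.nio.ByteBuffer
import scala.collection.concurrent.TrieMap

// Illustrative only: blocks live in a local concurrent map. The value-level
// putValues/getValues come from the serialization-based defaults added above.
private[spark] class InMemoryExternalBlockManager extends ExternalBlockManager {
  private val store = TrieMap.empty[BlockId, ByteBuffer]

  override def blockExists(blockId: BlockId): Boolean = store.contains(blockId)
  override def removeBlock(blockId: BlockId): Boolean = store.remove(blockId).isDefined
  override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = { store.put(blockId, bytes) }
  override def getBytes(blockId: BlockId): Option[ByteBuffer] =
    store.get(blockId).map(_.duplicate()) // duplicate so callers cannot disturb stored offsets
  override def getSize(blockId: BlockId): Long =
    store.get(blockId).map(_.limit().toLong).getOrElse(0L)
  override def shutdown(): Unit = store.clear() // assumed lifecycle member
}
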
core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.storage
 
 import java.nio.ByteBuffer
+
+import scala.util.control.NonFatal
+
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
-import scala.util.control.NonFatal
 
 
 /**
@@ -40,7 +42,7 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.map(_.getSize(blockId)).getOrElse(0)
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getSize from $blockId", t)
+        logError(s"Error in getSize($blockId)", t)
         0L
     }
   }
@@ -54,50 +56,78 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       values: Array[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    putIterator(blockId, values.toIterator, level, returnValues)
+    putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
   }
 
   override def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val bytes = blockManager.dataSerialize(blockId, values)
-    putIntoExternalBlockStore(blockId, bytes, returnValues)
+    putIntoExternalBlockStore(blockId, values, returnValues)
   }
 
   private def putIntoExternalBlockStore(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      values: Iterator[_],
       returnValues: Boolean): PutResult = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val byteBuffer = bytes.duplicate()
-    byteBuffer.rewind()
-    logDebug(s"Attempting to put block $blockId into ExtBlk store")
+    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
     // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
     try {
       val startTime = System.currentTimeMillis
       if (externalBlockManager.isDefined) {
-        externalBlockManager.get.putBytes(blockId, bytes)
+        externalBlockManager.get.putValues(blockId, values)
+        val size = getSize(blockId)
+        val data = if (returnValues) {
+          Left(getValues(blockId).get)
+        } else {
+          null
+        }
         val finishTime = System.currentTimeMillis
         logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
-          blockId, Utils.bytesToString(byteBuffer.limit), finishTime - startTime))
-
-        if (returnValues) {
-          PutResult(bytes.limit(), Right(bytes.duplicate()))
-        } else {
-          PutResult(bytes.limit(), null)
-        }
+          blockId, Utils.bytesToString(size), finishTime - startTime))
+        PutResult(size, data)
+      } else {
+        logError(s"Error in putValues($blockId): no ExternalBlockManager has been configured")
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+      }
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Error in putValues($blockId)", t)
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
+    }
+  }
+
+  private def putIntoExternalBlockStore(
+      blockId: BlockId,
+      bytes: ByteBuffer,
+      returnValues: Boolean): PutResult = {
+    logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
+    // we should never hit here if externalBlockManager is None. Handle it anyway for safety.
+    try {
+      val startTime = System.currentTimeMillis
+      if (externalBlockManager.isDefined) {
+        val byteBuffer = bytes.duplicate()
+        byteBuffer.rewind()
+        externalBlockManager.get.putBytes(blockId, byteBuffer)
+        val size = bytes.limit()
+        val data = if (returnValues) {
+          Right(bytes)
+        } else {
+          null
+        }
+        val finishTime = System.currentTimeMillis
+        logDebug("Block %s stored as %s file in ExternalBlockStore in %d ms".format(
+          blockId, Utils.bytesToString(size), finishTime - startTime))
+        PutResult(size, data)
       } else {
-        logError(s"error in putBytes $blockId")
-        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+        logError(s"Error in putBytes($blockId): no ExternalBlockManager has been configured")
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
       }
     } catch {
       case NonFatal(t) =>
-        logError(s"error in putBytes $blockId", t)
-        PutResult(bytes.limit(), null, Seq((blockId, BlockStatus.empty)))
+        logError(s"Error in putBytes($blockId)", t)
+        PutResult(-1, null, Seq((blockId, BlockStatus.empty)))
     }
   }

@@ -107,21 +137,27 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
       externalBlockManager.map(_.removeBlock(blockId)).getOrElse(true)
     } catch {
       case NonFatal(t) =>
-        logError(s"error in removing $blockId", t)
+        logError(s"Error in removeBlock($blockId)", t)
         true
     }
   }
 
   override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, buffer))
+    try {
+      externalBlockManager.flatMap(_.getValues(blockId))
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Error in getValues($blockId)", t)
+        None
+    }
   }
 
   override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
     try {
       externalBlockManager.flatMap(_.getBytes(blockId))
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getBytes from $blockId", t)
+        logError(s"Error in getBytes($blockId)", t)
         None
     }
   }
@@ -130,13 +166,13 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
     try {
       val ret = externalBlockManager.map(_.blockExists(blockId)).getOrElse(false)
       if (!ret) {
-        logInfo(s"remove block $blockId")
+        logInfo(s"Remove block $blockId")
         blockManager.removeBlock(blockId, true)
       }
       ret
     } catch {
       case NonFatal(t) =>
-        logError(s"error in getBytes from $blockId", t)
+        logError(s"Error in getBytes($blockId)", t)
         false
     }
   }
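
Net effect of the hunks in this file: putIntoExternalBlockStore now comes in a value-based overload (used by putArray and putIterator, so a backend can store values natively via putValues instead of being handed pre-serialized bytes) and a byte-based overload that duplicates and rewinds the caller's buffer before writing. End to end, this path is exercised by persisting with OFF_HEAP storage — a sketch, assuming an external block store (Tachyon, at this point in Spark's history) is configured elsewhere:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object OffHeapPersistSketch {
  def main(args: Array[String]): Unit = {
    // Without a configured backend, the puts above log
    // "no ExternalBlockManager has been configured" and return PutResult(-1, ...).
    val sc = new SparkContext(new SparkConf().setAppName("offheap-sketch"))
    val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.OFF_HEAP)
    println(rdd.sum()) // the first action writes blocks through putIntoExternalBlockStore
    sc.stop()
  }
}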