Merge remote-tracking branch 'origin/master' into query-info
zsxwing committed Jun 1, 2016
2 parents 9b539cd + a71d136 commit 7df7e9c
Showing 115 changed files with 1,285 additions and 434 deletions.
6 changes: 5 additions & 1 deletion build/mvn
@@ -141,9 +141,13 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if it's
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
ZINC_JAVA_HOME=
if [ -n "$JAVA_7_HOME" ]; then
ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
fi
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
"${ZINC_BIN}" -start -port ${ZINC_PORT} \
$ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi
@@ -22,7 +22,7 @@
/**
* Base interface for a function used in Dataset's filter function.
*
* If the function returns true, the element is discarded in the returned Dataset.
* If the function returns true, the element is included in the returned Dataset.
*/
public interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
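A quick sanity check of the corrected contract above: Dataset.filter keeps the elements for which the function returns true. A minimal Scala sketch (object name, data, and the local master URL are illustrative, not part of this commit):

import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.sql.SparkSession

object FilterFunctionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FilterFunctionSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq(1, 2, 3, 4, 5).toDS()
    // Returning true keeps the element, per the corrected Javadoc.
    val evens = ds.filter(new FilterFunction[Int] {
      override def call(value: Int): Boolean = value % 2 == 0
    })
    evens.show()  // 2 and 4
    spark.stop()
  }
}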
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
* these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
* Java programming guide for more details.
*/
package object function
package object function
@@ -23,7 +23,7 @@
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* An memory consumer of TaskMemoryManager, which support spilling.
* A memory consumer of {@link TaskMemoryManager} that supports spilling.
*
* Note: this only supports allocation / spilling of Tungsten memory.
*/
@@ -45,7 +45,7 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
}

/**
* Returns the memory mode, ON_HEAP or OFF_HEAP.
* Returns the memory mode, {@link MemoryMode#ON_HEAP} or {@link MemoryMode#OFF_HEAP}.
*/
public MemoryMode getMode() {
return mode;
@@ -596,6 +596,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)

@@ -605,6 +606,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
: Accumulator[T] =
sc.accumulator(initialValue, name)(accumulatorParam)
@@ -613,6 +615,7 @@ class JavaSparkContext(val sc: SparkContext)
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
sc.accumulable(initialValue)(param)

@@ -622,6 +625,7 @@ class JavaSparkContext(val sc: SparkContext)
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
@deprecated("use AccumulatorV2", "2.0.0")
def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
: Accumulable[T, R] =
sc.accumulable(initialValue, name)(param)
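Since the accumulator and accumulable factory methods above are now deprecated in favor of AccumulatorV2, here is a minimal sketch of the replacement path (the accumulator name, data, and master URL are illustrative):

import org.apache.spark.sql.SparkSession

object AccumulatorV2Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("AccumulatorV2Sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // LongAccumulator is a built-in AccumulatorV2 implementation; the name appears in the web UI.
    val negatives = sc.longAccumulator("negatives")
    sc.parallelize(Seq(1, -2, 3, -4)).foreach(x => if (x < 0) negatives.add(1))

    // As with the old API, only the driver should read `value`.
    println(negatives.value)  // 2
    spark.stop()
  }
}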
@@ -211,9 +211,6 @@ private[storage] class BlockInfoManager extends Logging {
* If another task has already locked this block for either reading or writing, then this call
* will block until the other locks are released or will return immediately if `blocking = false`.
*
* If this is called by a task which already holds the block's exclusive write lock, then this
* method will throw an exception.
*
* @param blockId the block to lock.
* @param blocking if true (default), this call will block until the lock is acquired. If false,
* this call will return immediately if the lock acquisition fails.
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -120,8 +120,14 @@ class StorageLevel private(
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)

override def toString: String = {
s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
s"deserialized=$deserialized, replication=$replication)"
val disk = if (useDisk) "disk" else ""
val memory = if (useMemory) "memory" else ""
val heap = if (useOffHeap) "offheap" else ""
val deserialize = if (deserialized) "deserialized" else ""

val output =
Seq(disk, memory, heap, deserialize, s"$replication replicas").filter(_.nonEmpty)
s"StorageLevel(${output.mkString(", ")})"
}

override def hashCode(): Int = toInt * 41 + replication
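For reference, the reworked toString prints only the flags that are enabled plus the replica count. A hedged spark-shell-style check of the expected format, given the code above:

import org.apache.spark.storage.StorageLevel

// Expected with the new toString, e.g.:
// StorageLevel(disk, memory, deserialized, 2 replicas)
println(StorageLevel.MEMORY_AND_DISK_2.toString)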
31 changes: 13 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2344,29 +2344,24 @@ private[spark] class RedirectThread(
* the toString method.
*/
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](sizeInBytes)
private var pos: Int = 0
private var isBufferFull = false
private val buffer = new Array[Byte](sizeInBytes)

def write(i: Int): Unit = {
buffer(pos) = i
def write(input: Int): Unit = {
buffer(pos) = input.toByte
pos = (pos + 1) % buffer.length
isBufferFull = isBufferFull || (pos == 0)
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
if (!isBufferFull) {
return new String(buffer, 0, pos, StandardCharsets.UTF_8)
}
stringBuilder.toString()

val nonCircularBuffer = new Array[Byte](sizeInBytes)
System.arraycopy(buffer, pos, nonCircularBuffer, 0, buffer.length - pos)
System.arraycopy(buffer, 0, nonCircularBuffer, buffer.length - pos, pos)
new String(nonCircularBuffer, StandardCharsets.UTF_8)
}
}
37 changes: 30 additions & 7 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.util

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, PrintStream}
import java.lang.{Double => JDouble, Float => JFloat}
import java.net.{BindException, ServerSocket, URI}
import java.nio.{ByteBuffer, ByteOrder}
@@ -681,14 +681,37 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(!Utils.isInDirectory(nullFile, childFile3))
}

test("circular buffer") {
test("circular buffer: if nothing was written to the buffer, display nothing") {
val buffer = new CircularBuffer(4)
assert(buffer.toString === "")
}

test("circular buffer: if the buffer isn't full, print only the contents written") {
val buffer = new CircularBuffer(10)
val stream = new PrintStream(buffer, true, "UTF-8")
stream.print("test")
assert(buffer.toString === "test")
}

test("circular buffer: data written == size of the buffer") {
val buffer = new CircularBuffer(4)
val stream = new PrintStream(buffer, true, "UTF-8")

// fill the buffer to its exact size so that it just hits overflow
stream.print("test")
assert(buffer.toString === "test")

// add more data to the buffer
stream.print("12")
assert(buffer.toString === "st12")
}

test("circular buffer: multiple overflow") {
val buffer = new CircularBuffer(25)
val stream = new java.io.PrintStream(buffer, true, "UTF-8")
val stream = new PrintStream(buffer, true, "UTF-8")

// scalastyle:off println
stream.println("test circular test circular test circular test circular test circular")
// scalastyle:on println
assert(buffer.toString === "t circular test circular\n")
stream.print("test circular test circular test circular test circular test circular")
assert(buffer.toString === "st circular test circular")
}

test("nanSafeCompareDoubles") {
@@ -29,13 +29,10 @@ object BroadcastTest {

val blockSize = if (args.length > 2) args(2) else "4096"

val sparkConf = new SparkConf()
.set("spark.broadcast.blockSize", blockSize)

val spark = SparkSession
.builder
.config(sparkConf)
.builder()
.appName("Broadcast Test")
.config("spark.broadcast.blockSize", blockSize)
.getOrCreate()

val sc = spark.sparkContext
@@ -191,6 +191,7 @@ object LDAExample {

val spark = SparkSession
.builder
.sparkContext(sc)
.getOrCreate()
import spark.implicits._

@@ -22,7 +22,6 @@ import java.io.File

import com.google.common.io.{ByteStreams, Files}

import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveFromSpark {
@@ -35,8 +34,6 @@ object HiveFromSpark {
ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))

def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HiveFromSpark")

// When working with Hive, one must instantiate `SparkSession` with Hive support, including
// connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
// functions. Users who do not have an existing Hive deployment can still enable Hive support.
@@ -45,7 +42,7 @@
// which defaults to the directory `spark-warehouse` in the current directory in which the
// Spark application is started.
val spark = SparkSession.builder
.config(sparkConf)
.appName("HiveFromSpark")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
1 change: 1 addition & 0 deletions external/java8-tests/pom.xml
@@ -106,6 +106,7 @@
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<useZincServer>${useZincForJdk8}</useZincServer>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>1.8</javacArg>
6 changes: 3 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -17,7 +17,7 @@

package org.apache.spark.ml.clustering

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.internal.Logging
@@ -696,8 +696,8 @@ class DistributedLDAModel private[ml] (
@DeveloperApi
@Since("2.0.0")
def deleteCheckpointFiles(): Unit = {
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
_checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, fs))
val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
_checkpointFiles.foreach(PeriodicCheckpointer.removeCheckpointFile(_, hadoopConf))
_checkpointFiles = Array.empty[String]
}

@@ -21,7 +21,7 @@ import java.io.IOException

import scala.collection.mutable

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.ml.tree.{LearningNode, Split}
@@ -77,8 +77,8 @@ private[spark] class NodeIdCache(
// Indicates whether we can checkpoint
private val canCheckpoint = nodeIdsForInstances.sparkContext.getCheckpointDir.nonEmpty

// FileSystem instance for deleting checkpoints as needed
private val fs = FileSystem.get(nodeIdsForInstances.sparkContext.hadoopConfiguration)
// Hadoop Configuration for deleting checkpoints as needed
private val hadoopConf = nodeIdsForInstances.sparkContext.hadoopConfiguration

/**
* Update the node index values in the cache.
@@ -130,7 +130,9 @@ val old = checkpointQueue.dequeue()
val old = checkpointQueue.dequeue()
// Since the old checkpoint is not deleted by Spark, we'll manually delete it here.
try {
fs.delete(new Path(old.getCheckpointFile.get), true)
val path = new Path(old.getCheckpointFile.get)
val fs = path.getFileSystem(hadoopConf)
fs.delete(path, true)
} catch {
case e: IOException =>
logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
@@ -154,7 +156,9 @@ val old = checkpointQueue.dequeue()
val old = checkpointQueue.dequeue()
if (old.getCheckpointFile.isDefined) {
try {
fs.delete(new Path(old.getCheckpointFile.get), true)
val path = new Path(old.getCheckpointFile.get)
val fs = path.getFileSystem(hadoopConf)
fs.delete(path, true)
} catch {
case e: IOException =>
logError("Decision Tree learning using cacheNodeIds failed to remove checkpoint" +
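The switch above from a single cached FileSystem to resolving one per checkpoint path matters when the checkpoint directory does not live on the default filesystem. A minimal spark-shell-style sketch of the pattern (the bucket and path are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
// Deleting an s3a:// checkpoint works even if fs.defaultFS points at HDFS,
// because the FileSystem is resolved from the path's own scheme.
val checkpointPath = new Path("s3a://some-bucket/checkpoints/rdd-42")
val fs = checkpointPath.getFileSystem(hadoopConf)
fs.delete(checkpointPath, true)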
@@ -1177,7 +1177,7 @@ private[python] class PythonMLLibAPI extends Serializable {
// We use DataFrames for serialization of IndexedRows to Python,
// so return a DataFrame.
val sc = indexedRowMatrix.rows.sparkContext
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(indexedRowMatrix.rows)
}

@@ -1188,7 +1188,7 @@
// We use DataFrames for serialization of MatrixEntry entries to
// Python, so return a DataFrame.
val sc = coordinateMatrix.entries.sparkContext
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(coordinateMatrix.entries)
}

@@ -1199,7 +1199,7 @@
// We use DataFrames for serialization of sub-matrix blocks to
// Python, so return a DataFrame.
val sc = blockMatrix.blocks.sparkContext
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(blockMatrix.blocks)
}
}
@@ -437,7 +437,7 @@ class LogisticRegressionWithLBFGS
lr.setMaxIter(optimizer.getNumIterations())
lr.setTol(optimizer.getConvergenceTol())
// Convert our input into a DataFrame
val spark = SparkSession.builder().config(input.context.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(input.context).getOrCreate()
val df = spark.createDataFrame(input.map(_.asML))
// Determine if we should cache the DF
val handlePersistence = input.getStorageLevel == StorageLevel.NONE
@@ -193,7 +193,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
modelType: String)

def save(sc: SparkContext, path: String, data: Data): Unit = {
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render(
@@ -207,7 +207,7 @@

@Since("1.3.0")
def load(sc: SparkContext, path: String): NaiveBayesModel = {
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
// Load Parquet data.
val dataRDD = spark.read.parquet(dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
@@ -238,7 +238,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
theta: Array[Array[Double]])

def save(sc: SparkContext, path: String, data: Data): Unit = {
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render(
@@ -251,7 +251,7 @@ }
}

def load(sc: SparkContext, path: String): NaiveBayesModel = {
val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
// Load Parquet data.
val dataRDD = spark.read.parquet(dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
