Commit 71fe053

Merge remote-tracking branch 'origin/master' into cleanupTests
Conflicts:
	sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
marmbrus committed Aug 24, 2015
2 parents f3b5d3a + a2f4cdc commit 71fe053
Showing 27 changed files with 320 additions and 143 deletions.
core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala (6 changes: 2 additions & 4 deletions)
@@ -68,8 +68,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// if we find that it's okay.
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)

private val displayPeakExecutionMemory =
parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)

def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
@@ -1193,8 +1192,7 @@ private[ui] class TaskPagedTable(
desc: Boolean) extends PagedTable[TaskTableRowData] {

// We only track peak memory used for unsafe operators
private val displayPeakExecutionMemory =
conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)

override def tableId: String = "task-table"

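The hunks above replace `conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)` with `conf.getBoolean("spark.sql.unsafe.enabled", true)`. The practical difference is the behaviour when the key is not set at all: the old expression evaluated to false and hid the peak-execution-memory column, while the new one falls back to the default of true, which is exactly what the extra test case added to StagePageSuite below asserts. A minimal standalone sketch of that difference (the object name and the println are illustrative, not part of the patch):

```scala
import org.apache.spark.SparkConf

object UnsafeFlagDefaultSketch {
  def main(args: Array[String]): Unit = {
    // SparkConf(false) skips loading system properties, so the key stays unset,
    // mirroring the "avoid setting anything" case in the test below.
    val conf = new SparkConf(false)

    // Old logic: an unset key yields None, so the flag resolved to false.
    val oldDefault = conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)

    // New logic: getBoolean falls back to the supplied default, true.
    val newDefault = conf.getBoolean("spark.sql.unsafe.enabled", true)

    println(s"old default: $oldDefault, new default: $newDefault") // false vs. true
  }
}
```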
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala (8 changes: 6 additions & 2 deletions)
@@ -33,14 +33,18 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {

test("peak execution memory only displayed if unsafe is enabled") {
val unsafeConf = "spark.sql.unsafe.enabled"
val conf = new SparkConf().set(unsafeConf, "true")
val conf = new SparkConf(false).set(unsafeConf, "true")
val html = renderStagePage(conf).toString().toLowerCase
val targetString = "peak execution memory"
assert(html.contains(targetString))
// Disable unsafe and make sure it's not there
val conf2 = new SparkConf().set(unsafeConf, "false")
val conf2 = new SparkConf(false).set(unsafeConf, "false")
val html2 = renderStagePage(conf2).toString().toLowerCase
assert(!html2.contains(targetString))
// Avoid setting anything; it should be displayed by default
val conf3 = new SparkConf(false)
val html3 = renderStagePage(conf3).toString().toLowerCase
assert(html3.contains(targetString))
}

/**
external/flume-assembly/pom.xml (1 change: 0 additions & 1 deletion)
@@ -115,7 +115,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-flume-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
FlumeUtils.scala
@@ -247,7 +247,7 @@ object FlumeUtils {
* This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's FlumeUtils.
*/
private class FlumeUtilsPythonHelper {
private[flume] class FlumeUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,
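The only code change in this file widens the helper's modifier from `private` to `private[flume]`, i.e. visibility explicitly qualified to the `flume` package; the same relaxation is applied to the Kafka, MQTT and Kinesis helper and test-utility classes further down in this commit. A small compilable sketch of how a `private[pkg]` qualifier behaves, using made-up package and class names rather than the real Spark ones:

```scala
// Hypothetical packages; only the visibility rule mirrors the patch.
package org.example.streaming {

  package flume {
    // Accessible anywhere under org.example.streaming.flume, hidden outside it.
    private[flume] class PythonHelperSketch {
      def ping(): String = "pong"
    }

    object SamePackageCaller {
      // Compiles: this object lives in the qualifying package.
      def call(): String = new PythonHelperSketch().ping()
    }
  }

  object OutsideCaller {
    def main(args: Array[String]): Unit = {
      // Allowed: SamePackageCaller is public and forwards to the restricted class.
      println(flume.SamePackageCaller.call())
      // Not allowed: `new flume.PythonHelperSketch()` would not compile here.
    }
  }
}
```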
external/kafka-assembly/pom.xml (1 change: 0 additions & 1 deletion)
@@ -142,7 +142,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kafka-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
Broker.scala
@@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka
import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Represent the host and port info for a Kafka broker.
* Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
* Represents the host and port info for a Kafka broker.
* Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
*/
@Experimental
final class Broker private(
/** Broker's hostname */
val host: String,
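Besides the scaladoc touch-up, the hunk above removes the `@Experimental` marker from `Broker`, whose constructor is private, so callers go through the companion object. A usage sketch under the assumption of a `Broker.create(host, port)` factory mirroring the `OffsetRange.create` factory shown later in this commit (the companion object itself is outside this hunk, and the host name is a placeholder):

```scala
import org.apache.spark.streaming.kafka.Broker

object BrokerSketch {
  def main(args: Array[String]): Unit = {
    // Assumed companion factory; the class constructor itself is private.
    val leader = Broker.create("kafka-1.example.com", 9092)
    println(s"preferred leader: ${leader.host}:${leader.port}")
  }
}
```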
KafkaTestUtils.scala
@@ -20,35 +20,33 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap}
import java.util.Properties
import java.util.concurrent.TimeoutException
import java.util.{Map => JMap, Properties}

import scala.annotation.tailrec
import scala.language.postfixOps
import scala.util.control.NonFatal

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.common.TopicAndPartition
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf}

/**
* This is a helper class for Kafka test suites. This has the functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
private class KafkaTestUtils extends Logging {
private[kafka] class KafkaTestUtils extends Logging {

// Zookeeper related configurations
private val zkHost = "localhost"
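Most of this hunk reorders imports; the one Scala feature worth calling out is the import selector clause, which groups several members of a package and renames them in a single line, as in `java.util.{Map => JMap, Properties}` above. A self-contained illustration with throwaway values (nothing below comes from the Spark sources):

```scala
// Grouped and renamed import selectors, as used in the reordered imports above.
import java.lang.{Integer => JInt, Long => JLong}
import java.util.{Map => JMap, Properties}

object ImportSelectorSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("partition", "4")

    // JInt and JLong are local aliases for java.lang.Integer and java.lang.Long.
    val partition: JInt = JInt.valueOf(props.getProperty("partition"))
    val offset: JLong = JLong.valueOf(100L)

    val highWatermarks: JMap[JInt, JLong] = new java.util.HashMap[JInt, JLong]()
    highWatermarks.put(partition, offset)
    println(highWatermarks) // {4=100}
  }
}
```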
KafkaUtils.scala
@@ -17,29 +17,25 @@

package org.apache.spark.streaming.kafka

import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}
import java.util.{Map => JMap}
import java.util.{Set => JSet}
import java.util.{List => JList}
import java.lang.{Integer => JInt, Long => JLong}
import java.util.{List => JList, Map => JMap, Set => JSet}

import scala.reflect.ClassTag
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}

import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaPairInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}

object KafkaUtils {
/**
@@ -196,7 +192,6 @@ object KafkaUtils {
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
*/
@Experimental
def createRDD[
K: ClassTag,
V: ClassTag,
@@ -214,7 +209,6 @@
}

/**
* :: Experimental ::
* Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
* specify the Kafka leader to connect to (to optimize fetching) and access the message as well
* as the metadata.
@@ -230,7 +224,6 @@
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createRDD[
K: ClassTag,
V: ClassTag,
@@ -268,7 +261,6 @@
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
*/
@Experimental
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jsc: JavaSparkContext,
keyClass: Class[K],
@@ -287,7 +279,6 @@
}

/**
* :: Experimental ::
* Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
* specify the Kafka leader to connect to (to optimize fetching) and access the message as well
* as the metadata.
@@ -303,7 +294,6 @@
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jsc: JavaSparkContext,
keyClass: Class[K],
@@ -327,7 +317,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -357,7 +346,6 @@
* starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
@@ -375,7 +363,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -405,7 +392,6 @@
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
@@ -437,7 +423,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -472,7 +457,6 @@
* starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jssc: JavaStreamingContext,
keyClass: Class[K],
@@ -499,7 +483,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -533,7 +516,6 @@
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
*/
@Experimental
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jssc: JavaStreamingContext,
keyClass: Class[K],
@@ -564,7 +546,7 @@ object KafkaUtils {
* classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
* takes care of known parameters instead of passing them from Python
*/
private class KafkaUtilsPythonHelper {
private[kafka] class KafkaUtilsPythonHelper {
def createStream(
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],
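The KafkaUtils hunks above mostly drop the `:: Experimental ::` / `@Experimental` markers from the batch `createRDD` and receiver-less `createDirectStream` variants described by the surrounding scaladoc. A compact sketch of the batch entry point as documented, with a placeholder broker list, topic and offset range:

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object KafkaRDDSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("KafkaRDDSketch").setMaster("local[2]"))

    // Placeholder broker list and offsets; fromOffset is inclusive, untilOffset exclusive.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val ranges = Array(OffsetRange.create("pageviews", 0, 0L, 100L))

    // Reads exactly the given offset range into an RDD of (key, value) pairs.
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, ranges)
    rdd.map(_._2).take(10).foreach(println)

    sc.stop()
  }
}
```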
OffsetRange.scala
@@ -19,10 +19,7 @@ package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
* offset ranges in RDDs generated by the direct Kafka DStream (see
* [[KafkaUtils.createDirectStream()]]).
@@ -33,21 +30,18 @@ import org.apache.spark.annotation.Experimental
* }
* }}}
*/
@Experimental
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

/**
* :: Experimental ::
* Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
* can be created with `OffsetRange.create()`.
* @param topic Kafka topic name
* @param partition Kafka partition id
* @param fromOffset Inclusive starting offset
* @param untilOffset Exclusive ending offset
*/
@Experimental
final class OffsetRange private(
val topic: String,
val partition: Int,
@@ -84,10 +78,8 @@ final class OffsetRange private(
}

/**
* :: Experimental ::
* Companion object that provides methods to create instances of [[OffsetRange]].
*/
@Experimental
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)
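The `HasOffsetRanges` scaladoc above already sketches the intended usage pattern; spelled out as a compilable program (broker address, topic name and the logging are illustrative), reading the ranges back from each batch of a direct stream looks roughly like this:

```scala
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object OffsetRangesSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("OffsetRangesSketch").setMaster("local[2]"), Seconds(2))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("pageviews"))

    // The cast works on the first RDD of each batch, before any transformation,
    // because the RDDs produced by the direct stream carry HasOffsetRanges.
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: ${r.fromOffset} to ${r.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}
```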
external/mqtt-assembly/pom.xml (1 change: 0 additions & 1 deletion)
@@ -132,7 +132,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
MQTTUtils.scala
@@ -21,8 +21,8 @@ import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object MQTTUtils {
/**
@@ -79,7 +79,7 @@ object MQTTUtils {
* This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's MQTTUtils.
*/
private class MQTTUtilsPythonHelper {
private[mqtt] class MQTTUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,
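As with the Flume helper earlier in the commit, the substantive change here is `private` to `private[mqtt]` on the Python helper, alongside import cleanup. For context, a hedged usage sketch of the public API this helper wraps, assuming the `MQTTUtils.createStream(ssc, brokerUrl, topic)` overload with a default storage level (broker URL and topic are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

object MQTTStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("MQTTStreamSketch").setMaster("local[2]"), Seconds(2))

    // Placeholder broker URL and topic; the stream carries message payloads as Strings.
    val payloads = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "sensors/temperature")
    payloads.count().print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)
    ssc.stop()
  }
}
```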
MQTTTestUtils.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf}
/**
* Share codes for Scala and Python unit tests
*/
private class MQTTTestUtils extends Logging {
private[mqtt] class MQTTTestUtils extends Logging {

private val persistenceDir = Utils.createTempDir()
private val brokerHost = "localhost"
extras/kinesis-asl-assembly/pom.xml (1 change: 0 additions & 1 deletion)
@@ -137,7 +137,6 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-kinesis-asl-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
KinesisTestUtils.scala
@@ -36,7 +36,7 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
private class KinesisTestUtils extends Logging {
private[kinesis] class KinesisTestUtils extends Logging {

val endpointUrl = KinesisTestUtils.endpointUrl
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()