Commit 67f3ee9
Change private class to private[package] class to prevent them from showing up in the docs

tdas committed Aug 24, 2015
1 parent 053d94f commit 67f3ee9
Showing 13 changed files with 28 additions and 54 deletions.
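
For context, here is a minimal sketch of the one-line change this commit applies throughout (not part of the diff; the package name mirrors the Flume helper touched below, but the class and object names are made up for illustration). A top-level private class in Scala is already restricted to its enclosing package, so callers are unaffected; the explicit private[flume] qualifier is the form that, per the commit message, keeps these classes from showing up in the generated docs.

package org.apache.spark.streaming.flume

// Before: an unqualified top-level private class. Scala already limits access
// to the enclosing package, but classes declared this way were still showing
// up in the generated docs (the problem this commit addresses).
private class PythonHelperBefore {
  def describe(): String = "private class"
}

// After: the same class with an explicit package qualifier. Callers inside
// org.apache.spark.streaming.flume can use it exactly as before; this is the
// form the commit switches to so the class is excluded from the docs.
private[flume] class PythonHelperAfter {
  def describe(): String = "private[flume] class"
}

// Code in the same package can still instantiate both helpers.
object VisibilityDemo {
  def main(args: Array[String]): Unit = {
    println(new PythonHelperBefore().describe())
    println(new PythonHelperAfter().describe())
  }
}

The rest of the diff repeats this pattern for the Kafka, MQTT, Kinesis, and streaming-core helpers, and along the way drops a number of :: Experimental :: markers and @Experimental annotations and reorders some imports.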

@@ -247,7 +247,7 @@ object FlumeUtils {
* This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's FlumeUtils.
*/
private class FlumeUtilsPythonHelper {
private[flume] class FlumeUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,

@@ -20,11 +20,9 @@ package org.apache.spark.streaming.kafka
import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Represent the host and port info for a Kafka broker.
* Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID
* Represents the host and port info for a Kafka broker.
* Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
*/
@Experimental
final class Broker private(
/** Broker's hostname */
val host: String,

@@ -20,35 +20,33 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap}
import java.util.Properties
import java.util.concurrent.TimeoutException
import java.util.{Map => JMap, Properties}

import scala.annotation.tailrec
import scala.language.postfixOps
import scala.util.control.NonFatal

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.common.TopicAndPartition
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.{ZKStringSerializer, ZkUtils}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf}

/**
* This is a helper class for Kafka test suites. This has the functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*
* The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
*/
private class KafkaTestUtils extends Logging {
private[kafka] class KafkaTestUtils extends Logging {

// Zookeeper related configurations
private val zkHost = "localhost"

@@ -17,29 +17,25 @@

package org.apache.spark.streaming.kafka

import java.lang.{Integer => JInt}
import java.lang.{Long => JLong}
import java.util.{Map => JMap}
import java.util.{Set => JSet}
import java.util.{List => JList}
import java.lang.{Integer => JInt, Long => JLong}
import java.util.{List => JList, Map => JMap, Set => JSet}

import scala.reflect.ClassTag
import scala.collection.JavaConversions._
import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}

import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairInputDStream, JavaInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.api.java.{JavaInputDStream, JavaPairInputDStream, JavaPairReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{SparkContext, SparkException}

object KafkaUtils {
/**
@@ -196,7 +192,6 @@ object KafkaUtils {
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
*/
@Experimental
def createRDD[
K: ClassTag,
V: ClassTag,
@@ -214,7 +209,6 @@
}

/**
* :: Experimental ::
* Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
* specify the Kafka leader to connect to (to optimize fetching) and access the message as well
* as the metadata.
@@ -230,7 +224,6 @@
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createRDD[
K: ClassTag,
V: ClassTag,
@@ -268,7 +261,6 @@
* @param offsetRanges Each OffsetRange in the batch corresponds to a
* range of offsets for a given Kafka topic/partition
*/
@Experimental
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jsc: JavaSparkContext,
keyClass: Class[K],
@@ -287,7 +279,6 @@
}

/**
* :: Experimental ::
* Create a RDD from Kafka using offset ranges for each topic and partition. This allows you
* specify the Kafka leader to connect to (to optimize fetching) and access the message as well
* as the metadata.
@@ -303,7 +294,6 @@
* in which case leaders will be looked up on the driver.
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jsc: JavaSparkContext,
keyClass: Class[K],
@@ -327,7 +317,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -357,7 +346,6 @@
* starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
@@ -375,7 +363,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -405,7 +392,6 @@
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
*/
@Experimental
def createDirectStream[
K: ClassTag,
V: ClassTag,
@@ -437,7 +423,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -472,7 +457,6 @@
* starting point of the stream
* @param messageHandler Function for translating each message and metadata into the desired type
*/
@Experimental
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
jssc: JavaStreamingContext,
keyClass: Class[K],
@@ -499,7 +483,6 @@
}

/**
* :: Experimental ::
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
@@ -533,7 +516,6 @@
* to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
*/
@Experimental
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
jssc: JavaStreamingContext,
keyClass: Class[K],
@@ -564,7 +546,7 @@ object KafkaUtils {
* classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
* takes care of known parameters instead of passing them from Python
*/
private class KafkaUtilsPythonHelper {
private[kafka] class KafkaUtilsPythonHelper {
def createStream(
jssc: JavaStreamingContext,
kafkaParams: JMap[String, String],

@@ -19,10 +19,7 @@ package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Represents any object that has a collection of [[OffsetRange]]s. This can be used access the
* offset ranges in RDDs generated by the direct Kafka DStream (see
* [[KafkaUtils.createDirectStream()]]).
@@ -33,21 +30,18 @@ import org.apache.spark.annotation.Experimental
* }
* }}}
*/
@Experimental
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

/**
* :: Experimental ::
* Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
* can be created with `OffsetRange.create()`.
* @param topic Kafka topic name
* @param partition Kafka partition id
* @param fromOffset Inclusive starting offset
* @param untilOffset Exclusive ending offset
*/
@Experimental
final class OffsetRange private(
val topic: String,
val partition: Int,
@@ -84,10 +78,8 @@ final class OffsetRange private(
}

/**
* :: Experimental ::
* Companion object the provides methods to create instances of [[OffsetRange]].
*/
@Experimental
object OffsetRange {
def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
new OffsetRange(topic, partition, fromOffset, untilOffset)
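
As an editorial aside, not part of this commit: the HasOffsetRanges trait shown above is how callers read back the Kafka offsets behind each batch of a direct stream. A minimal usage sketch follows, assuming the direct-stream overload with String decoders and using a placeholder broker address ("localhost:9092") and topic name ("events"):

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object OffsetRangeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offset-range-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder broker and topic; replace with real values.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.foreachRDD { rdd =>
      // RDDs produced by the direct stream implement HasOffsetRanges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} offsets ${o.fromOffset} to ${o.untilOffset}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that the cast only succeeds on the RDDs produced directly by the stream, before any transformations, since only those carry the offset metadata.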

@@ -21,8 +21,8 @@ import scala.reflect.ClassTag

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object MQTTUtils {
/**
@@ -79,7 +79,7 @@ object MQTTUtils {
* This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
* function so that it can be easily instantiated and called from Python's MQTTUtils.
*/
private class MQTTUtilsPythonHelper {
private[mqtt] class MQTTUtilsPythonHelper {

def createStream(
jssc: JavaStreamingContext,

@@ -33,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf}
/**
* Share codes for Scala and Python unit tests
*/
private class MQTTTestUtils extends Logging {
private[mqtt] class MQTTTestUtils extends Logging {

private val persistenceDir = Utils.createTempDir()
private val brokerHost = "localhost"

@@ -36,7 +36,7 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
private class KinesisTestUtils extends Logging {
private[kinesis] class KinesisTestUtils extends Logging {

val endpointUrl = KinesisTestUtils.endpointUrl
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()

@@ -21,6 +21,8 @@
import java.util.Iterator;

/**
* :: DeveloperApi ::
*
* This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
* to save the received data (by receivers) and associated metadata to a reliable storage, so that
* they can be recovered after driver failures. See the Spark documentation for more information

@@ -18,6 +18,8 @@
package org.apache.spark.streaming.util;

/**
* :: DeveloperApi ::
*
* This abstract class represents a handle that refers to a record written in a
* {@link org.apache.spark.streaming.util.WriteAheadLog WriteAheadLog}.
* It must contain all the information necessary for the record to be read and returned by

@@ -222,7 +222,7 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
/**
* A utility that will wrap the Iterator to get the count
*/
private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
private[streaming] class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
private var _count = 0

private def isFullyConsumed: Boolean = !iterator.hasNext

@@ -546,7 +546,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
* Function to start the receiver on the worker node. Use a class instead of closure to avoid
* the serialization issue.
*/
private class StartReceiverFunc(
private[streaming] class StartReceiverFunc(
checkpointDirOption: Option[String],
serializableHadoopConf: SerializableConfiguration)
extends (Iterator[Receiver[_]] => Unit) with Serializable {

@@ -28,7 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData

private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])

private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener