@@ -32,8 +32,8 @@ import org.apache.spark.util.Utils
private[spark] class ParallelCollectionPartition[T: ClassTag](
    var rddId: Long,
    var slice: Int,
-    var values: Seq[T])
-    extends Partition with Serializable {
+    var values: Seq[T]
+  ) extends Partition with Serializable {
Member:

Not a typo, I'd revert it

Contributor Author:

Shouldn't `extends Partition with Serializable {` be 2-space indented instead of 4-space indented?
I see many places with a similar style where 2-space indentation is used: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L63

Member:

Oh I see your point, it is really about the indentation, and that's why the paren is wrapped. OK.


def iterator: Iterator[T] = values.iterator
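For reference, a minimal self-contained sketch contrasting the two layouts discussed in the thread above (hypothetical class names; a local trait stands in for org.apache.spark.Partition):

```scala
trait Partition extends Serializable // stand-in for org.apache.spark.Partition

// Layout before the change: the paren closes on the last parameter line and the
// extends clause sits on its own, more deeply indented line.
class PartitionBefore[T](
    var rddId: Long,
    var slice: Int,
    var values: Seq[T])
    extends Partition with Serializable {
  def iterator: Iterator[T] = values.iterator
}

// Layout after the change: the closing paren is wrapped to its own line so the
// extends clause follows it at 2-space indent, matching the DStream.scala style.
class PartitionAfter[T](
    var rddId: Long,
    var slice: Int,
    var values: Seq[T]
  ) extends Partition with Serializable {
  def iterator: Iterator[T] = values.iterator
}
```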

@@ -29,7 +29,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

/**
- * Custom Receiver that receives data over a socket. Received bytes is interpreted as
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
* text and \n delimited lines are considered as records. They are then counted and printed.
*
* To run this on your local machine, you need to first run a Netcat server
@@ -50,7 +50,7 @@ object CustomReceiver {
val sparkConf = new SparkConf().setAppName("CustomReceiver")
val ssc = new StreamingContext(sparkConf, Seconds(1))

- // Create a input stream with the custom receiver on target ip:port and count the
+ // Create an input stream with the custom receiver on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
val words = lines.flatMap(_.split(" "))
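A hedged sketch of how this driver program typically continues beyond the visible hunk (the remainder is an assumption; `words` and `ssc` come from the snippet above):

```scala
// Count each word per batch, print the counts, then run until the context is stopped.
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
```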
@@ -43,7 +43,7 @@ object QueueStream {
reducedStream.print()
ssc.start()

- // Create and push some RDDs into
+ // Create and push some RDDs into rddQueue
for (i <- 1 to 30) {
rddQueue.synchronized {
rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
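For context, a self-contained sketch of the queueStream pattern this loop feeds, assembled from the visible hunks (the object name and the parts outside the hunks are assumptions):

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("QueueStreamSketch"), Seconds(1))

    // RDDs pushed into this queue are served, one per batch, by the queue input stream.
    val rddQueue = new mutable.Queue[RDD[Int]]()
    val inputStream = ssc.queueStream(rddQueue)
    inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
    ssc.start()

    // Create and push some RDDs into rddQueue; synchronize because the
    // scheduler thread dequeues from the same queue.
    for (i <- 1 to 30) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
      }
      Thread.sleep(1000)
    }
    ssc.stop()
  }
}
```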
@@ -52,7 +52,7 @@ import org.apache.spark.util.{CallSite, Utils}
* `join`. These operations are automatically available on any DStream of pairs
* (e.g., DStream[(Int, Int)] through implicit conversions.
*
- * DStreams internally is characterized by a few basic properties:
+ * A DStream internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
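The three properties listed above map directly onto the members a DStream subclass fills in. As a hedged illustration (a hypothetical class, essentially re-deriving the built-in ConstantInputDStream), a minimal input DStream looks like this:

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// An input DStream that returns the same RDD for every batch interval.
class SameRDDEachBatch[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
  extends InputDStream[T](_ssc) {

  // "A list of other DStreams that the DStream depends on" and
  // "A time interval at which the DStream generates an RDD" are inherited from
  // InputDStream: no dependencies, and one RDD per batch interval.

  // "A function that is used to generate an RDD after each time interval":
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)

  override def start(): Unit = {} // nothing to set up
  override def stop(): Unit = {}  // nothing to tear down
}
```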
@@ -86,13 +86,13 @@ private[streaming] class BlockGenerator(
/**
* The BlockGenerator can be in 5 possible states, in the order as follows.
*
- * - Initialized: Nothing has been started
+ * - Initialized: Nothing has been started.
* - Active: start() has been called, and it is generating blocks on added data.
* - StoppedAddingData: stop() has been called, the adding of data has been stopped,
* but blocks are still being generated and pushed.
* - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
* they are still being pushed.
- * - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
+ * - StoppedAll: Everything has been stopped, and the BlockGenerator object can be GCed.
Member:

Not a typo, but does make the voice consistent. OK

*/
private object GeneratorState extends Enumeration {
type GeneratorState = Value
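Spelled out as a hedged sketch: the hunk above truncates the enumeration, and the value list below is inferred from the five documented states rather than copied from the file:

```scala
// The five BlockGenerator lifecycle states as a plain Scala Enumeration.
object GeneratorState extends Enumeration {
  type GeneratorState = Value
  val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
}
```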
@@ -148,7 +148,7 @@ private[streaming] class BlockGenerator(
blockIntervalTimer.stop(interruptTimer = false)
synchronized { state = StoppedGeneratingBlocks }

- // Wait for the queue to drain and mark generated as stopped
+ // Wait for the queue to drain and mark state as StoppedAll
logInfo("Waiting for block pushing thread to terminate")
blockPushingThread.join()
synchronized { state = StoppedAll }
@@ -32,7 +32,7 @@ import org.apache.spark.storage.StorageLevel
* should define the setup steps necessary to start receiving data,
* and `onStop()` should define the cleanup steps necessary to stop receiving data.
* Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
- * or stopped completely by `stop(...)` or
+ * or stopped completely by `stop(...)`.
*
* A custom receiver in Scala would look like this.
*
@@ -45,7 +45,7 @@ import org.apache.spark.storage.StorageLevel
* // Call store(...) in those threads to store received data into Spark's memory.
*
* // Call stop(...), restart(...) or reportError(...) on any thread based on how
- * // different errors needs to be handled.
+ * // different errors need to be handled.
*
* // See corresponding method documentation for more details
* }
@@ -71,7 +71,7 @@ import org.apache.spark.storage.StorageLevel
* // Call store(...) in those threads to store received data into Spark's memory.
*
* // Call stop(...), restart(...) or reportError(...) on any thread based on how
- * // different errors needs to be handled.
+ * // different errors need to be handled.
*
* // See corresponding method documentation for more details
* }
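The skeletons above leave the method bodies as comments. As a hedged concrete sketch of the same contract (a hypothetical line-oriented socket receiver, modeled on the CustomReceiver example touched earlier in this diff):

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart(): set up a thread that receives data; must not block.
  def onStart(): Unit = {
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // onStop(): nothing to clean up here; the receiving thread checks isStopped()
  // and exits on its own.
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand each \n-delimited record to Spark
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again") // server went away; ask Spark to restart us
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
```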
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.scheduler

import scala.util.{Failure, Success, Try}

- import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
@@ -239,7 +238,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Restarted JobGenerator at " + restartTime)
}

- /** Generate jobs and perform checkpoint for the given `time`. */
+ /** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
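The lineage truncation referred to here is driven by checkpoint settings in the user program. A hedged user-level sketch (application name, host, port, and checkpoint directory are assumptions, not part of this diff):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("CheckpointSketch"), Seconds(1))
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoints") // assumed directory

    // A stateful stream whose lineage would grow without bound; periodic
    // checkpointing (here every 10 seconds) lets generateJobs truncate it.
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) =>
        Some(values.sum + state.getOrElse(0)))
    counts.checkpoint(Seconds(10))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```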
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.scheduler
import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.mutable.HashMap
- import scala.concurrent.{ExecutionContext, Future}
+ import scala.concurrent.ExecutionContext
import scala.language.existentials
import scala.util.{Failure, Success}
