[SPARK-4608][Streaming] Reorganize StreamingContext implicit to improve API convenience #3464

Closed · wants to merge 5 commits · Changes from 4 commits
4 changes: 2 additions & 2 deletions docs/streaming-programming-guide.md
@@ -66,7 +66,7 @@ main entry point for all streaming functionality. We create a local StreamingContext
{% highlight scala %}
import org.apache.spark._
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
Contributor: Actually, on second thought, let's not remove it from the programming guide. People might see the programming guide of one version and try it on an older version. I would say, keep this, but add a comment "not necessary in Spark 1.3+".

Member Author: How about now?

Contributor: Looks good.


// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
@@ -98,7 +98,7 @@ each line will be split into multiple words and the stream of words is represented
`words` DStream. Next, we want to count these words.

{% highlight scala %}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
@@ -22,7 +22,6 @@ import java.util.Properties
import kafka.producer._

import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

@@ -23,7 +23,6 @@ import java.net.Socket
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.receiver.Receiver

/**
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._

/**
* Counts words in new text files created in the given directory
@@ -22,7 +22,6 @@ import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.mqtt._
import org.apache.spark.SparkConf

@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

/**
@@ -22,7 +22,6 @@ import scala.collection.mutable.SynchronizedQueue
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._

object QueueStream {

@@ -25,7 +25,6 @@ import com.google.common.io.Files
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.util.IntParam

/**
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._

/**
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
@@ -23,7 +23,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._

// scalastyle:off
@@ -18,7 +18,6 @@
package org.apache.spark.examples.streaming

import org.apache.spark.streaming.{Seconds, StreamingContext}
-import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
@@ -24,7 +24,6 @@ import akka.zeromq.Subscribe
import akka.util.ByteString

import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.zeromq._

import scala.language.implicitConversions
@@ -19,7 +19,6 @@ package org.apache.spark.examples.streaming.clickstream

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.examples.streaming.StreamingExamples
// scalastyle:off
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
import org.apache.spark.util.random.XORShiftRandom
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

/**
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.collection.Map
import scala.collection.mutable.Queue
-import scala.language.implicitConversions
import scala.reflect.ClassTag

import akka.actor.{Props, SupervisorStrategy}
@@ -523,9 +522,11 @@ object StreamingContext extends Logging {

private[streaming] val DEFAULT_CLEANER_TTL = 3600

-implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
+  "kept here only for backward compatibility.", "1.3.0")
+def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
-  new PairDStreamFunctions[K, V](stream)
+  DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
 }

/**
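
The deprecated forwarder above preserves source compatibility: code that calls `StreamingContext.toPairDStreamFunctions` explicitly still compiles, with a deprecation warning, while ordinary pair-DStream code now picks the implicit up from the `DStream` companion object with no import at all. A minimal sketch of what 1.3+ user code looks like (the `WordCountSketch` object and its helper are illustrative, not part of this patch):

```scala
import org.apache.spark.streaming.dstream.DStream

// No `import StreamingContext._` needed in Spark 1.3+: the compiler finds
// DStream.toPairDStreamFunctions in the DStream companion object.
object WordCountSketch {
  def wordCounts(lines: DStream[String]): DStream[(String, Int)] =
    lines.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _) // a PairDStreamFunctions method, found via the implicit
}
```
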
@@ -36,7 +36,6 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

/**
@@ -815,6 +814,6 @@ object JavaPairDStream {

def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
-    StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}
}
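
`scalaToJavaLong` applies the conversion explicitly rather than through implicit resolution, which is why moving the implicit only required changing the qualifier here: an `implicit def` is still an ordinary method. A tiny standalone sketch of that property (all names invented for illustration):

```scala
import scala.language.implicitConversions

class IntOps(i: Int) { def squared: Int = i * i }

object Enrich {
  implicit def toIntOps(i: Int): IntOps = new IntOps(i)
}

object ExplicitCallDemo extends App {
  // Applied like any other method; no implicit resolution involved.
  println(Enrich.toIntOps(3).squared) // prints 9
}
```
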
@@ -20,16 +20,16 @@ package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.deprecated
import scala.collection.mutable.HashMap
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}

@@ -48,8 +48,7 @@ import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
* operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
* `join`. These operations are automatically available on any DStream of pairs
-* (e.g., DStream[(Int, Int)] through implicit conversions when
-* `org.apache.spark.streaming.StreamingContext._` is imported.
+* (e.g., DStream[(Int, Int)] through implicit conversions.
*
* DStreams internally is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
@@ -802,10 +801,21 @@ abstract class DStream[T: ClassTag] (
}
}

-private[streaming] object DStream {
+object DStream {
+
+  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
+  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
+  // it automatically. However, we still keep the old function in StreamingContext for backward
+  // compatibility and forward to the following function directly.
+
+  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
+    PairDStreamFunctions[K, V] = {
+    new PairDStreamFunctions[K, V](stream)
+  }

   /** Get the creation site of a DStream from the stack trace of when the DStream is created. */
-  def getCreationSite(): CallSite = {
+  private[streaming] def getCreationSite(): CallSite = {
val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
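
The relocation works because Scala's implicit search always includes the companion objects of the types involved in a conversion, so a `DStream[(K, V)]` picks up `toPairDStreamFunctions` with no wildcard import. A self-contained sketch of the mechanism, with no Spark dependency (all names here are invented for illustration):

```scala
class Box[T](val value: T)

object Box {
  // Analogous to DStream.toPairDStreamFunctions: an enrichment that is
  // only applicable when the element type is a pair.
  implicit class PairBoxOps[K, V](box: Box[(K, V)]) {
    def swapped: Box[(V, K)] = new Box(box.value.swap)
  }
}

object CompanionImplicitDemo extends App {
  val b = new Box(("spark", 1))
  // Compiles with no import: PairBoxOps is found in Box's companion object.
  println(b.swapped.value) // prints (1,spark)
}
```
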
@@ -27,12 +27,10 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName

/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
-* Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
-* these functions.
*/
class PairDStreamFunctions[K, V](self: DStream[(K,V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
@@ -17,8 +17,6 @@

package org.apache.spark.streaming.dstream

-import org.apache.spark.streaming.StreamingContext._
-
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
import org.apache.spark.Partitioner
@@ -26,7 +26,7 @@ package org.apache.spark
* available only on DStreams
* of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
* available on any DStream of the right type (e.g. DStream[(Int, Int)] through implicit
-* conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+* conversions.
*
* For the Java API of Spark Streaming, take a look at the
* [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
import org.apache.spark.HashPartitioner

@@ -30,7 +30,6 @@ import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
@@ -20,7 +20,6 @@ package org.apache.spark.streaming
import org.apache.spark.Logging
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-import org.apache.spark.streaming.StreamingContext._

import scala.util.Random
import scala.collection.mutable.ArrayBuffer
@@ -17,7 +17,6 @@

package org.apache.spark.streaming

-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streamingtest

/**
* A test suite to make sure all `implicit` functions work correctly.
*
* As `implicit` is a compiler feature, we don't need to run this class.
* What we need to do is making the compiler happy.
*/
class ImplicitSuite {

  // We only want to test if `implict` works well with the compiler, so we don't need a real DStream.
  def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null

  def testToPairDStreamFunctions(): Unit = {
    val rdd: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = mockDStream
Contributor: I think this is a copy-paste error ;) It should be named dstream instead of rdd.

Member Author: Good catch. Done.

    rdd.groupByKey()
  }
}
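
Per the exchange above, the `rdd` local was renamed in a later commit that is not part of this four-commit view; presumably the fixed body reads:

```scala
def testToPairDStreamFunctions(): Unit = {
  val dstream: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = mockDStream
  dstream.groupByKey()
}
```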