Commit: WIP
giwa committed Aug 4, 2014
1 parent e35e101 commit 56fae45
Showing 5 changed files with 41 additions and 6 deletions.
5 changes: 3 additions & 2 deletions examples/src/main/python/streaming/test_oprations.py
@@ -15,10 +15,11 @@

 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
 words = lines.flatMap(lambda line: line.split(" "))
+# ssc.checkpoint("checkpoint")
 mapped_words = words.map(lambda word: (word, 1))
 count = mapped_words.reduceByKey(add)

 count.pyprint()
 ssc.start()
-# ssc.awaitTermination()
-ssc.stop()
+ssc.awaitTermination()
+# ssc.stop()
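For context, a minimal self-contained sketch of the word-count example this hunk modifies. Only the lines shown in the hunk are from this commit; the imports and the StreamingContext construction below are assumptions based on the standard PySpark entry points and may differ in this WIP branch.

import sys
from operator import add

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Assumption: a 1-second batch interval; the constructor in this WIP branch
# may not match the standard StreamingContext(sc, batchDuration) signature.
sc = SparkContext(appName="PythonStreamingWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
words = lines.flatMap(lambda line: line.split(" "))
mapped_words = words.map(lambda word: (word, 1))
count = mapped_words.reduceByKey(add)

count.pyprint()  # pyprint() is the output operation this branch provides
ssc.start()
ssc.awaitTermination()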
5 changes: 5 additions & 0 deletions python/pyspark/streaming/context.py
@@ -133,3 +133,8 @@ def stop(self, stopSparkContext=True):
         finally:
             # Stop Callback server
             SparkContext._gateway.shutdown()
+
+    def checkpoint(self, directory):
+        """Set the checkpoint directory where DStream operations will be
+        periodically checkpointed for fault tolerance."""
+        self._jssc.checkpoint(directory)
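The new method simply forwards to the JVM streaming context. A minimal usage sketch, assuming a StreamingContext named ssc like the one in the example script above; the directory name mirrors the commented-out call added to test_oprations.py, and any writable path would do.

ssc.checkpoint("checkpoint")  # enable periodic DStream checkpointing
ssc.start()
ssc.awaitTermination()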
4 changes: 3 additions & 1 deletion python/pyspark/streaming/dstream.py
@@ -172,7 +172,8 @@ def add_shuffle_key(split, iterator):
         with _JavaStackTrace(self.ctx) as st:
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                           id(partitionFunc))
-            jdstream = self.ctx._jvm.PairwiseDStream(keyed._jdstream.dstream(), partitioner).asJavaDStream()
+            jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
+                                                           partitioner).asJavaDStream()
         dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))
         # This is required so that id(partitionFunc) remains unique, even if
         # partitionFunc is a lambda:
@@ -233,6 +234,7 @@ def takeAndPrint(rdd, time):
 # jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
 # return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW

+
 class PipelinedDStream(DStream):
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
1 change: 0 additions & 1 deletion python/pyspark/streaming/utils.py
@@ -37,7 +37,6 @@ class Java:
         implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']


-
 def msDurationToString(ms):
     """
     Returns a human-readable string representing a duration such as "35ms"
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.api.python._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.streaming.{Duration, Time}
+import org.apache.spark.streaming.{StreamingContext, Duration, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.api.java._

@@ -64,7 +64,7 @@ class PythonDStream[T: ClassTag](
 }


-private class PairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
+private class PythonPairwiseDStream(prev:DStream[Array[Byte]], partitioner: Partitioner) extends
   DStream[Array[Byte]](prev.ssc){
   override def dependencies = List(prev)

@@ -105,6 +105,7 @@ class PythonForeachDStream(

   this.register()
 }
+
 /*
 This does not work. Ignore this for now. -TD
 class PythonTransformedDStream(
@@ -126,3 +127,30 @@ class PythonTransformedDStream(
 }
 */

+/**
+ * This is an input stream just for unit tests. It is equivalent to a checkpointable,
+ * replayable, reliable message queue like Kafka. It requires a sequence as input, and
+ * returns the i-th element at the i-th batch under a manual clock.
+ */
+class PythonTestInputStream(ssc_ : StreamingContext, filename: String, numPartitions: Int)
+  extends InputDStream[Array[Byte]](ssc_) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
+    logInfo("Computing RDD for time " + validTime)
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    //val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+    // lets us test cases where RDDs are not created
+    //if (filename == null)
+    //  return None
+
+    //val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
+    val rdd = PythonRDD.readRDDFromFile(ssc.sc, filename, numPartitions).rdd
+    logInfo("Created RDD " + rdd.id + " with " + filename)
+    Some(rdd)
+  }
+}
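A sketch of how a Python-side test might feed this stream: write one batch of records to a temporary file in the length-prefixed pickle format that PythonRDD.readRDDFromFile expects, then hand the file name to the JVM class through the Py4J gateway. The helper name, the gateway path, and the serializer pairing below are assumptions for illustration, not something this commit shows.

import tempfile

from pyspark.serializers import BatchedSerializer, PickleSerializer

def make_test_input_stream(ssc, records, num_partitions=2):
    # Hypothetical helper: serialize `records` the way SparkContext.parallelize()
    # does, so PythonRDD.readRDDFromFile can rebuild them on the JVM side.
    tmp = tempfile.NamedTemporaryFile(delete=False)
    BatchedSerializer(PickleSerializer(), 1024).dump_stream(records, tmp)
    tmp.close()
    # Assumptions: the Scala class is reachable through the gateway at this
    # path, and ssc._jssc.ssc() exposes the underlying StreamingContext.
    return ssc._jvm.org.apache.spark.streaming.api.python.PythonTestInputStream(
        ssc._jssc.ssc(), tmp.name, num_partitions)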
