
Commit

clean up
giwa committed Sep 20, 2014
1 parent c00e091 commit 3166d31
Showing 7 changed files with 37 additions and 109 deletions.
9 changes: 5 additions & 4 deletions python/pyspark/streaming/context.py
@@ -142,17 +142,18 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

def _testInputStream(self, test_inputs, numSlices=None):
"""
This function is only for test.
This implementation is inspired by QueStream implementation.
Give list of RDD to generate DStream which contains the RDD.
This function is only for unit tests.
It requires a sequence as input, and returns the i-th element at the i-th batch
under a manual clock.
"""
test_rdds = list()
test_rdd_deserializers = list()
for test_input in test_inputs:
test_rdd = self._sc.parallelize(test_input, numSlices)
test_rdds.append(test_rdd._jrdd)
test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

# All deserializers have to be the same.
# TODO: add deserializer validation
jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

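A minimal sketch, not part of this commit, of how _testInputStream and the _test_output helper in dstream.py are meant to work together in a unit test under a manual clock. The StreamingContext constructor, the Seconds helper, and the clock property name are assumptions about this branch and may differ:

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

# Assumption: the manual clock is enabled through this Spark property.
conf = SparkConf().set("spark.streaming.clock",
                       "org.apache.spark.streaming.util.ManualClock")
sc = SparkContext(appName="stream-test", conf=conf)
ssc = StreamingContext(sparkContext=sc, duration=Seconds(1))  # assumed signature

result = []
test_inputs = [[1, 2, 3], [4, 5, 6]]        # batch i should yield test_inputs[i]
stream = ssc._testInputStream(test_inputs)
stream._test_output(result)                 # collect each batch into result
ssc.start()
# ... advance the manual clock for two batches, then:
# assert result == [[1, 2, 3], [4, 5, 6]]
ssc.stop()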
28 changes: 11 additions & 17 deletions python/pyspark/streaming/dstream.py
@@ -283,23 +283,6 @@ def func(iterator):
yield list(iterator)
return self.mapPartitions(func)
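The helper just above yields each partition as a single list; at the plain-RDD level the same pattern behaves like this (a sketch, not part of this commit, assuming a live SparkContext named sc):

rdd = sc.parallelize([1, 2, 3, 4], 2)      # two partitions: [1, 2] and [3, 4]

def func(iterator):
    yield list(iterator)

print(rdd.mapPartitions(func).collect())   # [[1, 2], [3, 4]]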

#def transform(self, func): - TD
# from utils import RDDFunction
# wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
# jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
# return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW

def _test_output(self, result):
"""
This function is only for test case.
Store data in a DStream to result to verify the result in test case
"""
def get_output(rdd, time):
taken = rdd.collect()
result.append(taken)

self.foreachRDD(get_output)

def cache(self):
"""
Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -404,6 +387,17 @@ def saveAsTextFile(rdd, time):

return self.foreachRDD(saveAsTextFile)

def _test_output(self, result):
"""
This function is only for test cases.
Store the data of a DStream into result to verify the results in test cases.
"""
def get_output(rdd, time):
collected = rdd.collect()
result.append(collected)

self.foreachRDD(get_output)
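As get_output above shows, foreachRDD hands the callback each batch's RDD together with its batch time. A short sketch of the same pattern for ad-hoc inspection (assuming an existing DStream named stream; not part of this commit):

def print_batch(rdd, time):
    # rdd is the RDD of one batch, time is its batch time, as in get_output above
    print("batch at %s: %s" % (time, rdd.collect()))

stream.foreachRDD(print_batch)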


# TODO: implement updateStateByKey
# TODO: implement slice
3 changes: 2 additions & 1 deletion python/pyspark/streaming/jtime.py
@@ -19,10 +19,11 @@
from pyspark.streaming.duration import Duration

"""
The name of this file, time is not good naming for python
The name of this file, time, is not a good name for Python,
because if we do import time when we want to use the native Python time package, it does
not import the Python time package.
"""
# TODO: add doctest
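A hypothetical illustration, not part of this commit, of the collision described above: if this module were named time.py, a sibling module in pyspark/streaming that does import time would, under Python 2's implicit relative imports, pick up the local file instead of the standard library. Renaming the module to jtime.py sidesteps the clash.

# hypothetical sibling module, e.g. pyspark/streaming/some_helper.py
import time  # with a local time.py present, this would resolve to it, not the stdlib

def current_millis():
    # under the collision this raises AttributeError, because the local time.py
    # defines a Time class but no time() function
    return int(time.time() * 1000)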


class Time(object):
12 changes: 9 additions & 3 deletions python/pyspark/streaming/utils.py
@@ -19,6 +19,9 @@


class RDDFunction():
"""
This class is for py4j callback. This
"""
def __init__(self, ctx, jrdd_deserializer, func):
self.ctx = ctx
self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:


def msDurationToString(ms):
# TODO: add doctest
"""
Returns a human-readable string representing a duration such as "35ms"
"""
@@ -54,8 +58,10 @@ def msDurationToString(ms):
else:
return "%.2f h" % (float(ms) / hour)


def rddToFileName(prefix, suffix, time):
if suffix is not None:
return prefix + "-" + str(time) + "." + suffix
else:
# TODO: add doctest
if suffix is None:
return prefix + "-" + str(time)
else:
return prefix + "-" + str(time) + "." + suffix
@@ -18,10 +18,8 @@
package org.apache.spark.streaming.api.python

import java.io._
import java.io.{ObjectInputStream, IOException}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.collection.JavaConversions._

@@ -55,7 +53,9 @@ class PythonDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) =>
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
// Create a PythonRDD to compute the Python functions.
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes,
preservePartitoning, pythonExec, broadcastVars, accumulator)
Some(pythonRDD.asJavaRDD.rdd)
case None => None
}
@@ -135,8 +135,8 @@ DStream[Array[Byte]](prev.ssc){
case Some(rdd) => Some(rdd)
val pairwiseRDD = new PairwiseRDD(rdd)
/*
* Since python operation is executed by Scala after StreamingContext.start.
* What PythonPairwiseDStream does is equivalent to python code in pySpark.
* Since the Python function is executed by Scala after StreamingContext.start,
* what PythonPairwiseDStream does is equivalent to the following Python code in pyspark:
*
* with _JavaStackTrace(self.context) as st:
* pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -154,23 +154,6 @@
}


class PythonTestInputStream3(ssc_ : JavaStreamingContext)
extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {

def start() {}

def stop() {}

def compute(validTime: Time): Option[RDD[Any]] = {
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedInput = ArrayBuffer(1, 2, 3).toSeq
val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
Some(rdd)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}

class PythonForeachDStream(
prev: DStream[Array[Byte]],
foreachFunction: PythonRDDFunction
@@ -184,30 +167,11 @@ class PythonForeachDStream(
this.register()
}

class PythonTransformedDStream(
prev: DStream[Array[Byte]],
transformFunction: PythonRDDFunction
) extends DStream[Array[Byte]](prev.ssc) {

override def dependencies = List(prev)

override def slideDuration: Duration = prev.slideDuration

override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
prev.getOrCompute(validTime).map(rdd => {
transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd
})
}

val asJavaDStream = JavaDStream.fromDStream(this)
//val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
}

/**
* This is an input stream just for unit tests. It is equivalent to a checkpointable,
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i-th element at the i-th batch under a manual clock.
* This implementation is inspired by QueueStream.
*/

class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
@@ -3,6 +3,10 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Time;

/*
* Interface for a Py4J callback function.
* This function is called by pyspark.streaming.dstream.DStream.foreachRDD.
*/
public interface PythonRDDFunction {
JavaRDD<byte[]> call(JavaRDD<byte[]> rdd, long time);
}
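For context, a hedged sketch (not part of this commit) of the Py4J callback pattern that ties this interface to the RDDFunction wrapper in pyspark/streaming/utils.py: the Python class declares an inner class Java naming the interface it implements, so the JVM can call back into Python. The class and field names below are illustrative, not the exact contents of this branch:

class EchoRDDFunction(object):
    def __init__(self, func):
        self.func = func

    def call(self, jrdd, milliseconds):
        # invoked from the JVM with a JavaRDD of serialized bytes and the batch time
        return self.func(jrdd, milliseconds)

    class Java:
        # the Py4J gateway must have its callback server running for this to work
        implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']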

This file was deleted.

