
Commit

clean up
giwa committed Aug 20, 2014
1 parent 4dedd2d commit 171edeb
Showing 8 changed files with 46 additions and 99 deletions.
10 changes: 5 additions & 5 deletions python/pyspark/streaming/context.py
@@ -16,7 +16,6 @@
#

import sys
import time
from signal import signal, SIGTERM, SIGINT

from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
@@ -143,17 +142,18 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

def _testInputStream(self, test_inputs, numSlices=None):
"""
This function is only for testing.
This implementation is inspired by the QueueStream implementation.
Given a list of RDDs, generate a DStream which contains those RDDs.
This function is only for unit tests.
It requires a sequence as input, and returns the i-th element at the i-th batch
under the manual clock.
"""
test_rdds = list()
test_rdd_deserializers = list()
for test_input in test_inputs:
test_rdd = self._sc.parallelize(test_input, numSlices)
test_rdds.append(test_rdd._jrdd)
test_rdd_deserializers.append(test_rdd._jrdd_deserializer)

# All deserializers have to be the same.
# TODO: add deserializer validation
jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

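For reference, a minimal sketch of how a test might drive _testInputStream under the manual clock. The StreamingContext constructor keywords and the spark.streaming.clock setting are assumptions based on how the Scala test suites of this era are configured; they are not shown in this diff.

from pyspark.conf import SparkConf
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.duration import Seconds

# Assumed setup: the manual clock lets a test advance batches deterministically.
conf = SparkConf().set("spark.streaming.clock",
                       "org.apache.spark.streaming.util.ManualClock")
ssc = StreamingContext(appName="test", conf=conf, duration=Seconds(1))

test_input = [[1, 2, 3], [4, 5], [6]]        # one inner list per batch
stream = ssc._testInputStream(test_input)    # batch i yields test_input[i]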
28 changes: 11 additions & 17 deletions python/pyspark/streaming/dstream.py
@@ -276,23 +276,6 @@ def func(iterator):
yield list(iterator)
return self.mapPartitions(func)

#def transform(self, func): - TD
# from utils import RDDFunction
# wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
# jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
# return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW

def _test_output(self, result):
"""
This function is only for test cases.
Store the data from a DStream into result to verify the output in test cases.
"""
def get_output(rdd, time):
taken = rdd.collect()
result.append(taken)

self.foreachRDD(get_output)

def cache(self):
"""
Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -398,6 +381,17 @@ def saveAsPickleFile(rdd, time):

return self.foreachRDD(saveAsPickleFile)

def _test_output(self, result):
"""
This function is only for test cases.
Store the data from a DStream into result to verify the output in test cases.
"""
def get_output(rdd, time):
collected = rdd.collect()
result.append(collected)

self.foreachRDD(get_output)


# TODO: implement updateStateByKey
# TODO: implement slice
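The relocated _test_output helper above is just the foreachRDD collection pattern. A rough equivalent written inline, reusing the hypothetical stream and ssc names from the sketch above:

result = []                          # filled on the driver, one entry per batch

def get_output(rdd, time):
    result.append(rdd.collect())

stream.foreachRDD(get_output)        # what stream._test_output(result) boils down to
ssc.start()
# ... let the manual clock advance through the batches ...
# result then looks like [[1, 2, 3], [4, 5], [6]] for the input above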
3 changes: 2 additions & 1 deletion python/pyspark/streaming/jtime.py
@@ -19,10 +19,11 @@
from pyspark.streaming.duration import Duration

"""
The name of this file, time is not good naming for python
The name of this file, time, is not a good name for Python,
because if we do import time when we want to use the native Python time package, it does
not import the Python time package.
"""
# TODO: add doctest


class Time(object):
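The point of the jtime.py docstring: under Python 2's implicit relative imports, which this code base still targets, a sibling module named time.py would shadow the standard library. A hypothetical illustration:

# Inside pyspark/streaming/, if this module were named time.py:
import time            # would resolve to pyspark/streaming/time.py, not the stdlib
time.sleep(1)          # AttributeError: the local module defines no sleep()
# Naming the module jtime.py avoids the collision.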
12 changes: 9 additions & 3 deletions python/pyspark/streaming/utils.py
@@ -19,6 +19,9 @@


class RDDFunction():
"""
This class is for py4j callbacks. It wraps a Python function so that the JVM can call back into Python.
"""
def __init__(self, ctx, jrdd_deserializer, func):
self.ctx = ctx
self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:


def msDurationToString(ms):
#TODO: add doctest
"""
Returns a human-readable string representing a duration such as "35ms"
"""
@@ -54,8 +58,10 @@ def msDurationToString(ms):
else:
return "%.2f h" % (float(ms) / hour)


def rddToFileName(prefix, suffix, time):
if suffix is not None:
return prefix + "-" + str(time) + "." + suffix
else:
#TODO: add doctest
if suffix is None:
return prefix + "-" + str(time)
else:
return prefix + "-" + str(time) + "." + suffix
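For the two "add doctest" TODOs above, a possible sketch. The rddToFileName results follow from the code shown; the msDurationToString line assumes the hour branch's "%.2f h" format from the diff.

>>> rddToFileName("out", "txt", 1000)
'out-1000.txt'
>>> rddToFileName("out", None, 1000)
'out-1000'
>>> msDurationToString(2 * 60 * 60 * 1000)   # hour branch
'2.00 h'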
15 changes: 8 additions & 7 deletions python/pyspark/streaming_tests.py
@@ -20,9 +20,10 @@
individual modules.
This file will be merged into tests.py after all functions are ready.
But for now, this file is kept separate because it focuses on streaming test cases.
Since the Python API for streaming is in beta, this file is kept separate.
The callback server seems unstable at times, which causes errors in test cases.
The callback server is sometimes unstable, which causes errors in test cases.
But this is a very rare case.
"""
from itertools import chain
@@ -58,15 +59,14 @@ def tearDownClass(cls):
class TestBasicOperationsSuite(PySparkStreamingTestCase):
"""
2 tests for each function, for the batch deserializer and the unbatch deserializer, because
we cannot change the deserializer after the streaming process starts.
the deserializer is not changed dynamically after the streaming process starts.
Default numInputPartitions is 2.
If the number of input elements is over 3, that DStream uses the batch deserializer.
If not, that DStream uses the unbatch deserializer.
Most of the operations use the UTF8 deserializer to get values from Scala.
I am wondering whether these tests are enough or not.
All test inputs should be a list of lists. This represents a stream.
All test inputs should be a list of lists. This list represents a stream.
Every batch interval, the first object of the list is chosen to make a DStream.
e.g. the first list in the list is the input of the first batch.
Please see the BasicTestSuits in Scala, which is close to this implementation.
"""
def setUp(self):
@@ -412,7 +412,7 @@ def _sort_result_based_on_key(self, outputs):

def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
"""
Start the stream and return the output.
Start the stream and return the result.
@param test_input: dataset for the test. This should be a list of lists.
@param test_func: wrapped test function. This function should return a PythonDstream object.
@param expected_output: expected output for this test case.
@@ -444,6 +444,7 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):

return result


class TestSaveAsFilesSuite(PySparkStreamingTestCase):
def setUp(self):
PySparkStreamingTestCase.setUp(self)
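As an illustration of the "list of lists" convention and _run_stream, a hypothetical test case sketch (test_map is not part of this diff; the Python 2 idioms match the era of this branch):

def test_map(self):
    """Each inner list is one batch; map should preserve that shape."""
    test_input = [range(1, 5), range(5, 9), range(9, 13)]

    def test_func(dstream):
        return dstream.map(str)

    expected_output = [map(str, x) for x in test_input]
    output = self._run_stream(test_input, test_func, expected_output)
    self.assertEqual(expected_output, output)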
@@ -18,10 +18,8 @@
package org.apache.spark.streaming.api.python

import java.io._
import java.io.{ObjectInputStream, IOException}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.collection.JavaConversions._

@@ -56,7 +54,9 @@ class PythonDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
parent.getOrCompute(validTime) match{
case Some(rdd) =>
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
// create PythonRDD to compute Python functions.
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes,
preservePartitoning, pythonExec, broadcastVars, accumulator)
Some(pythonRDD.asJavaRDD.rdd)
case None => None
}
@@ -81,8 +81,8 @@ DStream[Array[Byte]](prev.ssc){
case Some(rdd)=>Some(rdd)
val pairwiseRDD = new PairwiseRDD(rdd)
/*
* Since python operation is executed by Scala after StreamingContext.start.
* What PythonPairwiseDStream does is equivalent to python code in pySpark.
* Since the python function is executed by Scala after StreamingContext.start,
* what PythonPairwiseDStream does is equivalent to the python code below in pyspark.
*
* with _JavaStackTrace(self.context) as st:
* pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -99,6 +99,7 @@ DStream[Array[Byte]](prev.ssc){
val asJavaDStream = JavaDStream.fromDStream(this)
}


class PythonForeachDStream(
prev: DStream[Array[Byte]],
foreachFunction: PythonRDDFunction
@@ -112,29 +113,11 @@ class PythonForeachDStream(
this.register()
}

class PythonTransformedDStream(
prev: DStream[Array[Byte]],
transformFunction: PythonRDDFunction
) extends DStream[Array[Byte]](prev.ssc) {

override def dependencies = List(prev)

override def slideDuration: Duration = prev.slideDuration

override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
prev.getOrCompute(validTime).map(rdd => {
transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd
})
}

val asJavaDStream = JavaDStream.fromDStream(this)
}

/**
 * This is an input stream just for unit tests. It is equivalent to a checkpointable,
 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
 * returns the i-th element at the i-th batch under the manual clock.
 * This implementation is inspired by QueueStream.
 */

class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
@@ -3,6 +3,10 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Time;

/*
 * Interface for the py4j callback function.
 * This function is called by pyspark.streaming.dstream.DStream.foreachRDD.
*/
public interface PythonRDDFunction {
JavaRDD<byte[]> call(JavaRDD<byte[]> rdd, long time);
}
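On the Python side this interface is implemented through py4j's callback mechanism. A rough sketch modeled on pyspark.streaming.utils.RDDFunction; only __init__ and the Java inner class appear in this diff, so the body of call is an assumption:

from pyspark.rdd import RDD

class RDDFunction(object):
    def __init__(self, ctx, jrdd_deserializer, func):
        self.ctx = ctx
        self.deserializer = jrdd_deserializer
        self.func = func

    def call(self, jrdd, milliseconds):
        # Wrap the incoming JavaRDD, run the user function, hand back a JavaRDD.
        rdd = RDD(jrdd, self.ctx, self.deserializer)
        return self.func(rdd, milliseconds)._jrdd

    class Java:
        implements = ['org.apache.spark.streaming.api.python.PythonRDDFunction']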

This file was deleted.
