Skip to content

Commit

Permalink
added basic operation test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014
1 parent 9cde7c9 commit b3b0362
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 15 deletions.
19 changes: 10 additions & 9 deletions examples/src/main/python/streaming/test_oprations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,23 @@
conf = SparkConf()
conf.setAppName("PythonStreamingNetworkWordCount")
ssc = StreamingContext(conf=conf, duration=Seconds(1))

test_input = ssc._testInputStream([1,2,3])
class buff:
class Buff:
result = list()
pass
Buff.result = list()

test_input = ssc._testInputStream([range(1,4), range(4,7), range(7,10)])

fm_test = test_input.map(lambda x: (x, 1))
fm_test.test_output(buff)
fm_test.pyprint()
fm_test._test_output(Buff.result)

ssc.start()
while True:
ssc.awaitTermination(50)
try:
buff.result
if len(Buff.result) == 3:
break
except AttributeError:
pass

ssc.stop()
print buff.result
print Buff.result

36 changes: 36 additions & 0 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def _testInputStream(self, test_inputs, numSlices=None):
This implementation is inspired by the QueueStream implementation.
Give list of RDD to generate DStream which contains the RDD.
"""
<<<<<<< HEAD
test_rdds = list()
test_rdd_deserializers = list()
for test_input in test_inputs:
Expand All @@ -161,3 +162,38 @@ def _testInputStream(self, test_inputs, numSlices=None):
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

return DStream(jinput_stream, self, test_rdd_deserializers[0])
=======
self._jssc.checkpoint(directory)

def _testInputStream(self, test_inputs, numSlices=None):
"""
Generate multiple files to make "stream" in Scala side for test.
Scala chooses one of the files and generates an RDD using PythonRDD.readRDDFromFile.
"""
numSlices = numSlices or self._sc.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to a file and loaded through textFile().

tempFiles = list()
for test_input in test_inputs:
tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)

# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(test_input):
c = list(test_input) # Make it a list so we can compute its length
batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
if batchSize > 1:
serializer = BatchedSerializer(self._sc._unbatched_serializer,
batchSize)
else:
serializer = self._sc._unbatched_serializer
serializer.dump_stream(test_input, tempFile)
tempFiles.append(tempFile.name)

jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
jtempFiles,
numSlices).asJavaDStream()
return DStream(jinput_stream, self, PickleSerializer())
>>>>>>> added basic operation test cases
1 change: 0 additions & 1 deletion python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ def pyprint(self):
operator, so this DStream will be registered as an output stream and there materialized.
"""
def takeAndPrint(rdd, time):
print "take and print ==================="
taken = rdd.take(11)
print "-------------------------------------------"
print "Time: %s" % (str(time))
Expand Down
5 changes: 2 additions & 3 deletions python/pyspark/streaming_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,11 @@ def tearDownClass(cls):
current_time = time.time()
# check time out
if (current_time - start_time) > self.timeout:
self.ssc.stop()
break
self.ssc.awaitTermination(50)
if buff.result is not None:
if len(expected_output) == len(StreamOutput.result):
break
return buff.result
return StreamOutput.result

if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ class PythonDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
parent.getOrCompute(validTime) match{
case Some(rdd) =>
logInfo("RDD ID in python DStream ===========")
logInfo("RDD id " + rdd.id)
val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
Some(pythonRDD.asJavaRDD.rdd)
case None => None
Expand Down

0 comments on commit b3b0362

Please sign in to comment.