Skip to content

Commit

Permalink
added TODO coments
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 16, 2014
1 parent 89ae38a commit ea9c873
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
3 changes: 2 additions & 1 deletion python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import sys
from signal import signal, SIGTERM, SIGINT
from tempfile import NamedTemporaryFile

from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.context import SparkContext
Expand Down Expand Up @@ -79,6 +78,7 @@ def _clean_up_trigger(self):
"""Kill py4j callback server properly using signal lib"""

def clean_up_handler(*args):
SparkContext._gateway._shutdown_callback_server()
SparkContext._gateway.shutdown()
sys.exit(0)

Expand Down Expand Up @@ -128,6 +128,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
self._jssc.stop(stopSparkContext, stopGraceFully)
finally:
# Stop Callback server
SparkContext._gateway._shutdown_callback_server()
SparkContext._gateway.shutdown()

def _testInputStream(self, test_inputs, numSlices=None):
Expand Down
16 changes: 14 additions & 2 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,27 @@ def saveAsTextFile(rdd, time):
return self.foreachRDD(saveAsTextFile)


# TODO: implement updateStateByKey
# TODO: implement slice

# Window Operations
# TODO: implement window
# TODO: implement groupByKeyAndWindow
# TODO: implement reduceByKeyAndWindow
# TODO: implement countByValueAndWindow
# TODO: implement countByWindow
# TODO: implement reduceByWindow

# Following operation has dependency to transform
# TODO: impelment union
# TODO: implement transform
# TODO: implement transformWith
# TODO: implement union
# TODO: implement repertitions
# TODO: implement cogroup
# TODO: implement join
# TODO: implement leftOuterJoin
# TODO: implemtnt rightOuterJoin


class PipelinedDStream(DStream):
def __init__(self, prev, func, preservesPartitioning=False):
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
Expand Down

0 comments on commit ea9c873

Please sign in to comment.