Skip to content

Commit

Permalink
added StreamingContext.remember
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Aug 31, 2014
1 parent d68b568 commit da09768
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions python/pyspark/streaming/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.context import SparkContext
from pyspark.streaming.dstream import DStream
from pyspark.streaming.duration import Duration

from py4j.java_collections import ListConverter

Expand Down Expand Up @@ -107,6 +108,20 @@ def awaitTermination(self, timeout=None):
else:
self._jssc.awaitTermination(timeout)

def remember(self, duration):
    """
    Tell every DStream created from this context to retain the RDDs it
    generates for at least the given duration.

    DStreams normally keep their generated RDDs only briefly before
    releasing them for garbage collection; use this when you need to
    query older data from outside the DStream computation.

    @param duration pyspark.streaming.duration.Duration object.
           Minimum duration that each DStream should remember its RDDs.
    """
    if isinstance(duration, Duration):
        self._jssc.remember(duration._jduration)
    else:
        raise TypeError("Input should be pyspark.streaming.duration.Duration object")

# TODO: add storageLevel
def socketTextStream(self, hostname, port):
"""
Expand Down

0 comments on commit da09768

Please sign in to comment.