[SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger

## What changes were proposed in this pull request?
Added a `continuous` parameter to `DataStreamWriter.trigger()` in the Python API, so that a continuous trigger (the Python counterpart of `Trigger.Continuous` in the Scala/Java API) can be set with an interval string such as `'5 seconds'`. Exactly one of `processingTime`, `once`, and `continuous` may be specified.
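
For context, a minimal sketch of how the new option is used end to end; the `rate` source, `console` sink, and app name here are illustrative choices, not part of this commit:

    # A minimal end-to-end sketch of the new option (illustrative, not from this commit).
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("continuous-trigger-demo").getOrCreate()

    # Any streaming DataFrame works; the rate source needs no external setup.
    sdf = spark.readStream.format("rate").load()

    # Request continuous processing with checkpoints every 5 seconds, mirroring
    # Trigger.Continuous("5 seconds") on the JVM side.
    query = (sdf.writeStream
             .format("console")
             .trigger(continuous="5 seconds")
             .start())

    query.stop()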

## How was this patch tested?
New python tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #20309 from tdas/SPARK-23143.

(cherry picked from commit 2d41f04)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
tdas committed Jan 18, 2018
1 parent bfdbdd3 commit e6e8bbe84625861f3a4834a2d71cb2f0fe7f6b5a
Showing with 25 additions and 4 deletions.
  1. +19 −4 python/pyspark/sql/streaming.py
  2. +6 −0 python/pyspark/sql/tests.py
python/pyspark/sql/streaming.py

@@ -786,7 +786,7 @@ def queryName(self, queryName):
 
     @keyword_only
     @since(2.0)
-    def trigger(self, processingTime=None, once=None):
+    def trigger(self, processingTime=None, once=None, continuous=None):
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
 
@@ -802,23 +802,38 @@ def trigger(self, processingTime=None, once=None):
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
         >>> # trigger the query for just once batch of data
         >>> writer = sdf.writeStream.trigger(once=True)
+        >>> # trigger the query for execution every 5 seconds
+        >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
         """
+        params = [processingTime, once, continuous]
+
+        if params.count(None) == 3:
+            raise ValueError('No trigger provided')
+        elif params.count(None) < 2:
+            raise ValueError('Multiple triggers not allowed.')
+
         jTrigger = None
         if processingTime is not None:
-            if once is not None:
-                raise ValueError('Multiple triggers not allowed.')
             if type(processingTime) != str or len(processingTime.strip()) == 0:
                 raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
                                  processingTime)
             interval = processingTime.strip()
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
                 interval)
+
         elif once is not None:
             if once is not True:
                 raise ValueError('Value for once must be True. Got: %s' % once)
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
+
         else:
-            raise ValueError('No trigger provided')
+            if type(continuous) != str or len(continuous.strip()) == 0:
+                raise ValueError('Value for continuous must be a non empty string. Got: %s' %
+                                 continuous)
+            interval = continuous.strip()
+            jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
+                interval)
+
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self
 
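
The rewritten validation counts `None` values across all three keyword arguments up front, so exactly one trigger must be supplied. A standalone sketch of that check (the helper name `_check_one_trigger` is hypothetical):

    def _check_one_trigger(processingTime=None, once=None, continuous=None):
        # Same counting trick as the patch: 3 Nones means no trigger was given,
        # fewer than 2 Nones means more than one trigger was given.
        params = [processingTime, once, continuous]
        if params.count(None) == 3:
            raise ValueError('No trigger provided')
        elif params.count(None) < 2:
            raise ValueError('Multiple triggers not allowed.')

    _check_one_trigger(processingTime='5 seconds')           # passes
    # _check_one_trigger()                                   # ValueError: No trigger provided
    # _check_one_trigger(once=True, continuous='1 second')   # ValueError: Multiple triggers not allowed.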


python/pyspark/sql/tests.py

@@ -1538,6 +1538,12 @@ def test_stream_trigger(self):
         except ValueError:
             pass
 
+        # Should not take multiple args
+        try:
+            df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')
+        except ValueError:
+            pass
+
         # Should take only keyword args
         try:
             df.writeStream.trigger('5 seconds')
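
Note that the try/except-pass pattern above also passes when no exception is raised at all; a stricter hypothetical variant of the same check, using unittest's context-manager form, would be:

    # Hypothetical stricter variant: the test fails unless ValueError is raised.
    with self.assertRaises(ValueError):
        df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')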
