[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

## What changes were proposed in this pull request?

The `StreamingListener` on the PySpark side is missing an `onStreamingStarted` method. This patch adds the method and a test for it.

This patch also includes a trivial doc improvement for `createDirectStream`.

Original PR is #21057.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21098 from viirya/SPARK-24014.

(cherry picked from commit 8bb0df2)
Signed-off-by: jerryshao <sshao@hortonworks.com>
viirya authored and jerryshao committed Apr 19, 2018
1 parent 1306411 commit 32bec6ca3d9e47587c84f928d4166475fe29f596
Showing with 15 additions and 1 deletion.
  1. +2 −1 python/pyspark/streaming/kafka.py
  2. +6 −0 python/pyspark/streaming/listener.py
  3. +7 −0 python/pyspark/streaming/tests.py
@@ -104,7 +104,8 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
         :param topics: list of topic_name to consume.
         :param kafkaParams: Additional params for Kafka.
         :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
-                            point of the stream.
+                            point of the stream (a dictionary mapping `TopicAndPartition` to
+                            integers).
         :param keyDecoder: A function used to decode key (default is utf8_decoder).
         :param valueDecoder: A function used to decode value (default is utf8_decoder).
         :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess
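For reviewers, the shape of `fromOffsets` described by the new docstring can be sketched roughly as follows. This is only an illustration: the `namedtuple` here is a hypothetical stand-in for `pyspark.streaming.kafka.TopicAndPartition`, and the topic name, offsets, and `validate_from_offsets` helper are made up for the example and are not part of the patch.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.streaming.kafka.TopicAndPartition,
# used so the sketch runs without a Spark installation.
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

# A dictionary mapping TopicAndPartition to integer offsets, as the
# docstring describes. The topic name and offsets are invented.
fromOffsets = {
    TopicAndPartition("events", 0): 0,
    TopicAndPartition("events", 1): 1500,
}


def validate_from_offsets(offsets):
    """Illustrative helper: check keys and values have the documented types."""
    for topic_partition, offset in offsets.items():
        if not isinstance(topic_partition, TopicAndPartition):
            raise TypeError("keys must be TopicAndPartition")
        if not isinstance(offset, int) or offset < 0:
            raise ValueError("offsets must be non-negative integers")
    return True
```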
@@ -23,6 +23,12 @@ class StreamingListener(object):
     def __init__(self):
         pass

+    def onStreamingStarted(self, streamingStarted):
+        """
+        Called when the streaming has been started.
+        """
+        pass
+
     def onReceiverStarted(self, receiverStarted):
         """
         Called when a receiver has been started
@@ -507,6 +507,10 @@ def __init__(self):
             self.batchInfosCompleted = []
             self.batchInfosStarted = []
             self.batchInfosSubmitted = []
+            self.streamingStartedTime = []
+
+        def onStreamingStarted(self, streamingStarted):
+            self.streamingStartedTime.append(streamingStarted.time)

         def onBatchSubmitted(self, batchSubmitted):
             self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
@@ -530,9 +534,12 @@ def func(dstream):
         batchInfosSubmitted = batch_collector.batchInfosSubmitted
         batchInfosStarted = batch_collector.batchInfosStarted
         batchInfosCompleted = batch_collector.batchInfosCompleted
+        streamingStartedTime = batch_collector.streamingStartedTime

         self.wait_for(batchInfosCompleted, 4)

+        self.assertEqual(len(streamingStartedTime), 1)
+
         self.assertGreaterEqual(len(batchInfosSubmitted), 4)
         for info in batchInfosSubmitted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
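The pattern this patch relies on (a no-op default callback on the base listener, overridden by a collector in the test) can be sketched without Spark as below. This is a minimal illustration under stated assumptions: `StreamingListener` here is a hypothetical stand-in for the PySpark class, and `FakeStreamingStarted`, `StartTimeCollector`, and `dispatch` are invented for the example. In the real code the event is delivered by the JVM listener bus.

```python
class StreamingListener(object):
    """Hypothetical stand-in for pyspark.streaming.listener.StreamingListener."""

    def onStreamingStarted(self, streamingStarted):
        """Called when the streaming has been started (no-op by default)."""
        pass


class StartTimeCollector(StreamingListener):
    """Records the start time, mirroring the collector used in the test."""

    def __init__(self):
        self.streamingStartedTime = []

    def onStreamingStarted(self, streamingStarted):
        self.streamingStartedTime.append(streamingStarted.time)


class FakeStreamingStarted(object):
    """Invented event object exposing only the `time` attribute."""

    def __init__(self, time):
        self.time = time


def dispatch(listeners, event):
    """Invented helper: deliver one 'streaming started' event to each listener."""
    for listener in listeners:
        listener.onStreamingStarted(event)


collector = StartTimeCollector()
default_listener = StreamingListener()  # relies on the no-op default
dispatch([default_listener, collector], FakeStreamingStarted(time=1524096000000))
```

Because the base class supplies a default, listeners written before this change would keep working once the event is dispatched to them; only subclasses that care about the start event need to override the method.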
