From 9f78be306142508d06fb8c3261a7883f9d0031d6 Mon Sep 17 00:00:00 2001 From: Arturo Pacheco Date: Thu, 7 Nov 2019 18:17:19 -0600 Subject: [PATCH 1/5] Update listener.py Set sleep to 200 ms Sleep in every iteration --- hub/kinesis/listener.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hub/kinesis/listener.py b/hub/kinesis/listener.py index 317e0d0..d09fa7e 100644 --- a/hub/kinesis/listener.py +++ b/hub/kinesis/listener.py @@ -16,7 +16,7 @@ from hub.kinesis.helpers import create_stream, stream_is_active from hub.kinesis.producer import Producer -KINESIS_TIME_SLEEP = int(os.getenv('KINESIS_TIME_SLEEP', '1')) +KINESIS_TIME_SLEEP = float(os.getenv('KINESIS_TIME_SLEEP', '.2')) def sleep_listener() -> None: @@ -81,7 +81,7 @@ def run(self): ConnectTimeoutError, ReadTimeoutError, ): - sleep_listener() + pass except client.exceptions.ExpiredIteratorException: next_iterator = client.get_shard_iterator( @@ -90,3 +90,4 @@ def run(self): ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now() - timedelta(seconds=15), ) + sleep_listener() From 161f7ee5472fa64608bb27e548f1a7a2790845e8 Mon Sep 17 00:00:00 2001 From: Arturo Pacheco Date: Thu, 7 Nov 2019 19:09:06 -0600 Subject: [PATCH 2/5] Update listener.py --- hub/kinesis/listener.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hub/kinesis/listener.py b/hub/kinesis/listener.py index d09fa7e..076e060 100644 --- a/hub/kinesis/listener.py +++ b/hub/kinesis/listener.py @@ -70,7 +70,7 @@ def run(self): if resp: Producer.put_data(resp, self.stream_name_response) else: - sleep_listener() + time.sleep(KINESIS_TIME_SLEEP) next_iterator = response['NextShardIterator'] if self.tries is not None: @@ -90,4 +90,4 @@ def run(self): ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now() - timedelta(seconds=15), ) - sleep_listener() + time.sleep(KINESIS_TIME_SLEEP) From 2617a86774b63cb7be7c4564e5c702c407ab77ec Mon Sep 17 00:00:00 2001 From: Arturo Pacheco Date: Thu, 7 Nov 2019 20:08:35 -0600 Subject: [PATCH 3/5] Update listener.py --- hub/kinesis/listener.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hub/kinesis/listener.py b/hub/kinesis/listener.py index 076e060..8c03789 100644 --- a/hub/kinesis/listener.py +++ b/hub/kinesis/listener.py @@ -16,7 +16,7 @@ from hub.kinesis.helpers import create_stream, stream_is_active from hub.kinesis.producer import Producer -KINESIS_TIME_SLEEP = float(os.getenv('KINESIS_TIME_SLEEP', '.2')) +KINESIS_TIME_SLEEP = float(os.getenv('KINESIS_TIME_SLEEP', '.5')) def sleep_listener() -> None: @@ -70,7 +70,7 @@ def run(self): if resp: Producer.put_data(resp, self.stream_name_response) else: - time.sleep(KINESIS_TIME_SLEEP) + time.sleep(.5) next_iterator = response['NextShardIterator'] if self.tries is not None: @@ -90,4 +90,4 @@ def run(self): ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now() - timedelta(seconds=15), ) - time.sleep(KINESIS_TIME_SLEEP) + time.sleep(.5) From f15293a21d4e1c509964fc7479485c3b7491394f Mon Sep 17 00:00:00 2001 From: Arturo Pacheco Date: Fri, 8 Nov 2019 11:04:09 -0600 Subject: [PATCH 4/5] Update listener.py --- hub/kinesis/listener.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hub/kinesis/listener.py b/hub/kinesis/listener.py index 8c03789..927f22d 100644 --- a/hub/kinesis/listener.py +++ b/hub/kinesis/listener.py @@ -70,7 +70,7 @@ def run(self): if resp: Producer.put_data(resp, self.stream_name_response) else: - time.sleep(.5) + time.sleep(1) next_iterator = response['NextShardIterator'] if self.tries is not None: @@ -90,4 +90,4 @@ def run(self): ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now() - timedelta(seconds=15), ) - time.sleep(.5) + time.sleep(1) From 3c13e4ae668037029e9b1ec24bb8cf67a70724ab Mon Sep 17 00:00:00 2001 From: rogelioLpz Date: Fri, 8 Nov 2019 18:22:57 -0600 Subject: [PATCH 5/5] Add kinesis exception ConnectionClosedError --- hub/kinesis/listener.py | 19 ++++--------------- hub/version.py | 2 +- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/hub/kinesis/listener.py b/hub/kinesis/listener.py index 927f22d..ce7bdbc 100644 --- a/hub/kinesis/listener.py +++ b/hub/kinesis/listener.py @@ -1,13 +1,13 @@ import json import logging import os -import threading import time from datetime import datetime, timedelta from typing import Callable from botocore.exceptions import ( ClientError, + ConnectionClosedError, ConnectTimeoutError, ReadTimeoutError, ) @@ -16,17 +16,7 @@ from hub.kinesis.helpers import create_stream, stream_is_active from hub.kinesis.producer import Producer -KINESIS_TIME_SLEEP = float(os.getenv('KINESIS_TIME_SLEEP', '.5')) - - -def sleep_listener() -> None: - def sleep() -> None: - time.sleep(KINESIS_TIME_SLEEP) - - thread = threading.Thread(target=sleep) - thread.start() - # wait listener to wake up - thread.join() +KINESIS_TIME_SLEEP = float(os.getenv('KINESIS_TIME_SLEEP', '.3')) class Listener: @@ -69,8 +59,6 @@ def run(self): resp = self.process_func(data) if resp: Producer.put_data(resp, self.stream_name_response) - else: - time.sleep(1) next_iterator = response['NextShardIterator'] if self.tries is not None: @@ -80,6 +68,7 @@ def run(self): ClientError, ConnectTimeoutError, ReadTimeoutError, + ConnectionClosedError, ): pass @@ -90,4 +79,4 @@ def run(self): ShardIteratorType='AT_TIMESTAMP', Timestamp=datetime.now() - timedelta(seconds=15), ) - time.sleep(1) + time.sleep(KINESIS_TIME_SLEEP) diff --git a/hub/version.py b/hub/version.py index dce8b34..fa7c0d4 100644 --- a/hub/version.py +++ b/hub/version.py @@ -1 +1 @@ -__version__ = '1.0.19' +__version__ = '1.0.20'