Skip to content

Commit

Permalink
Merge 3c13e4a into 4d56acf
Browse files Browse the repository at this point in the history
  • Loading branch information
rogelioLpz authored Nov 9, 2019
2 parents 4d56acf + 3c13e4a commit f392f47
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 16 deletions.
20 changes: 5 additions & 15 deletions hub/kinesis/listener.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import json
import logging
import os
import threading
import time
from datetime import datetime, timedelta
from typing import Callable

from botocore.exceptions import (
ClientError,
ConnectionClosedError,
ConnectTimeoutError,
ReadTimeoutError,
)
Expand All @@ -16,17 +16,7 @@
from hub.kinesis.helpers import create_stream, stream_is_active
from hub.kinesis.producer import Producer

KINESIS_TIME_SLEEP = int(os.getenv('KINESIS_TIME_SLEEP', '1'))


def sleep_listener() -> None:
def sleep() -> None:
time.sleep(KINESIS_TIME_SLEEP)

thread = threading.Thread(target=sleep)
thread.start()
# wait listener to wake up
thread.join()
KINESIS_TIME_SLEEP = float(os.getenv('KINESIS_TIME_SLEEP', '.3'))


class Listener:
Expand Down Expand Up @@ -69,8 +59,6 @@ def run(self):
resp = self.process_func(data)
if resp:
Producer.put_data(resp, self.stream_name_response)
else:
sleep_listener()

next_iterator = response['NextShardIterator']
if self.tries is not None:
Expand All @@ -80,8 +68,9 @@ def run(self):
ClientError,
ConnectTimeoutError,
ReadTimeoutError,
ConnectionClosedError,
):
sleep_listener()
pass

except client.exceptions.ExpiredIteratorException:
next_iterator = client.get_shard_iterator(
Expand All @@ -90,3 +79,4 @@ def run(self):
ShardIteratorType='AT_TIMESTAMP',
Timestamp=datetime.now() - timedelta(seconds=15),
)
time.sleep(KINESIS_TIME_SLEEP)
2 changes: 1 addition & 1 deletion hub/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.0.19'
__version__ = '1.0.20'

0 comments on commit f392f47

Please sign in to comment.