/
test_consumer.py
46 lines (37 loc) · 1.25 KB
/
test_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from kinesis.consumer import KinesisConsumer
try:
from unittest.mock import MagicMock
except ImportError:
from mock import MagicMock
def test_setup_shards(mocker):
mock_boto3_session = MagicMock()
mock_shard_reader = mocker.patch('kinesis.consumer.ShardReader')
consumer = KinesisConsumer('testing', boto3_session=mock_boto3_session)
mock_boto3_session.client.assert_called_with('kinesis')
consumer.kinesis_client.describe_stream.return_value = {
'StreamDescription': {
'Shards': [
{
'ShardId': 'test-shard',
}
]
}
}
consumer.kinesis_client.get_shard_iterator.return_value = {
'ShardIterator': 'test-iter'
}
consumer.setup_shards()
consumer.kinesis_client.describe_stream.assert_called_with(StreamName='testing')
consumer.kinesis_client.get_shard_iterator.assert_called_with(
StreamName='testing',
ShardId='test-shard',
ShardIteratorType='LATEST'
)
mock_shard_reader.assert_called_with(
'test-shard',
'test-iter',
consumer.record_queue,
consumer.error_queue,
boto3_session=consumer.boto3_session,
sleep_time=consumer.reader_sleep_time
)