-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathoffset_tracking_receive.py
114 lines (93 loc) · 3.01 KB
/
offset_tracking_receive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import asyncio
from rstream import (
AMQPMessage,
Consumer,
ConsumerOffsetSpecification,
MessageContext,
OffsetNotFound,
OffsetType,
ServerError,
amqp_decoder,
)
# Running state shared by the message callback and consume().
# message_count starts at -1 so the first received message brings it to 0;
# first_offset is compared against -1 to tell whether any message has arrived.
message_count = first_offset = last_offset = -1

# Name of the stream this script creates (if missing) and reads from.
STREAM_NAME = "stream-offset-tracking-python"

# Passed to create_stream() as "max-length-bytes": 2 GB.
STREAM_RETENTION = 2_000_000_000
async def on_message(msg: AMQPMessage, message_context: MessageContext):
    """Handle one delivered message and keep the tracked offsets up to date.

    Updates the module-level ``message_count``, ``first_offset`` and
    ``last_offset``. The offset of the current message is stored through the
    consumer whenever the running count is a multiple of 10, and once more
    when the text form of the message contains ``"marker"``; in that case the
    consumer is also closed.

    Args:
        msg: The decoded message; only its ``str()`` form is inspected.
        message_context: Supplies the message offset, the consumer and the
            subscriber name used for offset storage.
    """
    global message_count, first_offset, last_offset

    current_offset = message_context.offset
    if first_offset == -1:
        print("First message received")
        first_offset = current_offset

    consumer = message_context.consumer
    subscriber = message_context.subscriber_name
    stream = consumer.get_stream(subscriber)

    async def persist_offset() -> None:
        # Record the current message's offset for this subscriber.
        await consumer.store_offset(
            stream=stream,
            offset=current_offset,
            subscriber_name=subscriber,
        )

    # Store the offset each time the running count hits a multiple of 10.
    message_count += 1
    if message_count % 10 == 0:
        await persist_offset()

    # Anything that is not the marker message needs no further handling.
    if "marker" not in str(msg):
        return

    # Marker message: store its offset, remember it, and stop consuming.
    await persist_offset()
    last_offset = current_offset
    await consumer.close()
async def consume():
    """Consume the stream, resuming just after the last stored offset.

    Creates the stream if it does not exist, asks the broker for the offset
    previously stored for ``subscriber_1`` and subscribes starting at that
    offset plus one (offset 0 when nothing was stored). Runs until the
    consumer stops or the task is interrupted/cancelled, then prints the
    first and last offsets if at least one message was received.

    Raises:
        SystemExit: With status 1 when querying the stored offset fails with
            a ``ServerError`` other than ``OffsetNotFound``.
    """
    subscriber_name = "subscriber_1"
    # -1 so that the "+ 1" below yields offset 0 when nothing was stored.
    stored_offset = -1

    consumer = Consumer(
        host="localhost",
        port=5552,
        username="guest",
        password="guest",
    )

    await consumer.create_stream(
        STREAM_NAME, exists_ok=True, arguments={"max-length-bytes": STREAM_RETENTION}
    )

    try:
        await consumer.start()
        print("Started consuming: Press control +C to close")
        try:
            # will raise an exception if store_offset wasn't invoked before
            stored_offset = await consumer.query_offset(
                stream=STREAM_NAME, subscriber_name=subscriber_name
            )
        except OffsetNotFound as offset_exception:
            print(f"Offset not previously stored. {offset_exception}")
        except ServerError as server_error:
            print(f"Server error: {server_error}")
            # Raise SystemExit directly: the bare ``exit`` helper is injected
            # by the ``site`` module and is not guaranteed to exist.
            raise SystemExit(1)

        # if no offset was previously stored start from the first offset
        stored_offset = stored_offset + 1
        await consumer.subscribe(
            stream=STREAM_NAME,
            subscriber_name=subscriber_name,
            callback=on_message,
            decoder=amqp_decoder,
            offset_specification=ConsumerOffsetSpecification(
                OffsetType.OFFSET, stored_offset
            ),
        )
        await consumer.run()
    except (KeyboardInterrupt, asyncio.exceptions.CancelledError):
        await consumer.close()

    # give time to the consumer task to close the consumer
    await asyncio.sleep(1)

    # first_offset stays at -1 until the callback has seen a message.
    if first_offset != -1:
        print(
            f"Done consuming first_offset: {first_offset} last_offset {last_offset} "
        )
# Script entry point: run consume() to completion on a fresh event loop.
# asyncio.run() wraps the same Runner-based logic on Python 3.11+ and also
# works on earlier interpreters that lack asyncio.Runner.
asyncio.run(consume())