-
Notifications
You must be signed in to change notification settings - Fork 117
/
worker.py
executable file
·198 lines (160 loc) · 8.1 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# -*- coding: utf-8 -*-
import logging
import time
from threading import Thread
from .consumer_client import ConsumerClient
from .heart_beat import ConsumerHeatBeat
from .shard_worker import ShardConsumerWorker
from concurrent.futures import ThreadPoolExecutor
class ConsumerWorkerLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes every message with the owning worker's identity.

    The prefix has the form ``[project/logstore/consumer_group_name/consumer_name]``.
    Its fields are read from the ``option`` attribute of the object stored under the
    ``'consumer_worker'`` key of ``extra``.
    """

    def process(self, msg, kwargs):
        """Return ``(prefixed_msg, kwargs)``; ``kwargs`` is passed through untouched."""
        opt = self.extra['consumer_worker'].option
        identity = (
            opt.project,
            opt.logstore,
            opt.consumer_group_name,
            opt.consumer_name,
        )
        return "[{0}]{1}".format('/'.join(identity), msg), kwargs
class ConsumerWorker(Thread):
    """Thread that drives consumption for one consumer of a consumer group.

    On construction it builds a ``ConsumerClient`` from ``consumer_option``, calls
    ``create_consumer_group`` on it and prepares a ``ConsumerHeatBeat``. While
    running, it repeatedly:

    - asks the heartbeat which shards are held;
    - drives one ``ShardConsumerWorker`` per held shard;
    - shuts down consumers of shards that are no longer held.
    """

    def __init__(self, make_processor, consumer_option, args=None,
                 kwargs=None):
        """
        :param make_processor: factory called as ``make_processor(*args, **kwargs)``
            to build a new processor for every shard consumer.
        :param consumer_option: option object providing endpoint, credentials,
            project/logstore/group/consumer names, intervals, cursor settings,
            ``shared_executor`` and ``worker_pool_size``.
        :param args: positional arguments for ``make_processor`` (default ``()``).
        :param kwargs: keyword arguments for ``make_processor`` (default ``{}``).
        """
        super(ConsumerWorker, self).__init__()
        self.make_processor = make_processor
        self.process_args = args or ()
        self.process_kwargs = kwargs or {}
        self.option = consumer_option
        self.consumer_client = \
            ConsumerClient(consumer_option.endpoint, consumer_option.accessKeyId, consumer_option.accessKey,
                           consumer_option.project, consumer_option.logstore, consumer_option.consumer_group_name,
                           consumer_option.consumer_name, consumer_option.securityToken)
        self.shut_down_flag = False
        self.logger = ConsumerWorkerLoggerAdapter(
            logging.getLogger(__name__), {"consumer_worker": self})
        # shard id -> ShardConsumerWorker
        self.shard_consumers = {}
        # time.time() when all owned consumers were first seen finished; 0 means "not finished"
        self.last_owned_consumer_finish_time = 0
        self.consumer_client.create_consumer_group(consumer_option.consumer_group_time_out, consumer_option.in_order)
        self.heart_beat = ConsumerHeatBeat(self.consumer_client, consumer_option.heartbeat_interval,
                                           consumer_option.consumer_group_time_out)
        # Only an executor created here is shut down by run(); a shared one belongs to the caller.
        if consumer_option.shared_executor is not None:
            self.own_executor = False
            self._executor = consumer_option.shared_executor
        else:
            self.own_executor = True
            self._executor = ThreadPoolExecutor(max_workers=consumer_option.worker_pool_size)

    @property
    def executor(self):
        """The executor handed to every ShardConsumerWorker (shared or owned)."""
        return self._executor

    def _need_stop(self):
        """
        Check whether the worker should stop on its own.

        This only applies when ``option.cursor_end_time`` is set. The owned shards
        count as finished when every non-shutdown consumer has completed at least one
        successful fetch and its last fetch returned no data. After that, the worker
        keeps running for ``consumer_group_time_out + heartbeat_interval`` seconds, so
        shards still being (re)assigned to it get picked up, and only then reports True.
        The grace period restarts whenever a consumer becomes active again.

        :return: True if the worker should shut down, else False.
        """
        if not self.option.cursor_end_time:
            return False

        for consumer in self.shard_consumers.values():
            if consumer.is_shutdown():
                continue
            # no successful fetch yet, or still receiving data: not finished
            if consumer.last_success_fetch_time == 0 or consumer.last_fetch_count > 0:
                # restart the grace period from scratch once everything finishes again
                self.last_owned_consumer_finish_time = 0
                return False

        # start the grace period the first time everything is seen finished
        if self.last_owned_consumer_finish_time == 0:
            self.last_owned_consumer_finish_time = time.time()

        return abs(time.time() - self.last_owned_consumer_finish_time) >= \
            self.option.consumer_group_time_out + self.option.heartbeat_interval

    def run(self):
        """
        Thread body: drive shard consumers until shutdown, then clean up.

        Each iteration:

        - gets the held shards from the heartbeat;
        - calls ``consume()`` on each shard's consumer;
        - drops consumers of shards that are no longer held;
        - checks ``_need_stop()``;
        - sleeps out the remainder of ``option.data_fetch_interval`` in slices of at
          most 1 s, so a shutdown is noticed promptly.

        A processor that fails to initialize shuts the worker down.
        """
        self.logger.info('consumer worker "{0}" start '.format(self.option.consumer_name))
        self.heart_beat.start()

        while not self.shut_down_flag:
            held_shards = self.heart_beat.get_held_shards()

            last_fetch_time = time.time()
            for shard in held_shards:
                if self.shut_down_flag:
                    break

                shard_consumer = self._get_shard_consumer(shard)
                if shard_consumer is None:  # error when init consumer. shutdown directly
                    self.shutdown()
                    break

                shard_consumer.consume()

            self.clean_shard_consumer(held_shards)

            if self._need_stop():
                self.logger.info("all owned shards complete the tasks, owned shards: {0}".format(self.shard_consumers))
                self.shutdown()

            time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)
            while time_to_sleep > 0 and not self.shut_down_flag:
                time.sleep(min(time_to_sleep, 1))
                time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)

        # stopping worker, need to cleanup all existing shard consumers
        self.logger.info('consumer worker "{0}" try to cleanup consumers'.format(self.option.consumer_name))
        self.shutdown_and_wait()

        if self.own_executor:
            self.logger.info('consumer worker "{0}" try to shutdown executors'.format(self.option.consumer_name))
            self._executor.shutdown()
            self.logger.info('consumer worker "{0}" stopped'.format(self.option.consumer_name))
        else:
            self.logger.info('executor is shared, consumer worker "{0}" stopped'.format(self.option.consumer_name))

    def start(self, join=False):
        """
        Start the worker thread.

        When called with ``join=True``, this must be called from the main thread;
        otherwise KeyboardInterrupt (Ctrl+C) won't be captured.

        :param join: default False. If True, block until the worker stops, whether
            because of Ctrl+C or for any other reason, and then call ``shutdown()``.
        :return: None
        """
        Thread.start(self)

        if join:
            try:
                # join with a timeout so the main thread stays responsive to Ctrl+C
                while self.is_alive():
                    self.join(timeout=60)
                self.logger.info("worker {0} exit unexpected, try to shutdown it".format(self.option.consumer_name))
                self.shutdown()
            except KeyboardInterrupt:
                self.logger.info("*** try to exit **** ")
                self.shutdown()

    def shutdown_and_wait(self, poll_interval=0.5):
        """
        Shut down every shard consumer, block until all report shutdown, then forget them.

        On each pass ``shut_down()`` is (re)issued to every consumer that is still
        alive, so consumers wind down in parallel instead of one at a time. The state
        is re-checked every ``poll_interval`` seconds.

        :param poll_interval: seconds to sleep between checks (default 0.5).
        """
        while True:
            alive = [consumer for consumer in self.shard_consumers.values()
                     if not consumer.is_shutdown()]
            if not alive:
                break  # all are shut down
            for consumer in alive:
                consumer.shut_down()
            time.sleep(poll_interval)

        self.shard_consumers.clear()

    def clean_shard_consumer(self, owned_shards):
        """
        Shut down and drop the consumers of shards no longer assigned to this worker.

        A consumer is removed, and its shard removed from the heartbeat, only once it
        reports shutdown. Until then it is kept, and ``shut_down()`` is called again
        on the next invocation.

        :param owned_shards: shard ids currently held by this worker.
        """
        remove_shards = []
        # remove the shards that are not assigned by the server
        for shard, consumer in self.shard_consumers.items():
            if shard not in owned_shards:
                self.logger.info('Try to call shut down for unassigned consumer shard: ' + str(shard))
                consumer.shut_down()
                self.logger.info('Complete call shut down for unassigned consumer shard: ' + str(shard))

            if consumer.is_shutdown():
                self.heart_beat.remove_heart_shard(shard)
                remove_shards.append(shard)
                self.logger.info('Remove an unassigned consumer shard:' + str(shard))

        # pop after iterating: the dict must not change size during iteration
        for shard in remove_shards:
            self.shard_consumers.pop(shard)

    def shutdown(self):
        """Ask the worker to stop: set the stop flag and shut down the heartbeat."""
        self.shut_down_flag = True
        self.heart_beat.shutdown()
        self.logger.info('get stop signal, start to stop consumer worker "{0}"'.format(self.option.consumer_name))

    def _get_shard_consumer(self, shard_id):
        """
        Return the consumer for ``shard_id``, creating and caching it on first use.

        :param shard_id: id of a shard held by this worker.
        :return: the ShardConsumerWorker, or None if ``make_processor`` raised.
            In that case the error is logged together with its traceback.
        """
        consumer = self.shard_consumers.get(shard_id)
        if consumer is not None:
            return consumer

        try:
            processor = self.make_processor(*self.process_args, **self.process_kwargs)
        except Exception as ex:
            # exc_info must go to the logging call; passed to str.format it is silently ignored
            self.logger.error("fail to init processor {0} with parameters {1}, {2}, detail: {3}".format(
                self.make_processor, self.process_args, self.process_kwargs, ex), exc_info=True)
            return None

        consumer = ShardConsumerWorker(self.consumer_client, shard_id, self.option.consumer_name,
                                       processor,
                                       self.option.cursor_position, self.option.cursor_start_time,
                                       executor=self._executor,
                                       cursor_end_time=self.option.cursor_end_time)
        self.shard_consumers[shard_id] = consumer
        return consumer