Skip to content

Commit

Permalink
consumer group: fix time-sleep mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Dec 30, 2018
1 parent e23d0f9 commit b012c1d
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 3 deletions.
3 changes: 1 addition & 2 deletions aliyun/log/consumer/heart_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ def __init__(self, log_client, heartbeat_interval):

def run(self):
logger.info('heart beat start')
last_heatbeat_time = 0
while not self.shut_down_flag:
try:
response_shards = []
last_heatbeat_time = time.time()
self.log_client.heartbeat(self.mheart_shards, response_shards)
logger.info('heart beat result: {} get: {}'.format(self.mheart_shards, response_shards))
self.mheld_shards = response_shards
self.mheart_shards = self.mheld_shards[:]

# default sleep for 2s from "LogHubConfig"
time_to_sleep = self.heartbeat_interval - (time.time() - last_heatbeat_time)
last_heatbeat_time = time.time()
if time_to_sleep > 0 and not self.shut_down_flag:
time.sleep(time_to_sleep)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion aliyun/log/consumer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def run(self):
while not self.shut_down_flag:
held_shards = self.heart_beat.get_held_shards()

last_fetch_time = time.time()
for shard in held_shards:
if self.shut_down_flag:
break
Expand All @@ -52,7 +53,6 @@ def run(self):

# default sleep for 2s from "LogHubConfig"
time_to_sleep = self.option.data_fetch_interval - (time.time() - last_fetch_time)
last_fetch_time = time.time()
if time_to_sleep > 0 and not self.shut_down_flag:
time.sleep(time_to_sleep)

Expand Down

0 comments on commit b012c1d

Please sign in to comment.