Skip to content

Commit

Permalink
fix issue #117 and consider race condition for heatbeat data operation
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Mar 8, 2019
1 parent 39d2997 commit 30dfdc6
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions aliyun/log/consumer/heart_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time

from threading import Thread
from multiprocessing import RLock


class HeartBeatLoggerAdapter(logging.LoggerAdapter):
Expand All @@ -26,6 +27,7 @@ def __init__(self, log_client, heartbeat_interval):
self.mheld_shards = []
self.mheart_shards = []
self.shut_down_flag = False
self.lock = RLock()
self.logger = HeartBeatLoggerAdapter(
logging.getLogger(__name__), {"heart_beat": self})

Expand All @@ -48,8 +50,10 @@ def run(self):
self.logger.info(
"shard reorganize, adding: %s, removing: %s",
add_set, remove_set)
self.mheld_shards = response_shards
self.mheart_shards = self.mheld_shards[:]

with self.lock:
self.mheart_shards = list(set(self.mheart_shards + response_shards))
self.mheld_shards = response_shards

# default sleep for 2s from "LogHubConfig"
time_to_sleep = self.heartbeat_interval - (time.time() - last_heatbeat_time)
Expand All @@ -74,7 +78,8 @@ def shutdown(self):

def remove_heart_shard(self, shard):
self.logger.info('try to remove shard "{0}", current shard: {1}'.format(shard, self.mheld_shards))
if shard in self.mheld_shards:
self.mheld_shards.remove(shard)
if shard in self.mheart_shards:
self.mheart_shards.remove(shard)
with self.lock:
if shard in self.mheld_shards:
self.mheld_shards.remove(shard)
if shard in self.mheart_shards:
self.mheart_shards.remove(shard)

0 comments on commit 30dfdc6

Please sign in to comment.