Skip to content

Commit

Permalink
Merge pull request #110 from crhan/ruohan.chen/heart_beat_logger_improve
Browse files Browse the repository at this point in the history
use LoggerAdapter to keep track of different heart beat instance
  • Loading branch information
wjo1212 committed Jan 7, 2019
2 parents 4b35787 + bcd4632 commit 427f769
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 21 deletions.
13 changes: 11 additions & 2 deletions aliyun/log/consumer/consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@
from ..logexception import LogException
from ..version import USER_AGENT

logger = logging.getLogger(__name__)

class ConsumerClientLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
consumer_client = self.extra['consumer_client'] # type: ConsumerClient
_id = '/'.join([
consumer_client.mproject, consumer_client.mlogstore,
consumer_client.mconsumer_group, consumer_client.mconsumer
])
return "[{0}]{1}".format(_id, msg), kwargs


class ConsumerClient(object):
Expand All @@ -22,7 +30,8 @@ def __init__(self, endpoint, access_key_id, access_key, project,
self.mlogstore = logstore
self.mconsumer_group = consumer_group
self.mconsumer = consumer
self.logger = logging.getLogger(self.__class__.__name__)
self.logger = ConsumerClientLoggerAdapter(
logging.getLogger(__name__), {"consumer_client": self})

def create_consumer_group(self, timeout, in_order):
try:
Expand Down
35 changes: 28 additions & 7 deletions aliyun/log/consumer/heart_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@

from threading import Thread

logger = logging.getLogger(__name__)

class HeartBeatLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
heart_beat = self.extra['heart_beat'] # type: ConsumerHeatBeat
_id = '/'.join([
heart_beat.log_client.mproject, heart_beat.log_client.mlogstore,
heart_beat.log_client.mconsumer_group,
heart_beat.log_client.mconsumer
])
return "[{0}]{1}".format(_id, msg), kwargs


class ConsumerHeatBeat(Thread):
Expand All @@ -17,16 +26,28 @@ def __init__(self, log_client, heartbeat_interval):
self.mheld_shards = []
self.mheart_shards = []
self.shut_down_flag = False
self.logger = HeartBeatLoggerAdapter(
logging.getLogger(__name__), {"heart_beat": self})

def run(self):
logger.info('heart beat start')
self.logger.info('heart beat start')
while not self.shut_down_flag:
try:
response_shards = []
last_heatbeat_time = time.time()

self.log_client.heartbeat(self.mheart_shards, response_shards)
logger.info('heart beat result: {0} get: {1}'.format(self.mheart_shards, response_shards))
self.logger.debug('heart beat result: %s get: %s',
self.mheart_shards, response_shards)
if self.mheart_shards != response_shards:
current_set, response_set = set(
self.mheart_shards), set(response_shards)
add_set = response_set - current_set
remove_set = current_set - response_set
if any([add_set, remove_set]):
self.logger.info(
"shard reorganize, adding: %s, removing: %s",
add_set, remove_set)
self.mheld_shards = response_shards
self.mheart_shards = self.mheld_shards[:]

Expand All @@ -36,9 +57,9 @@ def run(self):
time.sleep(min(time_to_sleep, 1))
time_to_sleep = self.heartbeat_interval - (time.time() - last_heatbeat_time)
except Exception as e:
logger.warning("fail to heat beat", e)
self.logger.warning("fail to heat beat", e)

logger.info('heart beat exit')
self.logger.info('heart beat exit')

def get_held_shards(self):
"""
Expand All @@ -48,11 +69,11 @@ def get_held_shards(self):
return self.mheld_shards[:]

def shutdown(self):
logger.info('try to stop heart beat')
self.logger.info('try to stop heart beat')
self.shut_down_flag = True

def remove_heart_shard(self, shard):
logger.info('try to remove shard "{0}", current shard: {1}'.format(shard, self.mheld_shards))
self.logger.info('try to remove shard "{0}", current shard: {1}'.format(shard, self.mheld_shards))
if shard in self.mheld_shards:
self.mheld_shards.remove(shard)
if shard in self.mheart_shards:
Expand Down
16 changes: 15 additions & 1 deletion aliyun/log/consumer/shard_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@
import six


class ShardConsumerWorkerLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
shard_consumer_worker = self.extra[
'shard_consumer_worker'] # type: ShardConsumerWorker
consumer_client = shard_consumer_worker.log_client
_id = '/'.join([
consumer_client.mproject, consumer_client.mlogstore,
consumer_client.mconsumer_group, consumer_client.mconsumer,
str(shard_consumer_worker.shard_id)
])
return "[{0}]{1}".format(_id, msg), kwargs


class ShardConsumerWorker(object):
def __init__(self, log_client, shard_id, consumer_name, processor, cursor_position, cursor_start_time,
max_fetch_log_group_size=1000, executor=None):
Expand Down Expand Up @@ -42,7 +55,8 @@ def __init__(self, log_client, shard_id, consumer_name, processor, cursor_positi
self.last_fetch_time = 0
self.last_fetch_count = 0

self.logger = logging.getLogger(__name__)
self.logger = ShardConsumerWorkerLoggerAdapter(
logging.getLogger(__name__), {"shard_consumer_worker": self})

def consume(self):
self.logger.debug('consumer start consuming')
Expand Down
14 changes: 11 additions & 3 deletions aliyun/log/consumer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ def process(self, log_groups, check_point_tracker):
raise NotImplementedError('not create method process')

def shutdown(self, check_point_tracker):
logger.info("ConsumerProcesser is shutdown, shard id: {0}".format(self.shard_id))
consumer_client = check_point_tracker.consumer_group_client
_id = '/'.join([
consumer_client.mproject, consumer_client.mlogstore,
consumer_client.mconsumer_group, consumer_client.mconsumer,
str(self.shard_id)
])
logger.info("[%s]ConsumerProcesser is shutdown, shard id: %s", _id,
self.shard_id)
self.save_checkpoint(check_point_tracker, force=True)


Expand Down Expand Up @@ -165,9 +172,10 @@ def consumer_fetch_task(loghub_client_adapter, shard_id, cursor, max_fetch_log_g
try:
response = loghub_client_adapter.pull_logs(shard_id, cursor, count=max_fetch_log_group_size)
fetch_log_group_list = response.get_loggroup_list()
logger.debug("shard id = " + str(shard_id) + " cursor = " + cursor
+ " next cursor" + response.get_next_cursor() + " size:" + str(response.get_log_count()))
next_cursor = response.get_next_cursor()
logger.debug("shard id = %s cursor = %s next cursor = %s size: %s",
shard_id, cursor, next_cursor,
response.get_log_count())
if not next_cursor:
return FetchTaskResult(fetch_log_group_list, cursor)
else:
Expand Down
25 changes: 17 additions & 8 deletions aliyun/log/consumer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,21 @@
from .shard_worker import ShardConsumerWorker
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

class ConsumerWorkerLoggerAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
consumer_worker = self.extra['consumer_worker'] # type: ConsumerWorker
consumer_option = consumer_worker.option
_id = '/'.join([
consumer_option.project, consumer_option.logstore,
consumer_option.consumer_group_name, consumer_option.consumer_name
])
return "[{0}]{1}".format(_id, msg), kwargs

class ConsumerWorker(Thread):

def __init__(self, make_processor, consumer_option, args=None, kwargs=None):
class ConsumerWorker(Thread):
def __init__(self, make_processor, consumer_option, args=None,
kwargs=None):
super(ConsumerWorker, self).__init__()
self.make_processor = make_processor
self.process_args = args or ()
Expand All @@ -27,7 +36,8 @@ def __init__(self, make_processor, consumer_option, args=None, kwargs=None):
consumer_option.project, consumer_option.logstore, consumer_option.consumer_group_name,
consumer_option.consumer_name, consumer_option.securityToken)
self.shut_down_flag = False
self.logger = logging.getLogger(__name__)
self.logger = ConsumerWorkerLoggerAdapter(
logging.getLogger(__name__), {"consumer_worker": self})
self.shard_consumers = {}

self.consumer_client.create_consumer_group(consumer_option.heartbeat_interval*2, consumer_option.in_order)
Expand Down Expand Up @@ -93,10 +103,10 @@ def start(self, join=False):
try:
while self.is_alive():
self.join(timeout=60)
logger.info("worker {0} exit unexpected, try to shutdown it".format(self.option.consumer_name))
self.logger.info("worker {0} exit unexpected, try to shutdown it".format(self.option.consumer_name))
self.shutdown()
except KeyboardInterrupt:
logger.info("*** try to exit **** ")
self.logger.info("*** try to exit **** ")
self.shutdown()

def shutdown_and_wait(self):
Expand Down Expand Up @@ -140,7 +150,7 @@ def _get_shard_consumer(self, shard_id):
try:
processer = self.make_processor(*self.process_args, **self.process_kwargs)
except Exception as ex:
logger.error("fail to init processor {0} with parameters {1}, {2}, detail: {3}".format(
self.logger.error("fail to init processor {0} with parameters {1}, {2}, detail: {3}".format(
self.make_processor, self.process_args, self.process_kwargs, ex, exc_info=True))
return None

Expand All @@ -150,4 +160,3 @@ def _get_shard_consumer(self, shard_id):
executor=self._executor)
self.shard_consumers[shard_id] = consumer
return consumer

0 comments on commit 427f769

Please sign in to comment.