Skip to content

Commit

Permalink
expose worker count, fetch size to consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Nov 2, 2018
1 parent ddae1ec commit a2fc584
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 7 deletions.
5 changes: 4 additions & 1 deletion aliyun/log/consumer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class LogHubConfig(object):
def __init__(self, endpoint, access_key_id, access_key, project, logstore,
consumer_group_name, consumer_name,
cursor_position, heartbeat_interval=20, data_fetch_interval=2, in_order=False,
cursor_start_time=-1, security_token=None):
cursor_start_time=-1, security_token=None, max_fetch_log_group_size=1000, worker_pool_size=2):
self.endpoint = endpoint
self.accessKeyId = access_key_id
self.accessKey = access_key
Expand All @@ -35,3 +35,6 @@ def __init__(self, endpoint, access_key_id, access_key, project, logstore,
self.in_order = in_order
self.cursor_start_time = cursor_start_time
self.securityToken = security_token
self.max_fetch_log_group_size = max_fetch_log_group_size
self.worker_pool_size = worker_pool_size

7 changes: 5 additions & 2 deletions aliyun/log/consumer/shard_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

class ShardConsumerWorker(object):
def __init__(self, log_client, shard_id, consumer_name, processor, cursor_position, cursor_start_time,
max_workers=2):
max_workers=2, max_fetch_log_group_size=1000):
self.log_client = log_client
self.shard_id = shard_id
self.consumer_name = consumer_name
Expand All @@ -27,6 +27,8 @@ def __init__(self, log_client, shard_id, consumer_name, processor, cursor_positi
self.checkpoint_tracker = ConsumerCheckpointTracker(self.log_client, self.consumer_name,
self.shard_id)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.max_fetch_log_group_size = max_fetch_log_group_size

self.consumer_status = ConsumerStatus.INITIALIZING
self.current_task_exist = False
self.task_future = None
Expand Down Expand Up @@ -93,7 +95,8 @@ def fetch_data(self):
self.last_fetch_time = time.time()
self.fetch_data_future = \
self.executor.submit(consumer_fetch_task,
self.log_client, self.shard_id, self.next_fetch_cursor)
self.log_client, self.shard_id, self.next_fetch_cursor,
max_fetch_log_group_size=self.max_fetch_log_group_size)
else:
self.fetch_data_future = None
else:
Expand Down
3 changes: 1 addition & 2 deletions aliyun/log/consumer/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ def consumer_initialize_task(processor, consumer_client, shard_id, cursor_positi
return TaskResult(e)


def consumer_fetch_task(loghub_client_adapter, shard_id, cursor):
max_fetch_log_group_size = 1000
def consumer_fetch_task(loghub_client_adapter, shard_id, cursor, max_fetch_log_group_size=1000):
exception = None

for retry_times in range(3):
Expand Down
3 changes: 2 additions & 1 deletion aliyun/log/consumer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def _get_shard_consumer(self, shard_id):

consumer = ShardConsumerWorker(self.consumer_client, shard_id, self.option.consumer_name,
self.make_processor(),
self.option.cursor_position, self.option.cursor_start_time)
self.option.cursor_position, self.option.cursor_start_time,
max_workers=self.option.worker_pool_size)
self.shard_consumers[shard_id] = consumer
return consumer
2 changes: 1 addition & 1 deletion aliyun/log/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.6.35'
__version__ = '0.6.36'
USER_AGENT = 'log-python-sdk-v-' + __version__
LOGGING_HANDLER_USER_AGENT = 'logging-handler, ' + USER_AGENT
ES_MIGRATION_USER_AGENT = 'es-migration, ' + USER_AGENT
Expand Down

0 comments on commit a2fc584

Please sign in to comment.