Skip to content

Commit

Permalink
Support running a collector multiple times concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
kormoc committed Dec 21, 2014
1 parent d429c52 commit c251180
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
13 changes: 7 additions & 6 deletions src/diamond/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,10 @@ def __init__(self, config=None, handlers=[], name=None, configfile=None):
self.name = self.__class__.__name__
else:
self.name = name

self.handlers = handlers
self.last_values = {}

self.configfile = None
self.load_config(configfile, config)

Expand All @@ -190,22 +191,22 @@ def load_config(self, configfile=None, override_config=None):

if configfile is not None:
self.configfile = os.path.abspath(configfile)

if self.configfile is not None:
config = load_config(self.configfile)

if 'collectors' in config:
if 'default' in config['collectors']:
self.config.merge(config['collectors']['default'])

if self.name in config['collectors']:
self.config.merge(config['collectors'][self.name])

if override_config is not None:
if 'collectors' in override_config:
if 'default' in override_config['collectors']:
self.config.merge(override_config['collectors']['default'])

if self.name in override_config['collectors']:
self.config.merge(override_config['collectors'][self.name])

Expand Down
18 changes: 12 additions & 6 deletions src/diamond/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,27 @@ def run(self):
process.terminate()

for process_name in running_collectors - running_processes:
if 'Collector' not in process_name:
# To handle running multiple collectors concurrently, we
# split on white space and use the first word as the
# collector name to spin
collector_name = process_name.split()[0]

if 'Collector' not in collector_name:
continue

# Find the class
for cls in collectors.values():
cls_name = cls.__name__.split('.')[-1]
if cls_name == process_name:
if cls_name == collector_name:
break
if cls_name != process_name:
if cls_name != collector_name:
self.log.error('Can not find collector %s',
process_name)
collector_name)
continue

collector = initialize_collector(
cls,
name=cls.__name__,
name=process_name,
configfile=self.configfile,
handlers=[self.handler_queue])

Expand All @@ -184,7 +190,7 @@ def run(self):
time.sleep(1)

process = multiprocessing.Process(
name=collector.__class__.__name__,
name=process_name,
target=collector_process,
args=(collector, self.metric_queue, self.log)
)
Expand Down

0 comments on commit c251180

Please sign in to comment.