diff --git a/luigi/configuration.py b/luigi/configuration.py
index 95e8a4c7c4..64244616ed 100644
--- a/luigi/configuration.py
+++ b/luigi/configuration.py
@@ -128,4 +128,4 @@ def get_config():
     """
     Convenience method (for backwards compatibility) for accessing config singleton.
     """
-    return LuigiConfigParser.instance()
+    return LuigiConfigParser.instance(os.environ)
diff --git a/luigi/contrib/datadog.py b/luigi/contrib/datadog.py
new file mode 100644
index 0000000000..69494d8772
--- /dev/null
+++ b/luigi/contrib/datadog.py
@@ -0,0 +1,117 @@
+from luigi import parameter
+from luigi.metrics import MetricsCollector
+from luigi.task import Config
+
+from datadog import initialize, api, statsd
+
+
+class datadog(Config):
+    api_key = parameter.Parameter(default='dummy_api_key')
+    app_key = parameter.Parameter(default='dummy_app_key')
+    default_tags = parameter.Parameter(default='application:luigi')
+    environment = parameter.Parameter(default='development', description='Environment of the pipeline')
+    metric_namespace = parameter.Parameter(default='luigi')
+    statsd_host = parameter.Parameter(default='localhost')
+    statsd_port = parameter.IntParameter(default=8125)
+
+
+class DataDogMetricsCollector(MetricsCollector):
+    def __init__(self, *args, **kwargs):
+        super(DataDogMetricsCollector, self).__init__(*args, **kwargs)
+        self._config = datadog(**kwargs)
+
+        initialize(api_key=self._config.api_key,
+                   app_key=self._config.app_key,
+                   statsd_host=self._config.statsd_host,
+                   statsd_port=self._config.statsd_port)
+
+    def handle_task_started(self, task):
+        title = "Luigi: A task has been started!"
+        text = "A task has been started in the pipeline named: {name}".format(name=task.family)
+        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)
+
+        self.send_increment('task.started', tags=tags)
+
+        event_tags = tags + ["task_state:STARTED"]
+        self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low')
+
+    def handle_task_failed(self, task):
+        title = "Luigi: A task has failed!"
+        text = "A task has failed in the pipeline named: {name}".format(name=task.family)
+        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)
+
+        self.send_increment('task.failed', tags=tags)
+
+        event_tags = tags + ["task_state:FAILED"]
+        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')
+
+    def handle_task_disabled(self, task, config):
+        title = "Luigi: A task has been disabled!"
+        text = """A task has been disabled in the pipeline named: {name}.
+                  The task has failed {failures} times in the last {window}
+                  seconds, so it is being disabled for {persist} seconds.""".format(
+            name=task.family,
+            persist=config.disable_persist,
+            failures=task.retry_policy.retry_count,
+            window=config.disable_window
+        )
+        tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task)
+
+        self.send_increment('task.disabled', tags=tags)
+
+        event_tags = tags + ["task_state:DISABLED"]
+        self.send_event(title=title, text=text, tags=event_tags, alert_type='error', priority='normal')
+
+    def handle_task_done(self, task):
+        # The task is already done -- Let's not re-create an event
+        if task.time_running is None:
+            return
+
+        title = "Luigi: A task has been completed!"
+ text = "A task has completed in the pipeline named: {name}".format(name=task.family) + tags = ["task_name:{name}".format(name=task.family)] + self._format_task_params_to_tags(task) + + time_elapse = task.updated - task.time_running + + self.send_increment('task.done', tags=tags) + self.send_gauge('task.execution_time', time_elapse, tags=tags) + + event_tags = tags + ["task_state:DONE"] + self.send_event(title=title, text=text, tags=event_tags, alert_type='info', priority='low') + + def send_event(self, title=None, text=None, tags=[], alert_type='info', priority='normal'): + all_tags = tags + self.default_tags() + + api.Event.create(title=title, text=text, tags=all_tags, alert_type=alert_type, priority=priority) + + def send_gauge(self, metric_name, value, tags=[]): + all_tags = tags + self.default_tags() + + namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, + metric_name=metric_name) + statsd.gauge(namespaced_metric, value, tags=all_tags) + + def send_increment(self, metric_name, value=1, tags=[]): + all_tags = tags + self.default_tags() + + namespaced_metric = "{namespace}.{metric_name}".format(namespace=self._config.metric_namespace, + metric_name=metric_name) + statsd.increment(namespaced_metric, value, tags=all_tags) + + def _format_task_params_to_tags(self, task): + params = [] + for key, value in task.params.items(): + params.append("{key}:{value}".format(key=key, value=value)) + + return params + + def default_tags(self): + default_tags = [] + + env_tag = "environment:{environment}".format(environment=self._config.environment) + default_tags.append(env_tag) + + if self._config.default_tags: + default_tags = default_tags + str.split(self._config.default_tags, ',') + + return default_tags diff --git a/luigi/metrics.py b/luigi/metrics.py new file mode 100644 index 0000000000..ea5cb00fa3 --- /dev/null +++ b/luigi/metrics.py @@ -0,0 +1,19 @@ +class MetricsCollector(object): + """ + Dummy MetricsCollecter base class that can be replace by tool specific + implementation + """ + def __init__(self, scheduler=None): + self._scheduler = scheduler + + def handle_task_started(self, task): + pass + + def handle_task_failed(self, task): + pass + + def handle_task_disabled(self, task, config): + pass + + def handle_task_done(self, task): + pass diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 405507028b..846161f542 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -148,6 +148,7 @@ class scheduler(Config): prune_on_get_work = parameter.BoolParameter(default=False) pause_enabled = parameter.BoolParameter(default=True) + metrics_collection = parameter.Parameter(default=None) def _get_retry_policy(self): return RetryPolicy(self.retry_count, self.disable_hard_timeout, self.disable_window) @@ -434,6 +435,7 @@ def __init__(self, state_path): self._status_tasks = collections.defaultdict(dict) self._active_workers = {} # map from id to a Worker object self._task_batchers = {} + self._metrics_collector = None def get_state(self): return self._tasks, self._active_workers, self._task_batchers @@ -552,9 +554,11 @@ def set_status(self, task, new_status, config=None): if new_status == FAILED and task.status != DISABLED: task.add_failure() + self.update_metrics_task_failed(task) if task.has_excessive_failures(): task.scheduler_disable_time = time.time() new_status = DISABLED + self.update_metrics_task_disabled(task, config) if not config.batch_emails: notifications.send_error_email( 'Luigi Scheduler: DISABLED {task} due to excessive 
failures'.format(task=task.id), @@ -574,6 +578,9 @@ def set_status(self, task, new_status, config=None): task.status = new_status task.updated = time.time() + if new_status == DONE: + self.update_metrics_task_done(task) + if new_status == FAILED: task.retry = time.time() + config.retry_delay if remove_on_failure: @@ -656,8 +663,21 @@ def disable_workers(self, worker_ids): worker.disabled = True worker.tasks.clear() + def update_metrics_task_started(self, task): + self._metrics_collector.handle_task_started(task) + + def update_metrics_task_disabled(self, task, config): + self._metrics_collector.handle_task_disabled(task, config) + + def update_metrics_task_failed(self, task): + self._metrics_collector.handle_task_failed(task) + + def update_metrics_task_done(self, task): + self._metrics_collector.handle_task_done(task) + class Scheduler(object): + """ Async scheduler that can handle multiple workers, etc. @@ -689,6 +709,13 @@ def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs if self._config.batch_emails: self._email_batcher = BatchNotifier() + if self._config.metrics_collection == 'datadog': + import luigi.contrib.datadog as datadog + self._state._metrics_collector = datadog.DataDogMetricsCollector(self) + else: + from luigi.metrics import MetricsCollector + self._state._metrics_collector = MetricsCollector(self) + def load(self): self._state.load() @@ -1186,6 +1213,7 @@ def get_work(self, host=None, assistant=False, current_tasks=None, worker=None, reply['batch_task_ids'] = [task.id for task in batched_tasks] elif best_task: + self.update_metrics_task_started(best_task) self._state.set_status(best_task, RUNNING, self._config) best_task.worker_running = worker_id best_task.resources_running = best_task.resources.copy() @@ -1576,3 +1604,19 @@ def _update_task_history(self, task, status, host=None): def task_history(self): # Used by server.py to expose the calls return self._task_history + + @rpc_method() + def update_metrics_task_started(self, task): + self._state.update_metrics_task_started(task) + + @rpc_method() + def update_metrics_task_disabled(self, task): + self._state.update_metrics_task_disabled(task) + + @rpc_method() + def update_metrics_task_failed(self, task): + self._state.update_metrics_task_failed(task) + + @rpc_method() + def update_metrics_task_done(self, task): + self._state.update_metrics_task_done(task)
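
With this change the collector is selected purely through configuration: `Scheduler.__init__` checks `scheduler.metrics_collection` and falls back to the no-op `MetricsCollector`. As a minimal sketch (key values are placeholders; the `datadog` Python package must be installed for the import in `luigi/contrib/datadog.py` to succeed, and the `[datadog]` section maps onto the `datadog(Config)` class above), enabling the Datadog backend in `luigi.cfg` would look like:

```ini
[scheduler]
metrics_collection = datadog

[datadog]
api_key = <your-datadog-api-key>
app_key = <your-datadog-app-key>
environment = development
default_tags = application:luigi
statsd_host = localhost
statsd_port = 8125
metric_namespace = luigi
```

With this in place, counters such as `luigi.task.started` / `luigi.task.failed` / `luigi.task.disabled` / `luigi.task.done` and the `luigi.task.execution_time` gauge are emitted under the configured namespace, each tagged with `task_name`, the task parameters, `environment`, and any `default_tags`.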
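Other backends can plug into the same four hooks, since the scheduler state only ever calls `handle_task_started`, `handle_task_failed`, `handle_task_disabled`, and `handle_task_done`. A hypothetical sketch of an alternative collector (the `LoggingMetricsCollector` name is illustrative and not part of this change):

```python
import logging

from luigi.metrics import MetricsCollector

logger = logging.getLogger('luigi-interface')


class LoggingMetricsCollector(MetricsCollector):
    """Hypothetical collector that logs task state transitions
    instead of shipping them to a metrics backend."""

    def handle_task_started(self, task):
        logger.info("task started: %s", task.family)

    def handle_task_failed(self, task):
        logger.warning("task failed: %s", task.family)

    def handle_task_disabled(self, task, config):
        # config is the scheduler Config object, so disable_persist is available
        logger.error("task disabled for %ss: %s", config.disable_persist, task.family)

    def handle_task_done(self, task):
        logger.info("task done: %s", task.family)
```

Note that wiring such a collector in currently requires another `elif` branch in `Scheduler.__init__`, since dispatch is hard-coded on the `metrics_collection` string; making that dispatch table-driven could be a follow-up.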