diff --git a/condor_history_to_prometheus.py b/condor_history_to_prometheus.py
index 5a09f69..877f1c5 100755
--- a/condor_history_to_prometheus.py
+++ b/condor_history_to_prometheus.py
@@ -55,7 +55,10 @@ def compose_ad_metrics(ad, metrics):
               'schedd': None,
               'GPUDeviceName': None,
               'usage': None,
-              'kind': None}
+              'kind': None,
+              'IceProdDataset': None,
+              'IceProdTaskName': None,
+              'MATCH_EXP_JOBGLIDEIN_ResourceName': None}
 
     labels['owner'] = ad['Owner']
     labels['site'] = ad['site']
@@ -80,6 +83,17 @@ def compose_ad_metrics(ad, metrics):
         resource_hrs = ad['cpuhrs']
         resource_request = ad['RequestCpus']
 
+    # IceProd attributes only exist on IceProd jobs; everything else
+    # keeps the None defaults set above.
+    try:
+        labels['IceProdDataset'] = ad['IceProdDataset']
+        labels['IceProdTaskName'] = ad['IceProdTaskName']
+    except KeyError:
+        pass
+
+    if 'MATCH_EXP_JOBGLIDEIN_ResourceName' in ad:
+        labels['MATCH_EXP_JOBGLIDEIN_ResourceName'] = ad['MATCH_EXP_JOBGLIDEIN_ResourceName']
+
     metrics.condor_job_count.labels(**labels).inc()
     metrics.condor_job_walltime_hours.labels(**labels).inc(ad['walltimehrs'])
     metrics.condor_job_resource_hours.labels(**labels).inc(resource_hrs)
diff --git a/condor_job_metrics.py b/condor_job_metrics.py
index bf4e0fd..13d4e30 100644
--- a/condor_job_metrics.py
+++ b/condor_job_metrics.py
@@ -7,7 +7,7 @@ class JobMetrics():
 
     def __init__(self):
 
-        labels = ['owner','site','schedd','GPUDeviceName','usage','kind']
+        labels = ['owner','site','schedd','GPUDeviceName','usage','kind','IceProdDataset','IceProdTaskName','MATCH_EXP_JOBGLIDEIN_ResourceName']
 
         memory_buckets = (1, 2, 3, 4, 6, 8, 12, 20, 40,float('inf'))
         resource_buckets = (1, 2, 3, 4, 8, 16, float('inf'))
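A note on the new label defaults: prometheus_client coerces label values with str(), so ads without IceProd attributes export series carrying the literal label value "None". A minimal sketch of the behavior the None defaults rely on (the metric name and label set here are hypothetical, not part of the change):

    from prometheus_client import Counter, generate_latest

    # Hypothetical counter mirroring the optional-label scheme above.
    jobs = Counter('example_job_count', 'example', ['owner', 'IceProdDataset'])
    jobs.labels(owner='alice', IceProdDataset=None).inc()

    # Exposition shows IceProdDataset="None" for non-IceProd jobs.
    print(generate_latest(jobs).decode())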
diff --git a/condor_metrics.py b/condor_metrics.py
new file mode 100644
index 0000000..22404bd
--- /dev/null
+++ b/condor_metrics.py
@@ -0,0 +1,71 @@
+from prometheus_client import Gauge
+
+class JobMetrics():
+    '''
+    condor_jobs_* resource totals and condor_jobs_count,
+    all labelled by schedd, group, owner, state
+    (condor_jobs_count additionally by exit_code).
+    '''
+
+    def __init__(self):
+        self.labels = ['schedd','group','owner','state']
+        self.condor_jobs_walltime = Gauge('condor_jobs_walltime',
+            'Total wall clock time accumulated by jobs in seconds',
+            self.labels)
+        self.condor_jobs_cputime = Gauge('condor_jobs_cputime',
+            'Total observed CPU user time from jobs in seconds',
+            self.labels)
+        self.condor_jobs_wastetime = Gauge('condor_jobs_wastetime',
+            'Total wall clock time minus CPU user time from jobs in seconds',
+            self.labels)
+        self.condor_jobs_cpu_request = Gauge('condor_jobs_cpu_request',
+            'Total CPUs requested by jobs',
+            self.labels)
+        self.condor_jobs_memory_request_bytes = Gauge('condor_jobs_memory_request_bytes',
+            'Total memory requested by jobs in bytes',
+            self.labels)
+        self.condor_jobs_memory_usage_bytes = Gauge('condor_jobs_memory_usage_bytes',
+            'Total observed resident set size of jobs in bytes',
+            self.labels)
+        self.condor_jobs_disk_request_bytes = Gauge('condor_jobs_disk_request_bytes',
+            'Total disk requested by jobs in bytes',
+            self.labels)
+        self.condor_jobs_disk_usage_bytes = Gauge('condor_jobs_disk_usage_bytes',
+            'Total observed disk usage of jobs in bytes',
+            self.labels)
+        self.condor_jobs_gpu_request = Gauge('condor_jobs_gpu_request',
+            'Total GPUs requested by jobs',
+            self.labels)
+        self.condor_jobs_count = Gauge('condor_jobs_count',
+            'Count of condor jobs',
+            self.labels + ['exit_code'])
+
+    def clear(self):
+        # Reset every Gauge attribute, dropping all labelled children.
+        for value in self.__dict__.values():
+            if isinstance(value, Gauge):
+                value.clear()
+
+class SlotMetrics():
+    def __init__(self):
+        self.dynamic_slots_totals = Gauge('condor_pool_dynamic_slots_totals',
+            'Condor Pool Dynamic Slot Resources',
+            ['resource'])
+        self.dynamic_slots_usage = Gauge('condor_pool_dynamic_slots_usage',
+            'Condor Pool Dynamic Slot Resources',
+            ['group','user','resource'])
+        self.partitionable_slots_totals = Gauge('condor_pool_partitionable_slots_totals',
+            'Condor Pool Partitionable Slot Resources',
+            ['resource'])
+        self.partitionable_slots_host_totals = Gauge('condor_pool_partitionable_slots_host_totals',
+            'Condor Pool Partitionable Slot Host Resources',
+            ['host','resource'])
+        self.partitionable_slots_unusable = Gauge('condor_pool_partitionable_slots_unusable',
+            'Condor Pool Partitionable Slot Unusable Resources',
+            ['resource'])
+
+    def clear(self):
+        # Reset every Gauge attribute, dropping all labelled children.
+        for value in self.__dict__.values():
+            if isinstance(value, Gauge):
+                value.clear()
\ No newline at end of file
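Because these are Gauges filled with inc(), the intended pattern is to rebuild totals from scratch each polling cycle via clear(). A minimal sketch of one cycle (the schedd/group/owner values are made up):

    from condor_metrics import JobMetrics

    metrics = JobMetrics()

    # One polling cycle: drop all stale labelled series, then
    # re-accumulate from whatever the current query returns.
    metrics.clear()
    metrics.condor_jobs_cpu_request.labels(
        schedd='schedd1.example.org', group='simprod',
        owner='alice', state='Running').inc(4)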
diff --git a/condor_queue_to_prometheus.py b/condor_queue_to_prometheus.py
new file mode 100755
index 0000000..ef89a43
--- /dev/null
+++ b/condor_queue_to_prometheus.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+
+import time
+import logging
+from optparse import OptionParser
+from itertools import chain
+import htcondor
+import prometheus_client
+from condor_utils import *
+from condor_metrics import *
+
+def get_job_state(ad):
+    jobstatus = None
+
+    if ad["LastJobStatus"] == 1:
+        jobstatus = 'Idle'
+    elif ad["LastJobStatus"] == 2:
+        jobstatus = 'Running'
+    elif ad["LastJobStatus"] == 5:
+        jobstatus = 'Held'
+
+    return jobstatus
+
+def generate_ads(entries):
+    for data in entries:
+        add_classads(data)
+        yield data
+
+def compose_ad_metrics(ads):
+    for ad in ads:
+        labels = {key: None for key in metrics.labels}
+
+        labels['schedd'] = ad['GlobalJobId'].split('#')[0]
+        labels['state'] = get_job_state(ad)
+
+        try:
+            acct_group = ad['AccountingGroup']
+            group = acct_group.split('.')[0]
+        except Exception:
+            group = "None"
+
+        if group == 'Undefined':
+            group = 'None'
+
+        labels['group'] = group
+        labels['owner'] = ad['Owner']
+
+        # ExitCode is undefined for jobs that have not exited yet.
+        metrics.condor_jobs_count.labels(**{'exit_code': ad.get('ExitCode'), **labels}).inc()
+        metrics.condor_jobs_cpu_request.labels(**labels).inc(ad['RequestCpus'])
+        metrics.condor_jobs_cputime.labels(**labels).inc(ad['RemoteUserCpu'])
+        metrics.condor_jobs_disk_request_bytes.labels(**labels).inc(ad['RequestDisk']*1024)
+        metrics.condor_jobs_disk_usage_bytes.labels(**labels).inc(ad['DiskUsage_RAW']*1024)
+        metrics.condor_jobs_memory_request_bytes.labels(**labels).inc(ad['RequestMemory']*1024*1024)
+        metrics.condor_jobs_memory_usage_bytes.labels(**labels).inc(ad['ResidentSetSize_RAW']*1024)
+        metrics.condor_jobs_walltime.labels(**labels).inc(ad['walltimehrs']*3600)
+        metrics.condor_jobs_wastetime.labels(**labels).inc((ad['walltimehrs']*3600)-ad['RemoteUserCpu'])
+
+        if 'RequestGpus' in ad:
+            metrics.condor_jobs_gpu_request.labels(**labels).inc(ad['RequestGpus'])
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
+
+    parser = OptionParser('usage: %prog [options] collector_addresses')
+
+    parser.add_option('-c','--collectors', default=False, action='store_true',
+                      help='read jobs from the condor collectors given as arguments')
+    parser.add_option('-p','--port', default=9100,
+                      action='store', type='int',
+                      help='port number for prometheus exporter')
+    parser.add_option('-i','--interval', default=300,
+                      action='store', type='int',
+                      help='collector query interval in seconds')
+    (options, args) = parser.parse_args()
+    if not args:
+        parser.error('no condor collectors given')
+
+    metrics = JobMetrics()
+
+    prometheus_client.REGISTRY.unregister(prometheus_client.GC_COLLECTOR)
+    prometheus_client.REGISTRY.unregister(prometheus_client.PLATFORM_COLLECTOR)
+    prometheus_client.REGISTRY.unregister(prometheus_client.PROCESS_COLLECTOR)
+
+    prometheus_client.start_http_server(options.port)
+
+    if options.collectors:
+        while True:
+            gens = []
+            start = time.time()
+            # Gauges accumulate with inc(), so wipe them before
+            # rebuilding totals from the current queue contents.
+            metrics.clear()
+            for coll_address in args:
+                try:
+                    gens.append(read_from_collector(coll_address))
+                except htcondor.HTCondorIOError:
+                    logging.error('Condor error', exc_info=True)
+            gen = chain(*gens)
+            compose_ad_metrics(generate_ads(gen))
+            delta = time.time() - start
+
+            if delta < options.interval:
+                time.sleep(options.interval - delta)
\ No newline at end of file
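To sanity-check compose_ad_metrics without a live collector, a fabricated ad can be pushed through directly (all values below are made up; walltimehrs is normally derived by add_classads from condor_utils):

    # Assumes the definitions above are in scope and that
    # metrics = JobMetrics() has already been created.
    fake_ad = {
        'GlobalJobId': 'schedd1.example.org#1234.0#5678',
        'Owner': 'alice',
        'AccountingGroup': 'simprod.alice',
        'LastJobStatus': 2,
        'RequestCpus': 4,
        'RemoteUserCpu': 7200.0,
        'RequestDisk': 1048576,         # KiB, matching the *1024 scaling above
        'DiskUsage_RAW': 524288,        # KiB
        'RequestMemory': 2048,          # MiB, matching the *1024*1024 scaling
        'ResidentSetSize_RAW': 102400,  # KiB
        'walltimehrs': 2.5,
    }
    compose_ad_metrics([fake_ad])

A scrape of the exporter port should then show, for example, condor_jobs_cpu_request{schedd="schedd1.example.org",...} 4.0.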