Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion condor_history_to_prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def compose_ad_metrics(ad, metrics):
'schedd': None,
'GPUDeviceName': None,
'usage': None,
'kind': None}
'kind': None,
'IceProdDataset': None,
'IceProdTaskName': None,
'MATCH_EXP_JOBGLIDEIN_ResourceName': None}

labels['owner'] = ad['Owner']
labels['site'] = ad['site']
Expand All @@ -80,6 +83,15 @@ def compose_ad_metrics(ad, metrics):
resource_hrs = ad['cpuhrs']
resource_request = ad['RequestCpus']

try:
labels['IceProdDataset'] = ad['IceProdDataset']
labels['IceProdTaskName'] = ad['IceProdTaskName']
except:
pass

if 'MATCH_EXP_JOBGLIDEIN_ResourceName' in ad['MATCH_EXP_JOBGLIDEIN_ResourceName']:
labels['MATCH_EXP_JOBGLIDEIN_ResourceName'] = ad['MATCH_EXP_JOBGLIDEIN_ResourceName']

metrics.condor_job_count.labels(**labels).inc()
metrics.condor_job_walltime_hours.labels(**labels).inc(ad['walltimehrs'])
metrics.condor_job_resource_hours.labels(**labels).inc(resource_hrs)
Expand Down
2 changes: 1 addition & 1 deletion condor_job_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class JobMetrics():

def __init__(self):

labels = ['owner','site','schedd','GPUDeviceName','usage','kind']
labels = ['owner','site','schedd','GPUDeviceName','usage','kind','IceProdDataset','IceProdTaskName','MATCH_EXP_JOBGLIDEIN_ResourceName']

memory_buckets = (1, 2, 3, 4, 6, 8, 12, 20, 40,float('inf'))
resource_buckets = (1, 2, 3, 4, 8, 16, float('inf'))
Expand Down
69 changes: 69 additions & 0 deletions condor_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import re
from prometheus_client import Gauge

class JobMetrics():
    '''Prometheus gauges aggregating condor queue job attributes.

    Every gauge carries the label set in ``self.labels``
    (schedd, group, owner, state); ``condor_jobs_count`` additionally
    carries the job's ``exit_code``.
    '''

    def __init__(self):
        # Label names shared by every gauge below; consumers build their
        # label dicts from this list.
        self.labels = ['schedd', 'group', 'owner', 'state']
        self.condor_jobs_walltime = Gauge('condor_jobs_walltime',
                                          'Total allocated CPU time from start by jobs',
                                          self.labels)
        self.condor_jobs_cputime = Gauge('condor_jobs_cputime',
                                         'Total observed CPU user time from jobs',
                                         self.labels)
        self.condor_jobs_wastetime = Gauge('condor_jobs_wastetime',
                                           'Total walltime minus observed CPU user time of jobs',
                                           self.labels)
        self.condor_jobs_cpu_request = Gauge('condor_jobs_cpu_request',
                                             'Total CPUs requested by jobs',
                                             self.labels)
        self.condor_jobs_memory_request_bytes = Gauge('condor_jobs_memory_request_bytes',
                                                      'Total memory requested by jobs in bytes',
                                                      self.labels)
        self.condor_jobs_memory_usage_bytes = Gauge('condor_jobs_memory_usage_bytes',
                                                    'Total memory used by jobs in bytes',
                                                    self.labels)
        self.condor_jobs_disk_request_bytes = Gauge('condor_jobs_disk_request_bytes',
                                                    'Total disk requested by jobs in bytes',
                                                    self.labels)
        self.condor_jobs_disk_usage_bytes = Gauge('condor_jobs_disk_usage_bytes',
                                                  'Total disk used by jobs in bytes',
                                                  self.labels)
        self.condor_jobs_gpu_request = Gauge('condor_jobs_gpu_request',
                                             'Total GPUs requested by jobs',
                                             self.labels)
        self.condor_jobs_count = Gauge('condor_jobs_count',
                                       'Count of condor jobs',
                                       self.labels + ['exit_code'])

    def clear(self):
        '''Reset every gauge so the next poll starts from zero.'''
        for attr in vars(self).values():
            if isinstance(attr, Gauge):
                attr.clear()

class SlotMetrics():
    '''Prometheus gauges describing condor pool slot resources.'''

    # (attribute name, metric name, help text, label names)
    _GAUGE_SPECS = (
        ('dynamic_slots_totals', 'condor_pool_dynamic_slots_totals',
         'Condor Pool Dynamic Slot Resources', ['resource']),
        ('dynamic_slots_usage', 'condor_pool_dynamic_slots_usage',
         'Condor Pool Dynamic Slot Resources', ['group', 'user', 'resource']),
        ('partitionable_slots_totals', 'condor_pool_partitionable_slots_totals',
         'Condor Pool Partitionable Slot Resources', ['resource']),
        ('partitionable_slots_host_totals', 'condor_pool_partitionable_slots_host_totals',
         'Condor Pool Partitionable Slot Host Resources', ['host', 'resource']),
        ('partitionable_slots_unusable', 'condor_pool_partitionable_slots_unusable',
         'Condor Pool Partitionable Slot Unusable Resources', ['resource']),
    )

    def __init__(self):
        # Instantiate one Gauge per spec row and bind it as an attribute.
        for attr, metric_name, help_text, label_names in self._GAUGE_SPECS:
            setattr(self, attr, Gauge(metric_name, help_text, label_names))

    def clear(self):
        '''Zero out every gauge on this collector.'''
        for value in vars(self).values():
            if isinstance(value, Gauge):
                value.clear()
104 changes: 104 additions & 0 deletions condor_queue_to_prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/usr/bin/env python3


import glob
import logging
import os
import time
from functools import partial
from itertools import chain
from optparse import OptionParser

import htcondor
import prometheus_client

from condor_utils import *
from condor_metrics import *

def get_job_state(ad):
    """Map a job ad's LastJobStatus code to a readable state name.

    Returns 'Idle', 'Running', or 'Held' for codes 1, 2, and 5
    respectively; any other status code yields None.
    """
    status_names = {1: 'Idle', 2: 'Running', 5: 'Held'}
    return status_names.get(ad["LastJobStatus"])

def generate_ads(entries):
    """Yield each entry after passing it through add_classads.

    add_classads comes from the condor_utils star import — presumably it
    enriches the ad in place with derived fields; verify against condor_utils.
    """
    for entry in entries:
        add_classads(entry)
        yield entry

def compose_ad_metrics(ads):
    """Fold each job ad into the module-level `metrics` gauges.

    Label values come from the ad (schedd is the first '#'-delimited part
    of GlobalJobId); resource figures are converted to base units
    (hours -> seconds; RequestDisk/DiskUsage_RAW and ResidentSetSize_RAW
    presumably KiB, RequestMemory presumably MiB — confirm condor units).
    """
    for ad in ads:
        labels = dict.fromkeys(metrics.labels)

        labels['schedd'] = ad['GlobalJobId'].split('#')[0]
        labels['state'] = get_job_state(ad)

        # Accounting group is the leading dot-separated component; fall back
        # to the literal string "None" when the attribute is missing/odd.
        try:
            group = ad['AccountingGroup'].split('.')[0]
        except Exception:
            group = "None"
        if group == 'Undefined':
            group = 'None'

        labels['group'] = group
        labels['owner'] = ad['Owner']

        walltime_sec = ad['walltimehrs'] * 3600

        metrics.condor_jobs_count.labels(exit_code=ad['ExitCode'], **labels).inc()
        metrics.condor_jobs_cpu_request.labels(**labels).inc(ad['RequestCpus'])
        metrics.condor_jobs_cputime.labels(**labels).inc(ad['RemoteUserCpu'])
        metrics.condor_jobs_disk_request_bytes.labels(**labels).inc(ad['RequestDisk'] * 1024)
        metrics.condor_jobs_disk_usage_bytes.labels(**labels).inc(ad['DiskUsage_RAW'] * 1024)
        metrics.condor_jobs_memory_request_bytes.labels(**labels).inc(ad['RequestMemory'] * 1024 * 1024)
        metrics.condor_jobs_memory_usage_bytes.labels(**labels).inc(ad['ResidentSetSize_RAW'] * 1024)
        metrics.condor_jobs_walltime.labels(**labels).inc(walltime_sec)
        metrics.condor_jobs_wastetime.labels(**labels).inc(walltime_sec - ad['RemoteUserCpu'])

        # GPUs are optional on an ad; only count them when requested.
        if 'RequestGpus' in ad:
            metrics.condor_jobs_gpu_request.labels(**labels).inc(ad['RequestGpus'])

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')

    parser = OptionParser('usage: %prog [options] history_files')

    parser.add_option('-c','--collectors',default=False, action='store_true',
                      help='treat arguments as collector addresses to read job ads from')

    parser.add_option('-p','--port', default=9100,
                      action='store', type='int',
                      help='port number for prometheus exporter')
    parser.add_option('-i','--interval', default=300,
                      action='store', type='int',
                      help='collector query interval in seconds')
    (options, args) = parser.parse_args()
    if not args:
        parser.error('no condor history files or collectors')

    metrics = JobMetrics()

    # Drop the default python runtime collectors so only condor metrics
    # are exported.
    prometheus_client.REGISTRY.unregister(prometheus_client.GC_COLLECTOR)
    prometheus_client.REGISTRY.unregister(prometheus_client.PLATFORM_COLLECTOR)
    prometheus_client.REGISTRY.unregister(prometheus_client.PROCESS_COLLECTOR)

    prometheus_client.start_http_server(options.port)

    if options.collectors:
        # NOTE(review): the gauges are never clear()ed between polls, so each
        # pass keeps inc()ing on top of the previous totals even though
        # JobMetrics.clear() exists — confirm whether metrics.clear() should
        # run at the top of every iteration.
        while True:
            gens = []
            start = time.time()
            for coll_address in args:
                try:
                    gens.append(read_from_collector(coll_address))
                except htcondor.HTCondorIOError:
                    # Skip unreachable collectors but keep polling the rest.
                    logging.error('Condor error', exc_info=True)
            gen = chain(*gens)
            compose_ad_metrics(generate_ads(gen))

            # Sleep off the remainder of the polling interval.
            delta = time.time() - start
            if delta < options.interval:
                time.sleep(options.interval - delta)
    # NOTE(review): without -c the script starts the exporter and exits
    # immediately — the history-file mode implied by the usage string
    # appears unimplemented.