1 change: 1 addition & 0 deletions condor_history_to_es.py
@@ -46,6 +46,7 @@ def es_generator(entries):
         if options.dailyindex:
             data['_index'] += '-'+(data['date'].split('T')[0].replace('-','.'))
         data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-')
+        data['run_interval'] = {'gte': data['JobCurrentStartDate'], 'lte': data['EnteredCurrentStatus']}
         if not data['_id']:
             continue
         yield data
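The gte/lte pair written here matches the shape Elasticsearch expects for its range datatypes. For `run_interval` to be queryable as a time span, the target index would need a `date_range` mapping; a minimal sketch of that mapping and an overlap query, assuming condor timestamps arrive as epoch seconds (index names and window values are illustrative):

    # Mapping fragment: run_interval as a date range over epoch-second bounds
    RUN_INTERVAL_MAPPING = {
        'properties': {
            'run_interval': {'type': 'date_range', 'format': 'epoch_second'}
        }
    }

    # Query: jobs whose run interval overlaps a window ('intersects' is the
    # default relation for range fields)
    OVERLAP_QUERY = {
        'query': {
            'range': {
                'run_interval': {'gte': 1700000000, 'lte': 1700086400,
                                 'relation': 'intersects'}
            }
        }
    }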
35 changes: 22 additions & 13 deletions condor_history_to_prometheus.py
@@ -18,34 +18,35 @@ def generate_ads(entries):
         add_classads(data)
         yield data

-def last_jobs_dict(collector):
+def last_jobs_dict(collector, access_points):
     last_job = defaultdict(dict)

     for collector in args:
-        schedd_ads = locate_schedds(collector)
+        schedd_ads = locate_schedds(collector, access_points)
         if schedd_ads is None:
             return None

         for s in schedd_ads:
             last_job[s.get('Name')] = {'ClusterId': None, 'EnteredCurrentStatus': None}

     return last_job


-def locate_schedds(collector, access_points):
+def locate_schedds(collector, access_points=None):
     coll = htcondor.Collector(collector)
     schedds = []
     if access_points:
         try:
             for ap in access_points:
                 logging.debug(f"getting {ap} schedd ad")
                 schedds.append(coll.locate(htcondor.DaemonTypes.Schedd, ap))
-        except htcondor.HTCondorIOError as e:
+        except Exception as e:
             logging.error(f'Condor error: {e}')
     else:
         try:
             schedds.append(coll.locateAll(htcondor.DaemonTypes.Schedd))
-        except htcondor.HTCondorIOError as e:
+        except Exception as e:
             logging.error(f'Condor error: {e}')
     return schedds

 def compose_ad_metrics(ad, metrics):
     ''' Parse condor job classad and update metrics
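With `access_points` now optional, callers can either pin specific schedds or fall back to discovering all of them via `locateAll`; a brief usage sketch (hostnames are illustrative):

    # Locate only the named access points' schedd ads
    schedds = locate_schedds('collector.example.edu',
                             ['ap1.example.edu', 'ap2.example.edu'])

    # Omit access_points to fall back to locateAll()
    all_schedds = locate_schedds('collector.example.edu')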
@@ -120,7 +121,7 @@ def query_collector(collector, access_points, metrics, last_job):
         name = schedd_ad.get('Name')

         ads = read_from_schedd(schedd_ad, history=True, since=last_job[name]['ClusterId'])
-        iterate_ads(ads, name, metrics)
+        iterate_ads(ads, name, metrics, last_job)

 def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],match=10000,since=None):
     """Connect to schedd and pull ads directly.
@@ -170,8 +171,6 @@ def iterate_ads(ads, name, metrics, last_job):
         compose_ad_metrics(ad, metrics)

 if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
-
     parser = OptionParser('usage: %prog [options] history_files')

     parser.add_option('-c','--collectors',default=False, action='store_true',
@@ -186,29 +185,39 @@
     parser.add_option('-i','--interval', default=300,
                       action='store', type='int',
                       help='collector query interval in seconds')
+    parser.add_option('--debug', default=False, action='store_true')
     (options, args) = parser.parse_args()
     if not args:
         parser.error('no condor history files or collectors')

+    level = logging.INFO
+
+    if options.debug:
+        level = logging.DEBUG
+
+    logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
+
     metrics = JobMetrics()

     prometheus_client.REGISTRY.unregister(prometheus_client.GC_COLLECTOR)
     prometheus_client.REGISTRY.unregister(prometheus_client.PLATFORM_COLLECTOR)
     prometheus_client.REGISTRY.unregister(prometheus_client.PROCESS_COLLECTOR)

     prometheus_client.start_http_server(options.port)

     if options.collectors:
-        last_job = last_jobs_dict(args)
-
+        if options.access_points:
+            aps = options.access_points.split(',')
+        else:
+            aps = None
+        last_job = last_jobs_dict(args, aps)
         if last_job is None:
             logging.error(f'No schedds found')
             exit()

         while True:
             start = time.time()
             for collector in args:
-                query_collector(collector, options.access_points, metrics, last_job)
+                query_collector(collector, aps, metrics, last_job)

             delta = time.time() - start
             # sleep for interval minus scrape duration
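Put together, a typical invocation of the patched script might look like this, assuming the access-point option (whose add_option call sits in the collapsed part of the diff) is spelled `--access_points`:

    python condor_history_to_prometheus.py -c --debug \
        --access_points ap1.example.edu,ap2.example.edu \
        collector.example.edu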
4 changes: 2 additions & 2 deletions condor_queue_to_prometheus.py
@@ -10,7 +10,7 @@
 import prometheus_client
 from condor_metrics import *
 from itertools import chain
-from datetime import datetime
+from datetime import datetime, timezone
 from dateutil import parser as dateparser

 def get_job_state(ad):
@@ -33,7 +33,7 @@
 def compose_ad_metrics(ads):
     for ad in ads:

-        walltime = int(ad['RequestCpus']) * (datetime.now(datetime.timezone.utc) - dateparser.parse(ad['JobCurrentStartDate'])).total_seconds()
+        walltime = int(ad['RequestCpus']) * (datetime.now(timezone.utc) - dateparser.parse(ad['JobCurrentStartDate'])).total_seconds()
         labels = {key: None for key in metrics.labels}

         labels['schedd'] = ad['GlobalJobId'].split('#')[0]
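Two pitfalls motivate this fix: with `from datetime import datetime`, the name `datetime` is the class, so the old `datetime.now(datetime.timezone.utc)` raises AttributeError; and subtracting an aware datetime from a naive `datetime.now()` raises TypeError. A small illustration (the timestamp is made up):

    from datetime import datetime, timezone
    from dateutil import parser as dateparser

    start = dateparser.parse('2024-01-01T12:00:00+00:00')  # tz-aware

    # datetime.now() - start would raise TypeError (naive minus aware);
    # an aware "now" makes the subtraction well-defined
    elapsed = (datetime.now(timezone.utc) - start).total_seconds()

    walltime = 4 * elapsed  # scaled by RequestCpus, e.g. a 4-core job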
2 changes: 2 additions & 0 deletions condor_utils.py
@@ -463,6 +463,8 @@ def get_institution_from_site(site):
     ('titan xp', 6.91),
     ('titan x', 7.49),
     ('titan rtx', 4.67),
+    ('h100', 1.34),
+    ('h100 1g.10GB', 9.38),
     ('k20', 33.01), # not recently tested
     ('k40', 29.67), # not recently tested
     ('k80', 29.7),
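If this table is scanned first-match-wins against a GPU model string, entry order matters: 'h100' precedes 'h100 1g.10GB', so a plain substring scan would hand MIG slices the full-card factor. A hypothetical lookup sketch (not the repo's actual helper) with the more specific key checked first:

    GPU_FACTORS = [
        ('h100 1g.10gb', 9.38),  # specific MIG slice before the bare model
        ('h100', 1.34),
    ]

    def gpu_factor(model, default=1.0):
        # case-insensitive first-substring-match over the table
        m = model.lower()
        for name, factor in GPU_FACTORS:
            if name in m:
                return factor
        return default

    print(gpu_factor('NVIDIA H100 1g.10GB'))  # -> 9.38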