diff --git a/condor_history_to_es.py b/condor_history_to_es.py
index 5c16885..133f324 100755
--- a/condor_history_to_es.py
+++ b/condor_history_to_es.py
@@ -12,20 +12,22 @@ from rest_tools.client import ClientCredentialsAuth
 
 parser = ArgumentParser('usage: %prog [options] history_files')
-parser.add_argument('-a','--address',help='elasticsearch address')
-parser.add_argument('-n','--indexname',default='condor',
-    help='index name (default condor)')
+parser.add_argument('-a','--address', help='elasticsearch address')
+parser.add_argument('-n','--indexname', default='condor',
+                    help='index name (default condor)')
 parser.add_argument('--dailyindex', default=False, action='store_true',
-    help='Index pattern daily')
+                    help='Index pattern daily')
 parser.add_argument("-y", "--dry-run", default=False,
-    action="store_true",
-    help="query jobs, but do not ingest into ES",)
+                    action="store_true",
+                    help="query jobs, but do not ingest into ES")
 parser.add_argument('--collectors', default=False, action='store_true',
-    help='Args are collector addresses, not files')
-parser.add_argument('--client_id',help='oauth2 client id',default=None)
-parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
-parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
-parser.add_argument('--token',help='oauth2 token',default=None)
+                    help='Args are collector addresses, not files')
+parser.add_argument('--access_points', default=None,
+                    help="Comma separated list of APs to query; e.g. --access_points submit-1,submit2")
+parser.add_argument('--client_id', help='oauth2 client id', default=None)
+parser.add_argument('--client_secret', help='oauth2 client secret', default=None)
+parser.add_argument('--token_url', help='oauth2 realm token url', default=None)
+parser.add_argument('--token', help='oauth2 token', default=None)
 parser.add_argument("positionals", nargs='+')
 
 options = parser.parse_args()
@@ -86,7 +88,15 @@ def es_import(document_generator):
     return success
 
 failed = False
-if options.collectors:
+if options.access_points and options.collectors:
+    for coll_address in options.positionals:
+        try:
+            gen = es_generator(read_from_collector(coll_address, options.access_points, history=True))
+            success = es_import(gen)
+        except htcondor.HTCondorIOError as e:
+            failed = e
+            logging.error('Condor error', exc_info=True)
+elif options.collectors:
     for coll_address in options.positionals:
         try:
             gen = es_generator(read_from_collector(coll_address, history=True))
diff --git a/condor_history_to_prometheus.py b/condor_history_to_prometheus.py
index 0dfd043..17b37ef 100755
--- a/condor_history_to_prometheus.py
+++ b/condor_history_to_prometheus.py
@@ -19,32 +19,40 @@ def generate_ads(entries):
 
 def last_jobs_dict(collector):
     last_job = defaultdict(dict)
-    
+
     for collector in args:
         schedd_ads = locate_schedds(collector)
 
         if schedd_ads is None:
             return None
-    
+
         for s in schedd_ads:
             last_job[s.get('Name')] = {'ClusterId': None, 'EnteredCurrentStatus': None}
 
     return last_job
-    
-def locate_schedds(collector):
-    try:
-        coll = htcondor.Collector(collector)
-        return coll.locateAll(htcondor.DaemonTypes.Schedd)
-    except htcondor.HTCondorIOError as e:
-        failed = e
-        logging.error(f'Condor error: {e}')
+
+def locate_schedds(collector, access_points=None):
+    coll = htcondor.Collector(collector)
+    schedds = []
+    if access_points:
+        try:
+            for ap in access_points.split(','):
+                schedds.append(coll.locate(htcondor.DaemonTypes.Schedd, ap))
+        except htcondor.HTCondorIOError as e:
+            logging.error(f'Condor error: {e}')
+    else:
+        try:
+            schedds.extend(coll.locateAll(htcondor.DaemonTypes.Schedd))
+        except htcondor.HTCondorIOError as e:
+            logging.error(f'Condor error: {e}')
+    return schedds
 
 def compose_ad_metrics(ad, metrics):
     ''' Parse condor job classad and update metrics
 
     Args:
         ad (classad): an HTCondor job classad
-        metrics (JobMetrics): JobMetrics object 
+        metrics (JobMetrics): JobMetrics object
     '''
     # ignore this ad if walltimehrs is negative or a dagman
     if ad['walltimehrs'] < 0 or ad['Cmd'] == '/usr/bin/condor_dagman':
@@ -64,7 +72,7 @@ def compose_ad_metrics(ad, metrics):
     labels['site'] = ad['site']
     labels['schedd'] = ad['GlobalJobId'][0:ad['GlobalJobId'].find('#')]
     labels['GPUDeviceName'] = None
-    
+
     if ad['ExitCode'] == 0 and ad['ExitBySignal'] is False and ad['JobStatus'] == 4:
         labels['usage'] = 'goodput'
     else:
@@ -83,7 +91,7 @@ def compose_ad_metrics(ad, metrics):
         resource_hrs = ad['cpuhrs']
         resource_request = ad['RequestCpus']
 
-    try: 
+    try:
         labels['IceProdDataset'] = ad['IceProdDataset']
         labels['IceProdTaskName'] = ad['IceProdTaskName']
     except:
@@ -100,7 +108,7 @@ def compose_ad_metrics(ad, metrics):
     metrics.condor_job_mem_req.labels(**labels).observe(ad['RequestMemory']/1024)
     metrics.condor_job_mem_used.labels(**labels).observe(ad['ResidentSetSize_RAW']/1048576)
 
-def query_collector(collector, metrics, last_job):
+def query_collector(collector, access_points, metrics, last_job):
     """Query schedds for job ads
 
     Args:
@@ -108,23 +116,11 @@
         metrics (JobMetrics): JobMetrics instance
         last_job (dict): dictionary for tracking last ClusterId by schedd
     """
-    for schedd_ad in locate_schedds(collector):
+    for schedd_ad in locate_schedds(collector, access_points):
         name = schedd_ad.get('Name')
         ads = read_from_schedd(schedd_ad, history=True, since=last_job[name]['ClusterId'])
 
-        if last_job[name]['EnteredCurrentStatus'] is not None:
-            logging.info(f'{name} - read ads since {last_job[name]["ClusterId"]}:{last_job[name]["EnteredCurrentStatus"]} at timestamp {datetime.strptime(last_job[name]["EnteredCurrentStatus"],utc_format)}')
-
-        for ad in generate_ads(ads):
-            if last_job[name]['ClusterId'] is None:
-                last_job[name]['ClusterId'] = int(ad['ClusterId'])
-                last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
-
-            if datetime.strptime(ad['EnteredCurrentStatus'],utc_format) > datetime.strptime(last_job[name]['EnteredCurrentStatus'],utc_format):
-                last_job[name]['ClusterId'] = int(ad['ClusterId'])
-                last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
-
-            compose_ad_metrics(ad, metrics)
+        iterate_ads(ads, name, metrics, last_job)
 
 def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],match=10000,since=None):
     """Connect to schedd and pull ads directly.
@@ -132,7 +128,7 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
         A generator that yields condor job dicts.
 
     Args:
-        schedd (ClassAd): location_add of a schedd, from either htcondor.Colletor locate() or locateAll() 
+        schedd (ClassAd): location ad of a schedd, from either htcondor.Collector locate() or locateAll()
         history (bool): read history (True) or active queue (default: False)
         constraint (string): string representation of a classad expression
         match (int): number of job ads to return
@@ -158,6 +154,21 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
     except Exception:
         logging.info('%s failed', schedd_ad['Name'], exc_info=True)
 
+def iterate_ads(ads, name, metrics, last_job):
+    if last_job[name]['EnteredCurrentStatus'] is not None:
+        logging.info(f'{name} - read ads since {last_job[name]["ClusterId"]}:{last_job[name]["EnteredCurrentStatus"]} at timestamp {datetime.strptime(last_job[name]["EnteredCurrentStatus"],utc_format)}')
+
+    for ad in generate_ads(ads):
+        if last_job[name]['ClusterId'] is None:
+            last_job[name]['ClusterId'] = int(ad['ClusterId'])
+            last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
+
+        if datetime.strptime(ad['EnteredCurrentStatus'],utc_format) > datetime.strptime(last_job[name]['EnteredCurrentStatus'],utc_format):
+            last_job[name]['ClusterId'] = int(ad['ClusterId'])
+            last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
+
+        compose_ad_metrics(ad, metrics)
+
 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
@@ -168,6 +179,7 @@
     # TODO: Add file tail function for condor history files
     #parser.add_option('-f','--histfile',
     #                  help='history file to read from')
+    parser.add_option('-a','--access_points',default=None, help='comma separated list of access points to query')
     parser.add_option('-p','--port', default=9100,
                       action='store', type='int',
                       help='port number for prometheus exporter')
@@ -196,10 +208,10 @@
     while True:
         start = time.time()
         for collector in args:
-            query_collector(collector, metrics, last_job)
+            query_collector(collector, options.access_points, metrics, last_job)
 
         delta = time.time() - start
         # sleep for interval minus scrape duration
         # if scrape duration was longer than interval, run right away
         if delta < options.interval:
-            time.sleep(options.interval - delta)
+            time.sleep(options.interval - delta)
\ No newline at end of file
diff --git a/condor_queue_to_es.py b/condor_queue_to_es.py
index 0a3aaa6..809e36e 100755
--- a/condor_queue_to_es.py
+++ b/condor_queue_to_es.py
@@ -19,6 +19,8 @@
     help="query jobs, but do not ingest into ES",)
 parser.add_argument('--collectors', default=False, action='store_true',
                     help='Args are collector addresses, not files')
+parser.add_argument('--access_points', default=None,
+                    help="Comma separated list of APs to query; e.g. --access_points submit-1,submit2")
 parser.add_argument('--client_id',help='oauth2 client id',default=None)
 parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
 parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
@@ -61,8 +63,6 @@ def es_generator(entries):
 
 prefix = 'http'
 address = options.address
-
-
 if '://' in address:
     prefix,address = address.split('://')
@@ -84,7 +84,15 @@ def es_generator(entries):
 es_import = partial(bulk, es, max_retries=20, initial_backoff=10, max_backoff=3600)
 
 failed = False
-if options.collectors:
+if options.access_points and options.collectors:
+    for coll_address in options.positionals:
+        try:
+            gen = es_generator(read_from_collector(coll_address, options.access_points))
+            success, _ = es_import(gen)
+        except htcondor.HTCondorIOError as e:
+            failed = e
+            logging.error('Condor error', exc_info=True)
+elif options.collectors:
     for coll_address in options.positionals:
         try:
             gen = es_generator(read_from_collector(coll_address))
diff --git a/condor_queue_to_prometheus.py b/condor_queue_to_prometheus.py
index 90bada4..a05e9bb 100755
--- a/condor_queue_to_prometheus.py
+++ b/condor_queue_to_prometheus.py
@@ -75,7 +75,7 @@ def compose_ad_metrics(ads):
     parser.add_option('-c','--collectors',default=False, action='store_true',
                       help='read history from')
-    
+    parser.add_option('-a','--access_points',default=None, help='comma separated list of access points to query')
     parser.add_option('-p','--port', default=9100,
                       action='store', type='int',
                       help='port number for prometheus exporter')
@@ -94,27 +94,34 @@ def compose_ad_metrics(ads):
 
     prometheus_client.start_http_server(options.port)
 
-    if options.collectors:
-        while True:
-            gens = []
-            start = time.time()
+    while True:
+        gens = []
+        start = time.time()
+        if options.access_points and options.collectors:
+            for coll_address in args:
+                try:
+                    gens.append(read_from_collector(coll_address, options.access_points))
+                except htcondor.HTCondorIOError as e:
+                    failed = e
+                    logging.error('Condor error', exc_info=True)
+        elif options.collectors:
             for coll_address in args:
                 try:
                     gens.append(read_from_collector(coll_address))
                 except htcondor.HTCondorIOError as e:
                     failed = e
                     logging.error('Condor error', exc_info=True)
-            gen = chain(*gens)
-            metrics.clear()
+        gen = chain(*gens)
+        metrics.clear()
 
-            start_compose_metrics = time.perf_counter()
-            compose_ad_metrics(generate_ads(gen))
-            end_compose_metrics = time.perf_counter()
+        start_compose_metrics = time.perf_counter()
+        compose_ad_metrics(generate_ads(gen))
+        end_compose_metrics = time.perf_counter()
 
-            compose_diff = end_compose_metrics - start_compose_metrics
-            logging.info(f'Took {compose_diff} seconds to compose metrics')
+        compose_diff = end_compose_metrics - start_compose_metrics
+        logging.info(f'Took {compose_diff} seconds to compose metrics')
 
-            delta = time.time() - start
+        delta = time.time() - start
 
-            if delta < options.interval:
-                time.sleep(options.interval - delta)
\ No newline at end of file
+        if delta < options.interval:
+            time.sleep(options.interval - delta)
\ No newline at end of file
diff --git a/condor_status_to_es.py b/condor_status_to_es.py
index 43fdd19..b123a97 100755
--- a/condor_status_to_es.py
+++ b/condor_status_to_es.py
@@ -35,7 +35,7 @@ class Dry:
     """Helper class for debugging"""
 
     _dryrun = False
-    
+
     def __init__(self, func):
         self.func = func
@@ -44,7 +44,7 @@ def __call__(self, *args, **kwargs):
             logging.info(self.func.__name__)
             logging.info(args)
             logging.info(kwargs)
-        
+
         else:
             return self.func(*args,**kwargs)
@@ -296,7 +296,7 @@ def key = status+"."+resource;
     "--after",
default=timedelta(hours=1), help="time to look back", type=parse_time ) parser.add_argument( - "-y", + "-d", "--dry-run", default=False, action="store_true", @@ -353,7 +353,7 @@ def key = status+"."+resource; url = "{}://{}".format(prefix, address) logging.info("connecting to ES at %s", url) - es = Elasticsearch(hosts=[url], + es = Elasticsearch(hosts=[url], timeout=5000, bearer_auth=token, sniff_on_node_failure=True) diff --git a/condor_utils.py b/condor_utils.py index df6debf..f724681 100644 --- a/condor_utils.py +++ b/condor_utils.py @@ -761,7 +761,7 @@ def read_from_file(filename): else: entry += line+'\n' -def read_from_collector(address, history=False, constraint='true', projection=[],match=10000): +def read_from_collector(address, access_points=None, history=False, constraint='true', projection=[], match=10000): """Connect to condor collectors and schedds to pull job ads directly. A generator that yields condor job dicts. @@ -772,7 +772,12 @@ def read_from_collector(address, history=False, constraint='true', projection=[] """ import htcondor coll = htcondor.Collector(address) - schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd) + schedd_ads = [] + if access_points: + for ap in access_points.split(','): + schedd_ads.append(coll.locate(htcondor.DaemonTypes.Schedd, ap)) + else: + schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd) for schedd_ad in schedd_ads: logging.info('getting job ads from %s', schedd_ad['Name']) schedd = htcondor.Schedd(schedd_ad) @@ -790,6 +795,7 @@ def read_from_collector(address, history=False, constraint='true', projection=[] except Exception: logging.info('%s failed', schedd_ad['Name'], exc_info=True) + def read_status_from_collector(address, after=datetime.now()-timedelta(hours=1)): """Connect to condor collectors and schedds to pull job ads directly.
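
Note for reviewers: a minimal sketch of how the new access_points plumbing is driven end to end. The collector address and AP names below are made up, and the snippet assumes the patched condor_utils module is importable next to the htcondor bindings:

    # Query only the two named schedds instead of every schedd the
    # collector knows about; read_from_collector() falls back to
    # locateAll() when access_points is None.
    from condor_utils import read_from_collector

    for job in read_from_collector('collector.example.edu',
                                   access_points='submit-1,submit-2',
                                   history=True):
        print(job.get('GlobalJobId'))

The ES and Prometheus scripts expose the same switch on the command line: with --collectors set, the positional arguments remain collector addresses and --access_points restricts which schedds are queried, e.g.

    python condor_history_to_es.py --collectors --access_points submit-1,submit-2 collector.example.edu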