diff --git a/Dockerfile b/Dockerfile index 42c0f84..0ae4ee2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ RUN dnf -y install epel-release && \ dnf clean all && yum clean all && \ ln -s /usr/bin/python3.11 /usr/bin/python && \ wget https://github.com/WIPACrepo/rest-tools/archive/refs/tags/v1.8.2.tar.gz && \ - pip3.11 install --no-cache-dir elasticsearch elasticsearch htcondor requests prometheus_client setuptools ./v1.8.2.tar.gz + pip3.11 install --no-cache-dir elasticsearch elasticsearch_dsl htcondor requests prometheus_client setuptools ./v1.8.2.tar.gz COPY . /monitoring diff --git a/condor_history_to_es.py b/condor_history_to_es.py index 0b7d1c4..5c16885 100755 --- a/condor_history_to_es.py +++ b/condor_history_to_es.py @@ -25,6 +25,7 @@ parser.add_argument('--client_id',help='oauth2 client id',default=None) parser.add_argument('--client_secret',help='oauth2 client secret',default=None) parser.add_argument('--token_url',help='oauth2 realm token url',default=None) +parser.add_argument('--token',help='oauth2 token',default=None) parser.add_argument("positionals", nargs='+') options = parser.parse_args() @@ -57,8 +58,10 @@ def es_generator(entries): token = None -if None not in (options.token_url, options.client_secret, options.client_id): - api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq', +if options.token is not None: + token = options.token +elif None not in (options.token_url, options.client_secret, options.client_id): + api = ClientCredentialsAuth(address='https://elastic.icecube.aq', token_url=options.token_url, client_secret=options.client_secret, client_id=options.client_id) @@ -68,7 +71,8 @@ def es_generator(entries): logging.info('connecting to ES at %s',url) es = Elasticsearch(hosts=[url], request_timeout=5000, - bearer_auth=token) + bearer_auth=token, + sniff_on_connection_fail=True) def es_import(document_generator): if options.dry_run: @@ -78,7 +82,7 @@ def es_import(document_generator): json.dump(hit, sys.stdout) 
success = True else: - success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=360) + success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=10, max_backoff=360) return success failed = False diff --git a/condor_queue_to_es.py b/condor_queue_to_es.py index 52e388e..0a3aaa6 100755 --- a/condor_queue_to_es.py +++ b/condor_queue_to_es.py @@ -81,7 +81,7 @@ def es_generator(entries): es = Elasticsearch(hosts=[url], timeout=5000, bearer_auth=token) -es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600) +es_import = partial(bulk, es, max_retries=20, initial_backoff=10, max_backoff=3600) failed = False if options.collectors: diff --git a/condor_status_to_es.py b/condor_status_to_es.py index 07d0d24..43fdd19 100755 --- a/condor_status_to_es.py +++ b/condor_status_to_es.py @@ -3,8 +3,6 @@ Read from condor status and write to elasticsearch """ - - import glob import json import logging @@ -20,17 +18,38 @@ from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk from elasticsearch_dsl import MultiSearch, Search +from elasticsearch.helpers import bulk, BulkIndexError import htcondor +from rest_tools.client import ClientCredentialsAuth from condor_utils import * -regex = re.compile( +REGEX = re.compile( r"((?P\d+?)d)?((?P\d+?)h)?((?P\d+?)m)?((?P\d+?)s)?" 
) - +# note different capitalization conventions for GPU and Cpu +RESOURCES = ("GPUs", "Cpus", "Memory", "Disk") +STATUSES = ("evicted", "removed", "finished", "failed") +INDEX = "condor_status" + +class Dry: + """Helper class for debugging""" + _dryrun = False + + def __init__(self, func): + self.func = func + + def __call__(self, *args, **kwargs): + if self._dryrun: + logging.info(self.func.__name__) + logging.info(args) + logging.info(kwargs) + + else: + return self.func(*args,**kwargs) def parse_time(time_str): - parts = regex.match(time_str) + parts = REGEX.match(time_str) if not parts: raise ValueError parts = parts.groupdict() @@ -40,62 +59,20 @@ def parse_time(time_str): time_params[name] = int(param) return timedelta(**time_params) - -parser = ArgumentParser("usage: %prog [options] collector_addresses") -parser.add_argument("-a", "--address", help="elasticsearch address") -parser.add_argument( - "-n", - "--indexname", - default="condor_status", - help="index name (default condor_status)", -) -parser.add_argument( - "--after", default=timedelta(hours=1), help="time to look back", type=parse_time -) -parser.add_argument( - "-y", - "--dry-run", - default=False, - action="store_true", - help="query status, but do not ingest into ES", -) -parser.add_argument( - "-v", - "--verbose", - default=False, - action="store_true", - help="use verbose logging in ES", -) -parser.add_argument("collectors", nargs="+") -options = parser.parse_args() - -logging.basicConfig( - level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s : %(message)s" -) -if options.verbose: - logging.getLogger("elasticsearch").setLevel("DEBUG") - - -# note different capitalization conventions for GPU and Cpu -RESOURCES = ("GPUs", "Cpus", "Memory", "Disk") -STATUSES = ("evicted", "removed", "finished", "failed") - - def update_machines(entries): """ Generate upsert ops from machine classad dictionaries """ for data in entries: yield { - "_index": options.indexname, + "_index": INDEX, 
"_op_type": "update", - "_type": "machine_ad", "_id": "{:.0f}-{:s}".format( time.mktime(data["DaemonStartTime"].timetuple()), data["Name"] ), "upsert": data, "script": { - "id": options.indexname + "-update-machine", + "id": INDEX + "-update-machine", "params": { "duration": data["duration"], "LastHeardFrom": data["LastHeardFrom"], @@ -124,7 +101,7 @@ def parent_slot_name(dynamic_slot_name): # glidein names are not necessarily unique on long time scales. look up the # last glidein that started with the advertised name _before_ the evicted # job was started - ms = MultiSearch(using=es, index=options.indexname) + ms = MultiSearch(using=es, index=INDEX) for hit in jobs: try: if history: @@ -137,7 +114,7 @@ def parent_slot_name(dynamic_slot_name): ms = ms.add( Search() .filter("term", Name__keyword=parent_slot_name(hit["LastRemoteHost"])) - .filter("range", DaemonStartTime={"lte": datetime.utcfromtimestamp(t0)},) + .filter("range", DaemonStartTime={"lte": datetime.fromtimestamp(t0)},) .sort({"DaemonStartTime": {"order": "desc"}}) .source(["nuthin"])[:1] ) @@ -178,10 +155,9 @@ def parent_slot_name(dynamic_slot_name): doc = { "_op_type": "update", "_index": match.hits[0].meta.index, - "_type": match.hits[0].meta.doc_type, "_id": match.hits[0].meta.id, "script": { - "id": options.indexname + "-update-jobs", + "id": INDEX + "-update-jobs", "params": { "job": hit["GlobalJobId"].replace("#", "-").replace(".", "-"), "category": category, @@ -191,63 +167,20 @@ def parent_slot_name(dynamic_slot_name): } yield doc - -prefix = "http" -address = options.address -if "://" in address: - prefix, address = address.split("://") - -url = "{}://{}".format(prefix, address) -logging.info("connecting to ES at %s", url) -es = Elasticsearch(hosts=[url], timeout=5000) - - -def es_import(document_generator): - if options.dry_run: - for hit in document_generator: - logging.info(hit) - success = True - else: +@Dry +def es_import(gen, es): + try: success, _ = bulk(es, gen, max_retries=20, 
initial_backoff=2, max_backoff=3600) - return success + return success + except BulkIndexError as e: + for error in e.errors: + logging.info(json.dumps(error, indent=2, default=str)) -machine_ad = edsl.Mapping.from_es( - doc_type="machine_ad", index=options.indexname, using=es -) -if not "claims" in machine_ad or not "failed" in machine_ad.to_dict()['machine_ad']['properties']['claims']['properties']: - machine_ad.field( - "jobs", - edsl.Object(properties={status: edsl.Text(multi=True) for status in STATUSES}), - ) - machine_ad.field( - "claims", - edsl.Object( - properties={ - status: edsl.Object( - properties={resource: edsl.Float() for resource in RESOURCES} - ) - for status in STATUSES - } - ), - ) - machine_ad.field( - "occupancy", - edsl.Object( - properties={ - status: edsl.Object( - properties={resource: edsl.Float() for resource in RESOURCES} - ) - for status in STATUSES + ("total",) - } - ), - ) - machine_ad.field("duration", edsl.Integer()) - machine_ad.save(options.indexname, using=es) - # Claim update, triggered each time a job is removed from a machine +def put_scripts(es, index): es.put_script( - options.indexname + "-update-jobs", - { + id=index + "-update-jobs", + body={ "script": { "source": textwrap.dedent( """ @@ -301,8 +234,8 @@ def key = status+"."+resource; # Occupancy update, triggered every time a glidein increments LastHeardFrom es.put_script( - options.indexname + "-update-machine", - { + id=index + "-update-machine", + body={ "script": { "source": textwrap.dedent( """ @@ -350,65 +283,142 @@ def key = status+"."+resource; } }, ) +if __name__ == "__main__": + parser = ArgumentParser("usage: %prog [options] collector_addresses") + parser.add_argument("-a", "--address", help="elasticsearch address") + parser.add_argument( + "-n", + "--indexname", + default="condor_status", + help="index name (default condor_status)", + ) + parser.add_argument( + "--after", default=timedelta(hours=1), help="time to look back", type=parse_time + ) + 
parser.add_argument( + "-y", + "--dry-run", + default=False, + action="store_true", + help="query status, but do not ingest into ES", + ) + parser.add_argument( + "-s", + "--put-script", + default=False, + action="store_true", + help="put/update ES update scripts at index", + ) + parser.add_argument( + "-v", + "--verbose", + default=False, + action="store_true", + help="use verbose logging in ES", + ) + parser.add_argument('--client_id',help='oauth2 client id',default=None) + parser.add_argument('--client_secret',help='oauth2 client secret',default=None) + parser.add_argument('--token_url',help='oauth2 realm token url',default=None) + parser.add_argument('--token',help='oauth2 token',default=None) + parser.add_argument("collectors", nargs="+") + options = parser.parse_args() -failed = False -for coll_address in options.collectors: - try: - gen = update_machines( - read_status_from_collector(coll_address, datetime.now() - options.after) - ) - success = es_import(gen) - - # Update claims from evicted and held jobs - after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple()) - gen = update_jobs( - read_from_collector( - coll_address, - constraint=( - "((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)" - + " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))" - ).format(after, after), - projection=[ - "GlobalJobId", - "NumJobStarts", - "JobStatus", - "JobLastStartDate", - "JobCurrentStartDate", - "EnteredCurrentStatus", - "LastVacateTime", - "LastRemoteHost", - ] - + ["Request" + resource for resource in RESOURCES], - ), - history=False, - ) - success = es_import(gen) - - # Update claims from finished jobs - gen = update_jobs( - read_from_collector( - coll_address, - constraint="!isUndefined(LastRemoteHost)", - projection=[ - "GlobalJobId", - "NumJobStarts", - "JobLastStartDate", - "JobCurrentStartDate", - "EnteredCurrentStatus", - "JobStatus", - "ExitCode", - "LastRemoteHost", - ] - + ["Request" + resource for resource in RESOURCES], + 
INDEX = options.indexname + + Dry._dryrun = options.dry_run + + logging.basicConfig( + level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s : %(message)s" + ) + if options.verbose: + logging.getLogger("elasticsearch").setLevel("DEBUG") + + + prefix = "http" + address = options.address + if "://" in address: + prefix, address = address.split("://") + + + token = None + + if options.token is not None: + token = options.token + elif None not in (options.token_url, options.client_secret, options.client_id): + api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq', + token_url=options.token_url, + client_secret=options.client_secret, + client_id=options.client_id) + token = api.make_access_token() + + url = "{}://{}".format(prefix, address) + logging.info("connecting to ES at %s", url) + es = Elasticsearch(hosts=[url], + timeout=5000, + bearer_auth=token, + sniff_on_node_failure=True) + + if options.put_script: + put_scripts(es, options.indexname) + + failed = False + for coll_address in options.collectors: + try: + gen = update_machines( + read_status_from_collector(coll_address, datetime.now() - options.after) + ) + success = es_import(gen, es) + + # Update claims from evicted and held jobs + after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple()) + gen = update_jobs( + read_from_collector( + coll_address, + constraint=( + "((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)" + + " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))" + ).format(after, after), + projection=[ + "GlobalJobId", + "NumJobStarts", + "JobStatus", + "JobLastStartDate", + "JobCurrentStartDate", + "EnteredCurrentStatus", + "LastVacateTime", + "LastRemoteHost", + ] + + ["Request" + resource for resource in RESOURCES], + ), + history=False, + ) + success = es_import(gen, es) + + # Update claims from finished jobs + gen = update_jobs( + read_from_collector( + coll_address, + constraint="!isUndefined(LastRemoteHost)", + projection=[ + 
"GlobalJobId", + "NumJobStarts", + "JobLastStartDate", + "JobCurrentStartDate", + "EnteredCurrentStatus", + "JobStatus", + "ExitCode", + "LastRemoteHost", + ] + + ["Request" + resource for resource in RESOURCES], + history=True, + ), history=True, - ), - history=True, - ) - success = es_import(gen) + ) + success = es_import(gen, es) - except htcondor.HTCondorIOError as e: - failed = e - logging.error('Condor error', exc_info=True) + except htcondor.HTCondorIOError as e: + failed = e + logging.error('Condor error', exc_info=True) -if failed: - raise failed + if failed: + raise failed diff --git a/condor_status_to_prometheus.py b/condor_status_to_prometheus.py new file mode 100755 index 0000000..c771d6a --- /dev/null +++ b/condor_status_to_prometheus.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 + +import os +import glob +from optparse import OptionParser +import logging +from functools import partial +import htcondor +from condor_utils import * +import prometheus_client +from condor_metrics import * +from itertools import chain + +def get_job_state(ad): + jobstatus = None + + if ad["LastJobStatus"] == 1: + jobstatus = 'Idle' + elif ad["LastJobStatus"] == 2: + jobstatus = 'Running' + elif ad["LastJobStatus"] == 5: + jobstatus = 'Held' + + return jobstatus + +def generate_ads(entries): + for data in entries: + add_classads(data) + yield data + +def compose_ad_metrics(ads): + for ad in ads: + labels = {key: None for key in metrics.labels} + + labels['schedd'] = ad['GlobalJobId'].split('#')[0] + labels['state'] = get_job_state(ad) + + try: + acct_group = ad['AccountingGroup'] + group = acct_group.split('.')[0] + except Exception: + group = "None" + + if group == 'Undefined': group = 'None' + + labels['group'] = group + labels['owner'] = ad['Owner'] + + metrics.condor_jobs_count.labels(**{'exit_code': ad['ExitCode'],**labels}).inc() + metrics.condor_jobs_cpu_request.labels(**labels).inc(ad['RequestCpus']) + 
metrics.condor_jobs_cputime.labels(**labels).inc(ad['RemoteUserCpu']) + metrics.condor_jobs_disk_request_bytes.labels(**labels).inc(ad['RequestDisk']*1024) + metrics.condor_jobs_disk_usage_bytes.labels(**labels).inc(ad['DiskUsage_RAW']*1024) + metrics.condor_jobs_memory_request_bytes.labels(**labels).inc(ad['RequestMemory']*1024*1024) + metrics.condor_jobs_memory_usage_bytes.labels(**labels).inc(ad['ResidentSetSize_RAW']*1024) + metrics.condor_jobs_walltime.labels(**labels).inc(ad['walltimehrs']*3600) + metrics.condor_jobs_wastetime.labels(**labels).inc((ad['walltimehrs']*3600)-ad['RemoteUserCpu']) + + if 'RequestGpus' in ad: + metrics.condor_jobs_gpu_request.labels(**labels).inc(ad['RequestGpus']) + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s') + + parser = OptionParser('usage: %prog [options] history_files') + + parser.add_option('-c','--collectors',default=False, action='store_true', + help='read history from') + + parser.add_option('-p','--port', default=9100, + action='store', type='int', + help='port number for prometheus exporter') + parser.add_option('-i','--interval', default=300, + action='store', type='int', + help='collector query interval in seconds') + (options, args) = parser.parse_args() + if not args: + parser.error('no condor history files or collectors') + + metrics = JobMetrics() + + prometheus_client.REGISTRY.unregister(prometheus_client.GC_COLLECTOR) + prometheus_client.REGISTRY.unregister(prometheus_client.PLATFORM_COLLECTOR) + prometheus_client.REGISTRY.unregister(prometheus_client.PROCESS_COLLECTOR) + + prometheus_client.start_http_server(options.port) + + if options.collectors: + while True: + gens = [] + start = time.time() + for coll_address in args: + try: + gens.append(read_from_collector(coll_address)) + except htcondor.HTCondorIOError as e: + failed = e + logging.error('Condor error', exc_info=True) + gen = chain(*gens) + metrics.slot_metrics.clear() + 
compose_ad_metrics(generate_ads(gen)) + delta = time.time() - start + + if delta < options.interval: + time.sleep(options.interval - delta) \ No newline at end of file diff --git a/new_condor_status_to_es.py b/new_condor_status_to_es.py new file mode 100644 index 0000000..b8bda1c --- /dev/null +++ b/new_condor_status_to_es.py @@ -0,0 +1,263 @@ +#!/usr/bin/env python3 +""" +Read from condor status and write to elasticsearch +""" + +import glob +import json +import logging +import os +import re +import textwrap +from argparse import ArgumentParser +from datetime import datetime, timedelta +from functools import partial +from time import mktime + +import elasticsearch_dsl as edsl +from elasticsearch import Elasticsearch +from elasticsearch.helpers import bulk, BulkIndexError +from elasticsearch_dsl import MultiSearch, Search +import htcondor +from rest_tools.client import ClientCredentialsAuth + +from condor_utils import * + +regex = re.compile( + r"((?P\d+?)d)?((?P\d+?)h)?((?P\d+?)m)?((?P\d+?)s)?" 
+) + +# note different capitalization conventions for GPU and Cpu +RESOURCES = ("GPUs", "Cpus", "Memory", "Disk") +STATUSES = ("evicted", "removed", "finished", "failed") + +class Dry: + """Helper class for debugging""" + _dryrun = False + + def __init__(self, func): + self.func = func + + def __call__(self, *args, **kwargs): + if self._dryrun: + logging.info(self.func.__name__) + logging.info(args) + logging.info(kwargs) + + else: + return self.func(*args,**kwargs) + +def parse_time(time_str): + parts = regex.match(time_str) + if not parts: + raise ValueError + parts = parts.groupdict() + time_params = {} + for (name, param) in parts.items(): + if param: + time_params[name] = int(param) + return timedelta(**time_params) + +@Dry +def es_import(gen, es): + try: + success, _ = bulk(es, gen, max_retries=20, initial_backoff=2, max_backoff=3600) + return success + except BulkIndexError as e: + for error in e.errors: + logging.info(json.dumps(error, indent=2, default=str)) + +def update_machines(entries, index): + """ + Generate upsert ops from machine classad dictionaries + """ + for data in entries: + yield data | { + "_index": index, + "_id": f"{data['LastHeardFrom']}-{data['Name']}", + } + +def update_jobs(es, index, entries, history=False): + """ + Generate updates to claims.* from job classad dictionaries + """ + + def parent_slot_name(dynamic_slot_name): + parts = dynamic_slot_name.split("@") + match = re.match(r"(slot\d+)_\d+", parts[0]) + if match: + parts[0] = match.group(1) + return "@".join(parts) + + # MultiSearch will fail if there are no queries to run + jobs = list(entries) + if not jobs: + return + + # glidein names are not necessarily unique on long time scales. 
look up the + # last glidein that started with the advertised name _before_ the evicted + # job was started + ms = MultiSearch(using=es, index=index) + for hit in jobs: + try: + if history: + t0 = hit["JobCurrentStartDate"] + else: + if hit["JobStatus"] == 5: + t0 = hit["JobCurrentStartDate"] + else: + t0 = hit["JobLastStartDate"] + ms = ms.add( + Search() + .filter("term", Name__keyword=parent_slot_name(hit["LastRemoteHost"])) + .filter("range", DaemonStartTime={"lte": datetime.fromtimestamp(t0)},) + .sort({"DaemonStartTime": {"order": "desc"}}) + .source(["nuthin"])[:1] + ) + except Exception: + logging.warning('failed to process job, %r', hit) + continue + + for hit, match in zip(jobs, ms.execute()): + if not match.hits: + continue + if history: + if hit["JobStatus"] == 3: + category = "removed" + elif hit.get("ExitCode", -1) == 0: + category = "finished" + else: + category = "failed" + walltime = float(hit["EnteredCurrentStatus"] - hit["JobCurrentStartDate"]) + else: + # NB: if a job is evicted from one slot, held on another, and then + # removed from the queue, there's no way to recover the time that + # may have elapsed between hold and removal. To handle this case, + # we treat held jobs as a subcategory of removed jobs, so that they + # will not be counted again when encountered in the history. 
+ if hit["JobStatus"] == 5: + walltime = float(hit["EnteredCurrentStatus"] - hit["JobCurrentStartDate"]) + category = "removed" + else: + walltime = float(hit["LastVacateTime"] - hit["JobLastStartDate"]) + category = "evicted" + + # normalize capitalization of requests + requests = {resource: 0 for resource in RESOURCES} + for k in hit: + if k.startswith("Request"): + requests[k[7:]] = walltime * hit[k] + +def main(options): + prefix = "http" + address = options.address + if "://" in address: + prefix, address = address.split("://") + + token = None + + if options.token is not None: + token = options.token + elif None not in (options.token_url, options.client_secret, options.client_id): + api = ClientCredentialsAuth(address=options.address, + token_url=options.token_url, + client_secret=options.client_secret, + client_id=options.client_id) + token = api.make_access_token() + + url = "{}://{}".format(prefix, address) + logging.info(f"connecting to ES at {url}") + es = Elasticsearch(hosts=[url], + request_timeout=5000, + bearer_auth=token, + sniff_on_node_failure=True) + + for coll_address in options.collectors: + try: + machines = read_status_from_collector(coll_address, datetime.now() - options.after), + + # Update claims from evicted and held jobs + after = time.mktime((datetime.now() - timedelta(minutes=10)).timetuple()) + uncompleted = read_from_collector( + coll_address, + constraint=( + "((LastVacateTime > {}) && ((LastVacateTime-JobLastStartDate))>60)" + + " || ((JobStatus == 5) && (EnteredCurrentStatus > {}))" + ).format(after, after), + projection=[ + "GlobalJobId", + "NumJobStarts", + "JobStatus", + "JobLastStartDate", + "JobCurrentStartDate", + "EnteredCurrentStatus", + "LastVacateTime", + "LastRemoteHost", + ] + + ["Request" + resource for resource in RESOURCES], + ) + + # Update claims from finished jobs + completed = read_from_collector( + coll_address, + constraint="!isUndefined(LastRemoteHost)", + projection=[ + "GlobalJobId", + "NumJobStarts", + 
"JobLastStartDate", + "JobCurrentStartDate", + "EnteredCurrentStatus", + "JobStatus", + "ExitCode", + "LastRemoteHost", + ] + + ["Request" + resource for resource in RESOURCES], + history=True, + ) + success = es_import(gen) + except htcondor.HTCondorIOError as e: + failed = e + logging.error('Condor error', exc_info=True) + +if __name__ == '__main__': + parser = ArgumentParser("usage: %prog [options] collector_addresses") + parser.add_argument("-a", "--address", help="elasticsearch address") + parser.add_argument( + "-i", + "--index", + default="condor_status", + help="index name (default condor_status)", + ) + parser.add_argument( + "--after", default=timedelta(hours=1), help="time to look back", type=parse_time + ) + parser.add_argument( + "-y", + "--dry-run", + default=False, + action="store_true", + help="query status, but do not ingest into ES", + ) + parser.add_argument( + "-v", + "--verbose", + default=False, + action="store_true", + help="use verbose logging in ES", + ) + parser.add_argument('--client_id',help='oauth2 client id',default=None) + parser.add_argument('--client_secret',help='oauth2 client secret',default=None) + parser.add_argument('--token_url',help='oauth2 realm token url',default=None) + parser.add_argument('--token',help='oauth2 token',default=None) + parser.add_argument("collectors", nargs="+") + options = parser.parse_args() + + Dry._dryrun = options.dry_run + + logging.basicConfig( + level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s : %(message)s" + ) + if options.verbose: + logging.getLogger("elasticsearch").setLevel("DEBUG") + + main(options) \ No newline at end of file diff --git a/summarize_glidein_resources.py b/summarize_glidein_resources.py index f348b5c..c3b03b4 100755 --- a/summarize_glidein_resources.py +++ b/summarize_glidein_resources.py @@ -13,111 +13,14 @@ import logging import time from urllib.parse import urlparse, urlunparse - - -def parse_timedelta(time_str): - parts = re.match( - 
r"((?P(\d+?\.?\d*))d)?((?P(\d+?\.?\d*))h)?((?P(\d+?\.?\d*))m)?((?P(\d+?\.?\d*))s)?", - time_str, - ) - if not parts: - raise ValueError - parts = parts.groupdict() - if not any([v is not None for v in list(parts.values())]): - raise ValueError - time_params = {} - for (name, param) in parts.items(): - if param: - time_params[name] = float(param) - return datetime.timedelta(**time_params) - - -def get_datetime(value): - try: - return datetime.datetime.utcnow() - parse_timedelta(value) - except ValueError: - return dateutil.parser.parse(value) - - -def snap_to_interval(dt, interval): - ts = time.mktime(dt.timetuple()) - ts = ts - (ts % int(interval.total_seconds())) - return datetime.datetime.utcfromtimestamp(ts) - - -def parse_index(url_str): - url = urlparse(url_str) - return { - "host": urlunparse(url._replace(path="", params="", query="", fragment="")), - "index": url.path[1:], - } - - -parser = ArgumentParser( - description=__doc__, formatter_class=ArgumentDefaultsHelpFormatter -) -parser.add_argument( - "--after", default="2d", help="maximum time to look back", type=get_datetime, -) -parser.add_argument( - "--before", default="0d", help="minimum time to look back", type=get_datetime, -) -parser.add_argument( - "--interval", default="20m", help="aggregation interval", type=parse_timedelta, -) -parser.add_argument( - "-y", - "--dry-run", - default=False, - action="store_true", - help="query status, but do not ingest into ES", -) -parser.add_argument( - "-v", - "--verbose", - default=False, - action="store_true", - help="use verbose logging in ES", -) -parser.add_argument( - "-i", - "--input-index", - type=parse_index, - default="http://elk-1.icecube.wisc.edu:9200/condor_status", -) -parser.add_argument( - "-o", - "--output-index", - type=parse_index, - default="http://elk-1.icecube.wisc.edu:9200/glidein_resources", -) - -options = parser.parse_args() - -logging.basicConfig( - level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s : %(message)s" -) -if 
options.verbose: - logging.getLogger("elasticsearch").setLevel("DEBUG") - -# round time range to nearest interval -after = snap_to_interval(options.after, options.interval) -# ...only if last bin is far enough in the past to be complete -if datetime.datetime.utcnow() - options.before > options.interval: - before = snap_to_interval(options.before, options.interval) -else: - before = options.before - -if not before > after: - parser.error("--before must be > --after") +from rest_tools.client import ClientCredentialsAuth # note different capitalization conventions for GPU and Cpu RESOURCES = ("GPUs", "Cpus", "Memory", "Disk") STATUSES = ("evicted", "removed", "finished", "failed") -# Accumulate offered and claimed resources in time bins, weighting by the -# fraction of each bin that intersects the glidein lifetime -summarize_resources = edsl.A( +def summarize_resources(interval): + return edsl.A( "scripted_metric", init_script=""" state.interval = (Long)(params.interval); @@ -179,12 +82,48 @@ def parse_index(url_str): params={ "left": "DaemonStartTime", "right": "LastHeardFrom", - "interval": int(options.interval.total_seconds() * 1000), + "interval": interval, "RESOURCES": RESOURCES, "STATUSES": STATUSES + ("total",), }, ) +def parse_timedelta(time_str): + parts = re.match( + r"((?P(\d+?\.?\d*))d)?((?P(\d+?\.?\d*))h)?((?P(\d+?\.?\d*))m)?((?P(\d+?\.?\d*))s)?", + time_str, + ) + if not parts: + raise ValueError + parts = parts.groupdict() + if not any([v is not None for v in list(parts.values())]): + raise ValueError + time_params = {} + for (name, param) in parts.items(): + if param: + time_params[name] = float(param) + return datetime.timedelta(**time_params) + + +def get_datetime(value): + try: + return datetime.datetime.now(datetime.timezone.utc) - parse_timedelta(value) + except ValueError: + return dateutil.parser.parse(value) + + +def snap_to_interval(dt, interval): + ts = time.mktime(dt.timetuple()) + ts = ts - (ts % int(interval.total_seconds())) + return 
datetime.datetime.fromtimestamp(ts,tz=datetime.timezone.utc) + + +def parse_index(url_str): + url = urlparse(url_str) + return { + "host": urlunparse(url._replace(path="", params="", query="", fragment="")), + "index": url.path[1:], + } def scan_aggs(search, source_aggs, inner_aggs={}, size=10): """ @@ -211,7 +150,8 @@ def run_search(**kwargs): response = run_search(after=after) -def resource_summaries(host, index, after, before, interval): +def resource_summaries(client, index, after, before, interval): + parsed_interval = parse_timedelta(interval) by_site = [ {k: edsl.A("terms", field=k + ".keyword")} for k in ("site", "country", "institution", "resource") @@ -229,14 +169,14 @@ def resource_summaries(host, index, after, before, interval): by_timestamp = edsl.A( "date_histogram", field="@timestamp", - interval=int(interval.total_seconds() * 1000), + fixed_interval=interval, ) - by_timestamp.bucket("resources", summarize_resources) + by_timestamp.bucket("resources", summarize_resources(int(parsed_interval.total_seconds() * 1000))) buckets = scan_aggs( ( edsl.Search() - .using(elasticsearch.Elasticsearch(host)) + .using(client) .index(index) .filter("range", **{"@timestamp": {"gte": after, "lt": before}}) ), @@ -251,7 +191,7 @@ def resource_summaries(host, index, after, before, interval): # date_histogram buckets, and the corresponding ticket has been # open for years: # https://github.com/elastic/elasticsearch/issues/23874 - timestamp = datetime.datetime.utcfromtimestamp(bucket.key / 1000) + timestamp = datetime.datetime.fromtimestamp(bucket.key / 1000,tz=datetime.timezone.utc) if timestamp >= after and timestamp < before and bucket.doc_count > 0: data = bucket.resources.value.to_dict() data["count"] = bucket.doc_count @@ -259,42 +199,112 @@ def resource_summaries(host, index, after, before, interval): data["_keys"]["timestamp"] = timestamp.strftime("%Y-%m-%dT%H:%M:%S") yield data - -buckets = resource_summaries( - options.input_index["host"], - 
options.input_index["index"], - after, - before, - options.interval, -) - - -def make_insert( - generator, - index=options.output_index["index"], - id_keys=["timestamp", "resource", "site", "slot_type"], -): +def make_insert(generator, index, id_keys=["timestamp", "resource", "site", "slot_type"]): for entry in generator: data = dict(entry) data["_index"] = index - data["_type"] = "resource_summary" key = data.pop("_keys") data["_id"] = ".".join([key[k] for k in id_keys]) data.update(key) yield data +if __name__ == '__main__': + parser = ArgumentParser( + description=__doc__, formatter_class=ArgumentDefaultsHelpFormatter + ) + parser.add_argument( + "--after", default="2d", help="maximum time to look back", type=get_datetime, + ) + parser.add_argument( + "--before", default="0d", help="minimum time to look back", type=get_datetime, + ) + parser.add_argument( + "--interval", default="20m", help="aggregation interval", + ) + parser.add_argument( + "-y", + "--dry-run", + default=False, + action="store_true", + help="query status, but do not ingest into ES", + ) + parser.add_argument( + "-v", + "--verbose", + default=False, + action="store_true", + help="use verbose logging in ES", + ) + parser.add_argument( + "-i", + "--input-index", + ) + parser.add_argument( + "-o", + "--output-index", + ) + parser.add_argument("-a", "--address", help="elasticsearch address") + parser.add_argument('--client_id',help='oauth2 client id',default=None) + parser.add_argument('--client_secret',help='oauth2 client secret',default=None) + parser.add_argument('--token_url',help='oauth2 realm token url',default=None) -if options.dry_run: - import json - import sys - - for bucket in make_insert(buckets): - json.dump(bucket, sys.stdout) - sys.stdout.write("\n") -else: - es = elasticsearch.Elasticsearch(hosts=options.output_index["host"], timeout=5000) - index = options.output_index["index"] + options = parser.parse_args() - success, _ = elasticsearch.helpers.bulk( - es, make_insert(buckets), 
max_retries=20, initial_backoff=2, max_backoff=3600, + logging.basicConfig( + level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s : %(message)s" + ) + if options.verbose: + logging.getLogger("elasticsearch").setLevel("DEBUG") + + # round time range to nearest interval + interval_delta = parse_timedelta(options.interval) + after = snap_to_interval(options.after, interval_delta) + # ...only if last bin is far enough in the past to be complete + if datetime.datetime.now(datetime.timezone.utc) - options.before > interval_delta: + before = snap_to_interval(options.before, interval_delta) + else: + before = options.before + + if not before > after: + parser.error("--before must be > --after") + + prefix = "http" + address = options.address + if "://" in address: + prefix, address = address.split("://") + + url = f"{prefix}://{address}" + + if None not in (options.token_url, options.client_secret, options.client_id): + api = ClientCredentialsAuth(address='https://elastic.icecube.aq', + token_url=options.token_url, + client_secret=options.client_secret, + client_id=options.client_id) + else: + parser.error("--client_id, --client_secret, and --token_url are required") + token = api.make_access_token() + + es = elasticsearch.Elasticsearch(url, + request_timeout=5000, + bearer_auth=token, + sniff_on_node_failure=True) + + buckets = resource_summaries( + es, + options.input_index, + after, + before, + options.interval, ) + + if options.dry_run: + import json + import sys + + for bucket in make_insert(buckets, options.output_index): + json.dump(bucket, sys.stdout) + sys.stdout.write("\n") + else: + success, _ = elasticsearch.helpers.bulk( + es, make_insert(buckets, options.output_index), max_retries=20, initial_backoff=2, max_backoff=3600, + )