34 changes: 22 additions & 12 deletions condor_history_to_es.py
@@ -12,20 +12,22 @@
from rest_tools.client import ClientCredentialsAuth

parser = ArgumentParser('usage: %prog [options] history_files')
parser.add_argument('-a','--address',help='elasticsearch address')
parser.add_argument('-n','--indexname',default='condor',
help='index name (default condor)')
parser.add_argument('-a','--address', help='elasticsearch address')
parser.add_argument('-n','--indexname', default='condor',
help='index name (default condor)')
parser.add_argument('--dailyindex', default=False, action='store_true',
help='Index pattern daily')
help='Index pattern daily')
parser.add_argument("-y", "--dry-run", default=False,
action="store_true",
help="query jobs, but do not ingest into ES",)
action="store_true",
help="query jobs, but do not ingest into ES",)
parser.add_argument('--collectors', default=False, action='store_true',
help='Args are collector addresses, not files')
parser.add_argument('--client_id',help='oauth2 client id',default=None)
parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
parser.add_argument('--token',help='oauth2 token',default=None)
help='Args are collector addresses, not files')
parser.add_argument('--access_points', default=None,
help="Comma separated list of APs to query; e.g. --access_points submit-1,submit2")
parser.add_argument('--client_id', help='oauth2 client id', default=None)
parser.add_argument('--client_secret', help='oauth2 client secret', default=None)
parser.add_argument('--token_url', help='oauth2 realm token url', default=None)
parser.add_argument('--token', help='oauth2 token', default=None)
parser.add_argument("positionals", nargs='+')

options = parser.parse_args()
@@ -86,7 +88,15 @@ def es_import(document_generator):
return success

failed = False
if options.collectors:
if options.access_points and options.collectors:
for coll_address in options.positionals:
try:
gen = es_generator(read_from_collector(coll_address, options.access_points, history=True))
success = es_import(gen)
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
elif options.collectors:
for coll_address in options.positionals:
try:
gen = es_generator(read_from_collector(coll_address, history=True))
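A minimal sketch of how the new --access_points value is expected to reach read_from_collector() when --collectors is set. The collector address and access point names are placeholders, and the import assumes read_from_collector comes from condor_utils as in the scripts in this PR.

from condor_utils import read_from_collector

collector_address = 'collector.example.edu'   # hypothetical collector address
access_points = 'submit-1,submit-2'           # comma-separated --access_points value

# history=True mirrors the condor_history_to_es.py call path; each yielded
# item is one job ad dict pulled from the named schedds only.
for job_ad in read_from_collector(collector_address, access_points, history=True):
    print(job_ad.get('GlobalJobId'))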
74 changes: 43 additions & 31 deletions condor_history_to_prometheus.py
Expand Up @@ -9,6 +9,7 @@
import prometheus_client
from datetime import datetime
from collections import defaultdict
from socket import gethostbyname

utc_format = '%Y-%m-%dT%H:%M:%S'

@@ -19,32 +20,39 @@ def generate_ads(entries):

def last_jobs_dict(collector):
last_job = defaultdict(dict)

for collector in args:
schedd_ads = locate_schedds(collector)
if schedd_ads is None:
return None

for s in schedd_ads:
last_job[s.get('Name')] = {'ClusterId': None, 'EnteredCurrentStatus': None}

return last_job


def locate_schedds(collector):
try:
coll = htcondor.Collector(collector)
return coll.locateAll(htcondor.DaemonTypes.Schedd)
except htcondor.HTCondorIOError as e:
failed = e
logging.error(f'Condor error: {e}')

def locate_schedds(collector, access_points):
    coll = htcondor.Collector(collector)
    schedds = []
    if access_points:
        try:
            # access_points is the comma-separated --access_points string
            for ap in access_points.split(','):
                schedds.append(coll.locate(htcondor.DaemonTypes.Schedd, ap))
        except htcondor.HTCondorIOError as e:
            logging.error(f'Condor error: {e}')
    else:
        try:
            # locateAll() already returns a list of location ads
            schedds = coll.locateAll(htcondor.DaemonTypes.Schedd)
        except htcondor.HTCondorIOError as e:
            logging.error(f'Condor error: {e}')
    return schedds

def compose_ad_metrics(ad, metrics):
''' Parse condor job classad and update metrics

Args:
ad (classad): an HTCondor job classad
metrics (JobMetrics): JobMetrics object
metrics (JobMetrics): JobMetrics object
'''
# ignore this ad if walltimehrs is negative or a dagman
if ad['walltimehrs'] < 0 or ad['Cmd'] == '/usr/bin/condor_dagman':
@@ -64,7 +72,7 @@ def compose_ad_metrics(ad, metrics):
labels['site'] = ad['site']
labels['schedd'] = ad['GlobalJobId'][0:ad['GlobalJobId'].find('#')]
labels['GPUDeviceName'] = None

if ad['ExitCode'] == 0 and ad['ExitBySignal'] is False and ad['JobStatus'] == 4:
labels['usage'] = 'goodput'
else:
@@ -83,7 +91,7 @@ def compose_ad_metrics(ad, metrics):
resource_hrs = ad['cpuhrs']
resource_request = ad['RequestCpus']

try:
try:
labels['IceProdDataset'] = ad['IceProdDataset']
labels['IceProdTaskName'] = ad['IceProdTaskName']
except:
@@ -100,39 +108,27 @@ def compose_ad_metrics(ad, metrics):
metrics.condor_job_mem_req.labels(**labels).observe(ad['RequestMemory']/1024)
metrics.condor_job_mem_used.labels(**labels).observe(ad['ResidentSetSize_RAW']/1048576)

def query_collector(collector, metrics, last_job):
def query_collector(collector, access_points, metrics, last_job):
"""Query schedds for job ads

Args:
collector (str): address for a collector to query
metrics (JobMetrics): JobMetrics instance
last_job (dict): dictionary for tracking last ClusterId by schedd
"""
for schedd_ad in locate_schedds(collector):
for schedd_ad in locate_schedds(collector, access_points):
name = schedd_ad.get('Name')

ads = read_from_schedd(schedd_ad, history=True, since=last_job[name]['ClusterId'])
if last_job[name]['EnteredCurrentStatus'] is not None:
logging.info(f'{name} - read ads since {last_job[name]["ClusterId"]}:{last_job[name]["EnteredCurrentStatus"]} at timestamp {datetime.strptime(last_job[name]["EnteredCurrentStatus"],utc_format)}')

for ad in generate_ads(ads):
if last_job[name]['ClusterId'] is None:
last_job[name]['ClusterId'] = int(ad['ClusterId'])
last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']

if datetime.strptime(ad['EnteredCurrentStatus'],utc_format) > datetime.strptime(last_job[name]['EnteredCurrentStatus'],utc_format):
last_job[name]['ClusterId'] = int(ad['ClusterId'])
last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']

compose_ad_metrics(ad, metrics)
iterate_ads(ads, name, metrics, last_job)

def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],match=10000,since=None):
"""Connect to schedd and pull ads directly.

A generator that yields condor job dicts.

Args:
schedd (ClassAd): location ad of a schedd, from either htcondor.Collector locate() or locateAll()
schedd (ClassAd): location ad of a schedd, from either htcondor.Collector locate() or locateAll()
history (bool): read history (True) or active queue (default: False)
constraint (string): string representation of a classad expression
match (int): number of job ads to return
@@ -158,6 +154,21 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
except Exception:
logging.info('%s failed', schedd_ad['Name'], exc_info=True)

def iterate_ads(ads, name, metrics, last_job):
if last_job[name]['EnteredCurrentStatus'] is not None:
logging.info(f'{name} - read ads since {last_job[name]["ClusterId"]}:{last_job[name]["EnteredCurrentStatus"]} at timestamp {datetime.strptime(last_job[name]["EnteredCurrentStatus"],utc_format)}')

for ad in generate_ads(ads):
if last_job[name]['ClusterId'] is None:
last_job[name]['ClusterId'] = int(ad['ClusterId'])
last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']

if datetime.strptime(ad['EnteredCurrentStatus'],utc_format) > datetime.strptime(last_job[name]['EnteredCurrentStatus'],utc_format):
last_job[name]['ClusterId'] = int(ad['ClusterId'])
last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']

compose_ad_metrics(ad, metrics)

if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')

@@ -168,6 +179,7 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
# TODO: Add file tail function for condor history files
#parser.add_option('-f','--histfile',
# help='history file to read from')
parser.add_option('-a','--access_points', default=None, help='comma separated list of APs to query')
parser.add_option('-p','--port', default=9100,
action='store', type='int',
help='port number for prometheus exporter')
@@ -196,10 +208,10 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
while True:
start = time.time()
for collector in args:
query_collector(collector, metrics, last_job)
query_collector(collector, options.access_points, metrics, last_job)

delta = time.time() - start
# sleep for interval minus scrape duration
# if scrape duration was longer than interval, run right away
if delta < options.interval:
time.sleep(options.interval - delta)
time.sleep(options.interval - delta)
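A self-contained sketch of the per-schedd watermark that iterate_ads() keeps: the ClusterId and EnteredCurrentStatus of the newest ad seen are remembered so the next poll can resume from that point. The ads and timestamps below are illustrative only.

from datetime import datetime

utc_format = '%Y-%m-%dT%H:%M:%S'
last_job = {'ClusterId': None, 'EnteredCurrentStatus': None}

ads = [
    {'ClusterId': 101, 'EnteredCurrentStatus': '2024-01-01T10:00:00'},
    {'ClusterId': 205, 'EnteredCurrentStatus': '2024-01-01T12:30:00'},
]

for ad in ads:
    newer = last_job['ClusterId'] is None or (
        datetime.strptime(ad['EnteredCurrentStatus'], utc_format)
        > datetime.strptime(last_job['EnteredCurrentStatus'], utc_format))
    if newer:
        last_job['ClusterId'] = int(ad['ClusterId'])
        last_job['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']

print(last_job)   # {'ClusterId': 205, 'EnteredCurrentStatus': '2024-01-01T12:30:00'}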
14 changes: 11 additions & 3 deletions condor_queue_to_es.py
Expand Up @@ -19,6 +19,8 @@
help="query jobs, but do not ingest into ES",)
parser.add_argument('--collectors', default=False, action='store_true',
help='Args are collector addresses, not files')
parser.add_argument('--access_points', default=None,
help="Comma separated list of APs to query; e.g. --access_points submit-1,submit2")
parser.add_argument('--client_id',help='oauth2 client id',default=None)
parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
@@ -61,8 +63,6 @@ def es_generator(entries):
prefix = 'http'
address = options.address



if '://' in address:
prefix,address = address.split('://')

@@ -84,7 +84,15 @@ def es_generator(entries):
es_import = partial(bulk, es, max_retries=20, initial_backoff=10, max_backoff=3600)

failed = False
if options.collectors:
if options.access_points and options.collectors:
for coll_address in options.positionals:
try:
gen = es_generator(read_from_collector(coll_address, options.access_points))
success, _ = es_import(gen)
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
elif options.collectors:
for coll_address in options.positionals:
try:
gen = es_generator(read_from_collector(coll_address))
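A hedged sketch of the branch added above: --access_points only narrows the query when --collectors is also given, otherwise the positional arguments keep their original meaning. The argument values below are illustrative.

from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('--collectors', default=False, action='store_true')
parser.add_argument('--access_points', default=None)
parser.add_argument('positionals', nargs='+')

options = parser.parse_args(
    ['--collectors', '--access_points', 'submit-1,submit-2', 'collector.example.edu'])

if options.access_points and options.collectors:
    print('query the named APs on each collector:', options.access_points)
elif options.collectors:
    print('query every schedd known to each collector')
else:
    print('treat positionals as files:', options.positionals)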
37 changes: 22 additions & 15 deletions condor_queue_to_prometheus.py
@@ -75,7 +75,7 @@ def compose_ad_metrics(ads):

parser.add_option('-c','--collectors',default=False, action='store_true',
help='read history from')

parser.add_option('-a','--access_points', default=None, help='comma separated list of APs to query')
parser.add_option('-p','--port', default=9100,
action='store', type='int',
help='port number for prometheus exporter')
@@ -94,27 +94,34 @@ def compose_ad_metrics(ads):

prometheus_client.start_http_server(options.port)

if options.collectors:
while True:
gens = []
start = time.time()
while True:
gens = []
start = time.time()
if options.access_points and options.collectors:
for coll_address in args:
try:
gens.append(read_from_collector(coll_address, options.access_points))
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
elif options.collectors:
for coll_address in args:
try:
gens.append(read_from_collector(coll_address))
except htcondor.HTCondorIOError as e:
failed = e
logging.error('Condor error', exc_info=True)
gen = chain(*gens)
metrics.clear()
gen = chain(*gens)
metrics.clear()

start_compose_metrics = time.perf_counter()
compose_ad_metrics(generate_ads(gen))
end_compose_metrics = time.perf_counter()
start_compose_metrics = time.perf_counter()
compose_ad_metrics(generate_ads(gen))
end_compose_metrics = time.perf_counter()

compose_diff = end_compose_metrics - start_compose_metrics
logging.info(f'Took {compose_diff} seconds to compose metrics')
compose_diff = end_compose_metrics - start_compose_metrics
logging.info(f'Took {compose_diff} seconds to compose metrics')

delta = time.time() - start
delta = time.time() - start

if delta < options.interval:
time.sleep(options.interval - delta)
if delta < options.interval:
time.sleep(options.interval - delta)
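A minimal sketch of the generator-chaining pattern in the loop above: each collector contributes a lazy stream of ads, and itertools.chain exposes them as one iterable so the metrics pass runs once over everything. fake_collector_stream is a stand-in for read_from_collector and is not part of this PR.

from itertools import chain

def fake_collector_stream(name, n):
    # stand-in for read_from_collector(); yields toy "ads"
    for i in range(n):
        yield {'collector': name, 'ClusterId': i}

gens = [fake_collector_stream('collector-1', 2), fake_collector_stream('collector-2', 3)]
for ad in chain(*gens):
    print(ad)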
8 changes: 4 additions & 4 deletions condor_status_to_es.py
@@ -35,7 +35,7 @@
class Dry:
"""Helper class for debugging"""
_dryrun = False

def __init__(self, func):
self.func = func

@@ -44,7 +44,7 @@ def __call__(self, *args, **kwargs):
logging.info(self.func.__name__)
logging.info(args)
logging.info(kwargs)

else:
return self.func(*args,**kwargs)

@@ -296,7 +296,7 @@ def key = status+"."+resource;
"--after", default=timedelta(hours=1), help="time to look back", type=parse_time
)
parser.add_argument(
"-y",
"-d",
"--dry-run",
default=False,
action="store_true",
@@ -353,7 +353,7 @@ def key = status+"."+resource;

url = "{}://{}".format(prefix, address)
logging.info("connecting to ES at %s", url)
es = Elasticsearch(hosts=[url],
es = Elasticsearch(hosts=[url],
timeout=5000,
bearer_auth=token,
sniff_on_node_failure=True)
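A self-contained sketch of the Dry helper shown above. The _dryrun guard in __call__ is assumed from context (the condition itself sits outside the hunk), and ingest() is a hypothetical wrapped function used only for the demonstration.

import logging

logging.basicConfig(level=logging.INFO)

class Dry:
    """Helper class for debugging"""
    _dryrun = False

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        if self._dryrun:   # assumed guard; the condition is not shown in the diff
            logging.info(self.func.__name__)
            logging.info(args)
            logging.info(kwargs)
        else:
            return self.func(*args, **kwargs)

@Dry
def ingest(docs):
    return len(docs)

Dry._dryrun = True          # e.g. set when --dry-run / -d is passed
ingest(['doc-1', 'doc-2'])  # only logs the intended call; nothing is ingested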
10 changes: 8 additions & 2 deletions condor_utils.py
@@ -761,7 +761,7 @@ def read_from_file(filename):
else:
entry += line+'\n'

def read_from_collector(address, history=False, constraint='true', projection=[],match=10000):
def read_from_collector(address, access_points=None, history=False, constraint='true', projection=[], match=10000):
"""Connect to condor collectors and schedds to pull job ads directly.

A generator that yields condor job dicts.
@@ -772,7 +772,12 @@ def read_from_collector(address, history=False, constraint='true', projection=[]
"""
import htcondor
coll = htcondor.Collector(address)
schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd)
schedd_ads = []
if access_points:
for ap in access_points.split(','):
schedd_ads.append(coll.locate(htcondor.DaemonTypes.Schedd, ap))
else:
schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd)
for schedd_ad in schedd_ads:
logging.info('getting job ads from %s', schedd_ad['Name'])
schedd = htcondor.Schedd(schedd_ad)
@@ -790,6 +795,7 @@ def read_from_collector(address, history=False, constraint='true', projection=[]
except Exception:
logging.info('%s failed', schedd_ad['Name'], exc_info=True)


def read_status_from_collector(address, after=datetime.now()-timedelta(hours=1)):
"""Connect to condor collectors and schedds to pull job ads directly.

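A hedged restatement of the access_points handling added to read_from_collector(): a comma-separated string turns into one targeted locate() call per access point, while None keeps the old locateAll() behaviour. The collector address and AP names are placeholders.

import htcondor

def locate_schedd_ads(address, access_points=None):
    coll = htcondor.Collector(address)
    if access_points:
        # one targeted lookup per named schedd (access point)
        return [coll.locate(htcondor.DaemonTypes.Schedd, ap)
                for ap in access_points.split(',')]
    # fall back to every schedd the collector knows about
    return coll.locateAll(htcondor.DaemonTypes.Schedd)

# e.g. locate_schedd_ads('collector.example.edu', 'submit-1,submit-2')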