23 changes: 16 additions & 7 deletions Dockerfile
@@ -1,15 +1,24 @@
-FROM almalinux:8
+FROM almalinux:8.9
+
+ARG CLIENT_ID
+ARG CLIENT_SECRET
+ARG TOKEN_URL
+
+ENV CLIENT_ID=${CLIENT_ID}
+ENV CLIENT_SECRET=${CLIENT_SECRET}
+ENV TOKEN_URL=${TOKEN_URL}

 RUN dnf -y install epel-release && \
-    yum install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el8-release-latest.rpm && \
-    yum install -y osg-ca-certs && \
-    dnf install -y python38 python38-pip && \
+    dnf install -y https://repo.opensciencegrid.org/osg/23-main/osg-23-main-el8-release-latest.rpm && \
+    dnf install -y osg-ca-certs && \
+    dnf install -y python3.11 python3.11-pip wget tar && \
     dnf clean all && yum clean all && \
-    ln -s /usr/bin/python3.8 /usr/bin/python && \
-    pip3.8 install --no-cache-dir 'elasticsearch>=6.0.0,<7.0.0' 'elasticsearch-dsl>=6.0.0,<7.0.0' htcondor requests prometheus_client
+    ln -s /usr/bin/python3.11 /usr/bin/python && \
+    wget https://github.com/WIPACrepo/rest-tools/archive/refs/tags/v1.8.2.tar.gz && \
+    pip3.11 install --no-cache-dir elasticsearch htcondor requests prometheus_client setuptools ./v1.8.2.tar.gz

 COPY . /monitoring

 WORKDIR /monitoring

 ENV CONDOR_CONFIG=/monitoring/condor_config
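For context, a build would supply the new args roughly as follows; a minimal sketch, assuming a local checkout, with the image tag, client id, and token URL as placeholders rather than values from this PR:

# hypothetical build command; CLIENT_ID, CLIENT_SECRET, and TOKEN_URL values are placeholders
docker build \
    --build-arg CLIENT_ID=condor-monitoring \
    --build-arg CLIENT_SECRET="$CLIENT_SECRET" \
    --build-arg TOKEN_URL=https://keycloak.example.org/realms/IceCube/protocol/openid-connect/token \
    -t condor-monitoring:latest .

Note that baking the secret in via ENV persists it in the image config and layers; the scripts below also accept the same values as command-line flags, which avoids that.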
46 changes: 31 additions & 15 deletions condor_history_to_es.py
@@ -6,23 +6,29 @@

 import os
 import glob
-from optparse import OptionParser
+from argparse import ArgumentParser
 import logging
 from functools import partial
+from rest_tools.client import ClientCredentialsAuth

-parser = OptionParser('usage: %prog [options] history_files')
-parser.add_option('-a','--address',help='elasticsearch address')
-parser.add_option('-n','--indexname',default='condor',
+parser = ArgumentParser(usage='%(prog)s [options] history_files')
+parser.add_argument('-a','--address',help='elasticsearch address')
+parser.add_argument('-n','--indexname',default='condor',
                   help='index name (default condor)')
-parser.add_option('--dailyindex', default=False, action='store_true',
+parser.add_argument('--dailyindex', default=False, action='store_true',
                   help='Index pattern daily')
-parser.add_option("-y", "--dry-run", default=False,
+parser.add_argument("-y", "--dry-run", default=False,
                   action="store_true",
                   help="query jobs, but do not ingest into ES",)
-parser.add_option('--collectors', default=False, action='store_true',
+parser.add_argument('--collectors', default=False, action='store_true',
                   help='Args are collector addresses, not files')
-(options, args) = parser.parse_args()
-if not args:
+parser.add_argument('--client_id',help='oauth2 client id',default=None)
+parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
+parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
+parser.add_argument("positionals", nargs='+')
+
+options = parser.parse_args()
+if not options.positionals:
     parser.error('no condor history files or collectors')

 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
@@ -36,7 +42,6 @@ def es_generator(entries):
         data['_index'] = options.indexname
         if options.dailyindex:
             data['_index'] += '-'+(data['date'].split('T')[0].replace('-','.'))
-        data['_type'] = 'job_ad'
         data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-')
         if not data['_id']:
             continue
@@ -50,10 +55,20 @@
 if '://' in address:
     prefix,address = address.split('://')

+token = None
+
+if None not in (options.token_url, options.client_secret, options.client_id):
+    api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq',
+                                token_url=options.token_url,
+                                client_secret=options.client_secret,
+                                client_id=options.client_id)
+    token = api.make_access_token()
+
 url = '{}://{}'.format(prefix, address)
 logging.info('connecting to ES at %s',url)
 es = Elasticsearch(hosts=[url],
-                   timeout=5000)
+                   request_timeout=5000,
+                   bearer_auth=token)

 def es_import(document_generator):
     if options.dry_run:
@@ -63,21 +78,22 @@ def es_import(document_generator):
             json.dump(hit, sys.stdout)
         success = True
     else:
-        success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=3600)
+        success, _ = bulk(es, document_generator, max_retries=20, initial_backoff=2, max_backoff=360)
     return success

 failed = False
 if options.collectors:
-    for coll_address in args:
+    for coll_address in options.positionals:
         try:
             gen = es_generator(read_from_collector(coll_address, history=True))
             success = es_import(gen)
         except htcondor.HTCondorIOError as e:
             failed = e
             logging.error('Condor error', exc_info=True)
 else:
-    for path in args:
-        for filename in glob.iglob(path):
+    print(options.positionals)
+    for path in options.positionals:
+        for filename in glob.glob(path):
             gen = es_generator(read_from_file(filename))
             success = es_import(gen)
             logging.info('finished processing %s', filename)
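A sketch of invoking the updated script in collector mode with the new oauth2 flags; the collector hostname, client id, and token URL are placeholders, while the ES address matches the one hard-coded in the ClientCredentialsAuth call above. If the three flags are omitted, token stays None and the client connects without bearer auth, as before.

# hypothetical invocation; hostname and credential values are placeholders
python condor_history_to_es.py \
    --address https://elasticsearch.icecube.aq \
    --collectors \
    --client_id condor-monitoring \
    --client_secret "$CLIENT_SECRET" \
    --token_url https://keycloak.example.org/realms/IceCube/protocol/openid-connect/token \
    collector.example.org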
50 changes: 38 additions & 12 deletions condor_queue_to_es.py
@@ -3,20 +3,32 @@

 import os
 import glob
-from optparse import OptionParser
+from argparse import ArgumentParser
 import logging
 from functools import partial
+from rest_tools.client import ClientCredentialsAuth

-parser = OptionParser('usage: %prog [options] history_files')
-parser.add_option('-a','--address',help='elasticsearch address')
-parser.add_option('-n','--indexname',default='job_queue',
-                  help='index name (default job_queue)')
-parser.add_option('--collectors', default=False, action='store_true',
+parser = ArgumentParser(usage='%(prog)s [options] history_files')
+parser.add_argument('-a','--address',help='elasticsearch address')
+parser.add_argument('-n','--indexname',default='condor',
+                  help='index name (default condor)')
+parser.add_argument('--dailyindex', default=False, action='store_true',
+                  help='Index pattern daily')
+parser.add_argument("-y", "--dry-run", default=False,
+                  action="store_true",
+                  help="query jobs, but do not ingest into ES",)
+parser.add_argument('--collectors', default=False, action='store_true',
                   help='Args are collector addresses, not files')
-(options, args) = parser.parse_args()
-if not args:
+parser.add_argument('--client_id',help='oauth2 client id',default=None)
+parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
+parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
+parser.add_argument("positionals", nargs='+')
+
+options = parser.parse_args()
+if not options.positionals:
     parser.error('no condor history files or collectors')

+
 logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')

 import htcondor
@@ -39,35 +51,49 @@ def es_generator(entries):
         add_classads(data)
         data = {k:data[k] for k in keys if k in data} # do filtering
         data['_index'] = options.indexname
-        data['_type'] = 'job_ad'
         data['_id'] = data['GlobalJobId'].replace('#','-').replace('.','-') + data['@timestamp']
         yield data

 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk
+from rest_tools.client import ClientCredentialsAuth

 prefix = 'http'
 address = options.address
+
+
+
 if '://' in address:
     prefix,address = address.split('://')

 url = '{}://{}'.format(prefix, address)
+
+token = None
+
+if None not in (options.token_url, options.client_secret, options.client_id):
+    api = ClientCredentialsAuth(address='https://elasticsearch.icecube.aq',
+                                token_url=options.token_url,
+                                client_secret=options.client_secret,
+                                client_id=options.client_id)
+    token = api.make_access_token()
+
 logging.info('connecting to ES at %s',url)
 es = Elasticsearch(hosts=[url],
-                   timeout=5000)
+                   request_timeout=5000,
+                   bearer_auth=token)
 es_import = partial(bulk, es, max_retries=20, initial_backoff=2, max_backoff=3600)

 failed = False
 if options.collectors:
-    for coll_address in args:
+    for coll_address in options.positionals:
         try:
             gen = es_generator(read_from_collector(coll_address))
             success, _ = es_import(gen)
         except htcondor.HTCondorIOError as e:
             failed = e
             logging.error('Condor error', exc_info=True)
 else:
-    for path in args:
+    for path in options.positionals:
         for filename in glob.iglob(path):
             gen = es_generator(read_from_file(filename))
             success, _ = es_import(gen)
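The same flags apply to the queue script; below is a file-mode sketch with placeholder ES address and path. The quoted glob is expanded by the script itself via glob.iglob, not by the shell, and -n job_queue restores the old default index name now that the default is condor.

# hypothetical invocation; the ES address and path glob are placeholders
python condor_queue_to_es.py -n job_queue -a http://localhost:9200 '/var/log/condor/queue_*'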