Merge pull request #2460 from htcondor/V23_6-HTCONDOR-2076-adstash-epoch-ads

HTCONDOR-2076 Add job epoch history fetching to condor_adstash
jasoncpatton committed May 15, 2024
2 parents 60af207 + 45efa5d commit 4ac47f1
Showing 11 changed files with 201 additions and 12 deletions.
1 change: 1 addition & 0 deletions build/packaging/rpm/condor.spec
@@ -985,6 +985,7 @@ rm -rf %{buildroot}
%_libexecdir/condor/adstash/ad_sources/registry.py
%_libexecdir/condor/adstash/ad_sources/schedd_history.py
%_libexecdir/condor/adstash/ad_sources/startd_history.py
%_libexecdir/condor/adstash/ad_sources/schedd_job_epoch_history.py
%_libexecdir/condor/adstash/interfaces/__init__.py
%_libexecdir/condor/adstash/interfaces/elasticsearch.py
%_libexecdir/condor/adstash/interfaces/opensearch.py
16 changes: 8 additions & 8 deletions docs/admin-manual/cm-configuration.rst
@@ -1905,11 +1905,11 @@ Elasticsearch
:index:`condor_adstash`

HTCondor supports pushing *condor_schedd* and *condor_startd* job
history ClassAds to Elasticsearch (and other targets) via the
and job epoch ClassAds to Elasticsearch (and other targets) via the
:tool:`condor_adstash` tool/daemon.
:tool:`condor_adstash` collects job history ClassAds as specified by its
configuration, either querying specified daemons' histories
or reading job history ClassAds from a specified file,
:tool:`condor_adstash` collects job ClassAds as specified by its
configuration, either querying specified daemons
or reading job ClassAds from a specified file,
converts each ClassAd to a JSON document,
and pushes each doc to the configured Elasticsearch index.
The index is automatically created if it does not exist, and fields
@@ -1929,10 +1929,10 @@ Running :tool:`condor_adstash` as a daemon (i.e. under the watch of the
:tool:`condor_master`) can be enabled by adding
``use feature : adstash``
to your HTCondor configuration.
By default, this configuration will poll all *condor_schedds* that
report to the ``$(CONDOR_HOST)`` *condor_collector* every 20 minutes
and push the contents of the job history ClassAds to an Elasticsearch
instance running on ``localhost`` to an index named
By default, this configuration will poll the job history on all
*condor_schedds* that report to the ``$(CONDOR_HOST)`` *condor_collector*
every 20 minutes and push the contents of the job history ClassAds to an
Elasticsearch instance running on ``localhost`` to an index named
``htcondor-000001``.
Your situation and monitoring needs are likely different!
See the ``condor_config.local.adstash`` example configuration file in
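Putting the defaults described above together with the new knob, enabling epoch-history polling might look like the following configuration fragment (knob names are from this commit; values are illustrative — see the shipped `condor_config.local.adstash` example for the full list):

```
use feature : adstash

# Poll completed-job history and the new job epoch history
ADSTASH_SCHEDD_HISTORY = True
ADSTASH_SCHEDD_JOB_EPOCH_HISTORY = True
# Read from all schedds reporting to $(CONDOR_HOST)'s collector
ADSTASH_READ_SCHEDDS = *
```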
4 changes: 3 additions & 1 deletion docs/man-pages/condor_adstash.rst
@@ -17,7 +17,7 @@ Synopsis
[**-\-log_file** *PATH*] [**-\-log_level** *LEVEL*]
[**-\-threads** *THREADS*] [**-\-interface** *{null,elasticsearch,jsonfile}*]
[**-\-collectors** *COLLECTORS*] [**-\-schedds** *SCHEDDS*] [**-\-startds** *STARTDS*]
[**-\-schedd_history** ] [**-\-startd_history** ] [**-\-ad_file** *PATH*]
[**-\-schedd_history** ] [**-\-startd_history** ] [**-\-schedd_job_epoch_history** ] [**-\-ad_file** *PATH*]
[**-\-schedd_history_max_ads** *NUM_ADS*] [**-\-startd_history_max_ads** *NUM_ADS*]
[**-\-schedd_history_timeout** *SECONDS*] [**-\-startd_history_timeout** *SECONDS*]
[**-\-se_host** *HOST[:PORT]*] [**-\-se_url_prefix** *PREFIX*]
@@ -85,6 +85,8 @@ ClassAd source options
Poll and push *condor_schedd* job histories
**-\-startd_history**
Poll and push *condor_startd* job histories
**-\-schedd_job_epoch_history**
Poll and push *condor_schedd* job epoch histories
**-\-ad_file** *PATH*
Load Job ClassAds from a file instead of querying daemons (Ignores
*-\-schedd_history* and *-\-startd_history*.)
4 changes: 4 additions & 0 deletions docs/version-history/feature-versions-23-x.rst
@@ -76,6 +76,10 @@ New Features:
now also provides READ authorization.
:jira:`2424`

- Added option to :tool:`condor_adstash` to poll access points' job
epoch histories.
:jira:`2076`

Bugs Fixed:

- Fixed a bug where transfer of Kerberos credentials from the
2 changes: 2 additions & 0 deletions src/condor_examples/condor_config.local.adstash
@@ -10,6 +10,8 @@ use feature: adstash

# Set to true to poll Schedd history
#ADSTASH_SCHEDD_HISTORY = True
# Set to true to poll Schedd job epoch history
#ADSTASH_SCHEDD_JOB_EPOCH_HISTORY = True
# Comma-separated whitelist of Schedds to read from (* for all, globs not supported)
#ADSTASH_READ_SCHEDDS = *
# Set to true to poll Startd history
4 changes: 4 additions & 0 deletions src/condor_scripts/adstash/ad_sources/registry.py
@@ -20,6 +20,9 @@ def schedd_history_source():
def startd_history_source():
from adstash.ad_sources.startd_history import StartdHistorySource
return StartdHistorySource
def schedd_job_epoch_history_source():
from adstash.ad_sources.schedd_job_epoch_history import ScheddJobEpochHistorySource
return ScheddJobEpochHistorySource
def ad_file_source():
from adstash.ad_sources.ad_file import FileAdSource
return FileAdSource
@@ -28,6 +31,7 @@ def ad_file_source():
ADSTASH_AD_SOURCE_REGISTRY = {
"schedd_history": schedd_history_source,
"startd_history": startd_history_source,
"schedd_job_epoch_history": schedd_job_epoch_history_source,
"ad_file": ad_file_source,
}
ADSTASH_AD_SOURCES = list(ADSTASH_AD_SOURCE_REGISTRY.keys())
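The registry above stores zero-argument loader functions rather than classes, so a source module is imported only when that source is actually selected. A minimal, self-contained sketch of this lazy-import registry pattern (stdlib classes stand in for the adstash source classes):

```python
def make_loader(module_name, class_name):
    # Each registry value is a thunk; calling it performs the deferred
    # import and returns the class, like schedd_job_epoch_history_source().
    def loader():
        module = __import__(module_name)
        return getattr(module, class_name)
    return loader

# Illustrative entries: stdlib classes in place of adstash ad sources.
AD_SOURCE_REGISTRY = {
    "decimal_source": make_loader("decimal", "Decimal"),
    "fraction_source": make_loader("fractions", "Fraction"),
}
AD_SOURCES = list(AD_SOURCE_REGISTRY.keys())

# The import cost of "decimal" is never paid unless decimal_source is used.
cls = AD_SOURCE_REGISTRY["fraction_source"]()
value = cls(1, 2)
print(value)  # 1/2
```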
83 changes: 83 additions & 0 deletions src/condor_scripts/adstash/ad_sources/schedd_job_epoch_history.py
@@ -0,0 +1,83 @@
# Copyright 2022 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import logging
import htcondor
import traceback

from adstash.ad_sources.generic import GenericAdSource
from adstash.convert import to_json, unique_doc_id


class ScheddJobEpochHistorySource(GenericAdSource):


def fetch_ads(self, schedd_ad, max_ads=10000):
history_kwargs = {}
if max_ads > 0:
history_kwargs["match"] = max_ads

ckpt = self.checkpoint.get(f"Job Epoch {schedd_ad['Name']}")
if ckpt is None:
logging.warning(f"No job epoch checkpoint found for schedd {schedd_ad['Name']}, getting all ads available.")
else:
since_expr = f"""(ClusterId == {ckpt["ClusterId"]}) && (ProcId == {ckpt["ProcId"]}) && (EnteredCurrentStatus == {ckpt["EnteredCurrentStatus"]})"""
history_kwargs["since"] = since_expr
logging.warning(f"Getting job epoch ads from {schedd_ad['Name']} since {since_expr}.")
schedd = htcondor.Schedd(schedd_ad)
return schedd.jobEpochHistory(constraint=True, projection=[], **history_kwargs)


def process_ads(self, interface, ads, schedd_ad, metadata={}, chunk_size=0, **kwargs):
starttime = time.time()
chunk = []
schedd_checkpoint = None
ads_posted = 0
for ad in ads:
try:
dict_ad = to_json(ad, return_dict=True)
except Exception as e:
message = f"Failure when converting document in {schedd_ad['name']} job epoch history: {str(e)}"
exc = traceback.format_exc()
message += f"\n{exc}"
logging.warning(message)
continue

# Unfortunately, the schedd history is in reverse chronological order,
# therefore the checkpoint should be set to the first ad that is returned.
# Here, we assume that the interface is responsible for de-duping ads
# and only update the checkpoint after the full history queue is pushed
# through by returning the new checkpoint at the end.

if schedd_checkpoint is None: # set checkpoint based on first parseable ad
schedd_checkpoint = {"ClusterId": ad["ClusterId"], "ProcId": ad["ProcId"], "EnteredCurrentStatus": ad["EnteredCurrentStatus"]}
chunk.append((unique_doc_id(dict_ad), dict_ad,))

if (chunk_size > 0) and (len(chunk) >= chunk_size):
logging.debug(f"Posting {len(chunk)} job epoch ads from {schedd_ad['Name']}.")
result = interface.post_ads(chunk, metadata=metadata, **kwargs)
ads_posted += result["success"]
yield None # don't update checkpoint yet, per note above
chunk = []

if len(chunk) > 0:
logging.debug(f"Posting {len(chunk)} job epoch ads from {schedd_ad['Name']}.")
result = interface.post_ads(chunk, metadata=metadata, **kwargs)
ads_posted += result["success"]

endtime = time.time()
logging.warning(f"Schedd {schedd_ad['Name']} job epoch history: response count: {ads_posted}; upload time: {(endtime-starttime)/60:.2f} min")
yield schedd_checkpoint # finally update checkpoint
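`fetch_ads` resumes from a stored checkpoint by handing the schedd a `since` expression, and because the history arrives newest-first, `process_ads` records its new checkpoint from the first parseable ad but only yields it after everything has been posted. A self-contained sketch of both pieces (the helper names and plain dicts are illustrative, not the adstash API):

```python
def build_since_expr(ckpt):
    # Mirrors the constraint built in fetch_ads: history iteration stops
    # at the ad that ended the previous poll.
    return (f"(ClusterId == {ckpt['ClusterId']}) && "
            f"(ProcId == {ckpt['ProcId']}) && "
            f"(EnteredCurrentStatus == {ckpt['EnteredCurrentStatus']})")

def process_ads(ads, chunk_size=2):
    """History is newest-first, so the checkpoint comes from the FIRST ad,
    but it is only yielded after the whole queue has been 'posted'."""
    new_ckpt = None
    chunk = []
    for ad in ads:
        if new_ckpt is None:
            new_ckpt = {k: ad[k] for k in
                        ("ClusterId", "ProcId", "EnteredCurrentStatus")}
        chunk.append(ad)
        if len(chunk) >= chunk_size:
            # pretend interface.post_ads(chunk) happened here
            chunk = []
            yield None  # progress marker: do not advance the checkpoint yet
    # the final partial chunk would be posted here
    yield new_ckpt  # safe to commit now

ads = [  # newest first, as a schedd returns them
    {"ClusterId": 12, "ProcId": 1, "EnteredCurrentStatus": 300},
    {"ClusterId": 12, "ProcId": 0, "EnteredCurrentStatus": 200},
    {"ClusterId": 11, "ProcId": 0, "EnteredCurrentStatus": 100},
]
yields = list(process_ads(ads))
print(build_since_expr(yields[-1]))
```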
74 changes: 74 additions & 0 deletions src/condor_scripts/adstash/adstash.py
@@ -198,6 +198,58 @@ def adstash(args):

logging.warning(f"Processing time for startd history: {(time.time()-startd_starttime)/60:.2f} mins")

if args.read_schedd_job_epoch_history:
schedd_starttime = time.time()

# Get Schedd daemon ads
schedd_ads = []
schedd_ads = get_schedds(args)
logging.warning(f"There are {len(schedd_ads)} schedds to query")

metadata = collect_process_metadata()
metadata["condor_adstash_source"] = "schedd_job_epoch_history"

ad_source = ADSTASH_AD_SOURCE_REGISTRY["schedd_job_epoch_history"]()(checkpoint_file=args.checkpoint_file)

futures = []
manager = multiprocessing.Manager()
checkpoint_queue = manager.Queue()

with multiprocessing.Pool(processes=args.threads, maxtasksperchild=1) as pool:

if len(schedd_ads) > 0:
for schedd_ad in schedd_ads:
name = schedd_ad["Name"]

future = pool.apply_async(
schedd_job_epoch_history_processor,
(ad_source, schedd_ad, checkpoint_queue, interface, metadata, args, src_kwargs),
)
futures.append((name, future))

ckpt_updater = multiprocessing.Process(target=_schedd_ckpt_updater, args=(args, checkpoint_queue, ad_source))
ckpt_updater.start()

# Report processes if they timeout or error
for name, future in futures:
try:
logging.warning(f"Waiting for Schedd {name} to finish.")
future.get(args.schedd_history_timeout)
except multiprocessing.TimeoutError:
logging.warning(f"Waited too long for Schedd {name}; it may still complete in the background.")
except Exception:
logging.exception(f"Error getting progress from Schedd {name}.")

checkpoint_queue.put(None)
logging.warning("Joining the schedd checkpoint queue.")
ckpt_updater.join(timeout=(len(schedd_ads) * args.schedd_history_timeout))
logging.warning("Shutting down the schedd checkpoint queue.")
ckpt_updater.terminate()
manager.shutdown()
logging.warning("Shutting down the schedd multiprocessing pool.")

logging.warning(f"Processing time for schedd job epoch history: {(time.time()-schedd_starttime)/60:.2f} mins")

processing_time = int(time.time() - starttime)
return processing_time

@@ -239,3 +291,25 @@ def startd_history_processor(src, startd_ad, ckpt_queue, iface, metadata, args,
ckpt_queue.put({startd_ad["Machine"]: ckpt})
except Exception as e:
logging.error(f"Could not push ads from {startd_ad['Machine']}: {e.__class__.__name__}: {str(e)}")


def schedd_job_epoch_history_processor(src, schedd_ad, ckpt_queue, iface, metadata, args, src_kwargs):
metadata["condor_history_runtime"] = int(time.time())
metadata["condor_history_host_version"] = schedd_ad.get("CondorVersion", "UNKNOWN")
metadata["condor_history_host_platform"] = schedd_ad.get("CondorPlatform", "UNKNOWN")
metadata["condor_history_host_machine"] = schedd_ad.get("Machine", "UNKNOWN")
metadata["condor_history_host_name"] = schedd_ad.get("Name", "UNKNOWN")
try:
ads = src.fetch_ads(schedd_ad, max_ads=args.schedd_history_max_ads)
except Exception as e:
logging.error(f"Could not fetch job epoch ads from {schedd_ad['Name']}: {e.__class__.__name__}: {str(e)}")
return
else:
if ads is None:
return
try:
for ckpt in src.process_ads(iface, ads, schedd_ad, metadata=metadata, **src_kwargs):
if ckpt is not None:
ckpt_queue.put({f"Job Epoch {schedd_ad['Name']}": ckpt})
except Exception as e:
logging.error(f"Could not push job epoch ads from {schedd_ad['Name']}: {e.__class__.__name__}: {str(e)}")
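`schedd_job_epoch_history_processor` drains the source's generator and forwards only non-None yields to the checkpoint queue, keyed by `Job Epoch <name>` to match the key `fetch_ads` reads back. A self-contained sketch of that flow (the fake generator stands in for `process_ads`, and a plain `queue.Queue` stands in for the manager queue):

```python
import queue

def processor(ckpt_gen, ckpt_queue, schedd_name):
    # Per-chunk None yields are progress markers and must not be forwarded;
    # only the final checkpoint reaches the checkpoint-updater process.
    for ckpt in ckpt_gen:
        if ckpt is not None:
            ckpt_queue.put({f"Job Epoch {schedd_name}": ckpt})

def fake_process_ads():
    yield None  # chunk posted, checkpoint withheld
    yield None
    yield {"ClusterId": 9, "ProcId": 0, "EnteredCurrentStatus": 100}

ckpt_queue = queue.Queue()
processor(fake_process_ads(), ckpt_queue, "schedd1.example.com")
update = ckpt_queue.get_nowait()
print(update)
```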
13 changes: 13 additions & 0 deletions src/condor_scripts/adstash/config.py
@@ -38,6 +38,7 @@ def get_default_config(name="ADSTASH"):
"collectors": htcondor.param.get("CONDOR_HOST"),
"read_schedd_history": False,
"read_startd_history": False,
"read_schedd_job_epoch_history": False,
"read_ad_file": None,
"schedd_history_max_ads": 10000,
"startd_history_max_ads": 10000,
@@ -70,6 +71,7 @@ def get_htcondor_config(name="ADSTASH"):
"startds": p.get(f"{name}_READ_STARTDS"),
"read_schedd_history": p.get(f"{name}_SCHEDD_HISTORY"),
"read_startd_history": p.get(f"{name}_STARTD_HISTORY"),
"read_schedd_job_epoch_history": p.get(f"{name}_SCHEDD_JOB_EPOCH_HISTORY"),
"read_ad_file": p.get(f"{name}_AD_FILE"),
"schedd_history_max_ads": p.get(f"{name}_SCHEDD_HISTORY_MAX_ADS"),
"startd_history_max_ads": p.get(f"{name}_STARTD_HISTORY_MAX_ADS"),
@@ -135,6 +137,7 @@ def get_environment_config(name="ADSTASH"):
"startds": env.get(f"{name}_READ_STARTDS"),
"read_schedd_history": env.get(f"{name}_SCHEDD_HISTORY"),
"read_startd_history": env.get(f"{name}_STARTD_HISTORY"),
"read_schedd_job_epoch_history": env.get(f"{name}_SCHEDD_JOB_EPOCH_HISTORY"),
"read_ad_file": env.get(f"{name}_AD_FILE"),
"schedd_history_max_ads": env.get(f"{name}_SCHEDD_HISTORY_MAX_ADS"),
"startd_history_max_ads": env.get(f"{name}_STARTD_HISTORY_MAX_ADS"),
@@ -221,6 +224,7 @@ def normalize_config_types(conf):
"standalone",
"read_schedd_history",
"read_startd_history",
"read_schedd_job_epoch_history",
"to_elasticsearch",
"to_json",
"se_use_https",
@@ -381,6 +385,15 @@ def get_config(argv=None):
"[default: %(default)s]"
),
)
source_group.add_argument(
"--schedd_job_epoch_history",
action="store_true",
dest="read_schedd_job_epoch_history",
help=(
"Poll Schedd Job Epoch histories "
"[default: %(default)s]"
),
)
source_group.add_argument(
"--ad_file",
type=Path,
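The new flag follows the existing argparse pattern: a `store_true` switch whose `dest` lines up with the `read_schedd_job_epoch_history` config key, so CLI, environment, and HTCondor config all land on the same setting. A minimal sketch with a standalone parser (only the flag definition itself is taken from the diff):

```python
import argparse

parser = argparse.ArgumentParser()
source_group = parser.add_argument_group("ClassAd source options")
source_group.add_argument(
    "--schedd_job_epoch_history",
    action="store_true",
    dest="read_schedd_job_epoch_history",
    help="Poll Schedd Job Epoch histories [default: %(default)s]",
)

# Flag absent: store_true defaults to False.
assert parser.parse_args([]).read_schedd_job_epoch_history is False

args = parser.parse_args(["--schedd_job_epoch_history"])
print(args.read_schedd_job_epoch_history)  # True
```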
8 changes: 5 additions & 3 deletions src/condor_scripts/adstash/convert.py
@@ -535,14 +535,16 @@ def record_time(ad, fallback_to_launch=True):
For Completed/Removed/Error jobs, try to update it:
- to CompletionDate if present
- else to EnteredCurrentStatus if present
- else fall back to launch time
For other (Running/Idle/Held/Suspended) jobs,
use EnteredCurrentStatus if present
Else fall back to launch time
"""
if ad["JobStatus"] in [3, 4, 6]:
if ad.get("CompletionDate", 0) > 0:
return ad["CompletionDate"]

elif ad.get("EnteredCurrentStatus", 0) > 0:
return ad["EnteredCurrentStatus"]
elif ad.get("EnteredCurrentStatus", 0) > 0:
return ad["EnteredCurrentStatus"]

if fallback_to_launch:
return _LAUNCH_TIME
4 changes: 4 additions & 0 deletions src/condor_utils/param_info.in
@@ -5984,6 +5984,10 @@ default=False
description=Should condor_adstash query Startd job history
type=bool
default=False
[ADSTASH_SCHEDD_JOB_EPOCH_HISTORY]
description=Should condor_adstash query Schedd job epoch history
type=bool
default=False
[ADSTASH_SCHEDD_HISTORY_MAX_ADS]
description=Stop retrieving ads from a Schedd after this many ads per poll
type=int